Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 703fa9ab

History | View | Annotate | Download (51.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45

    
46

    
47
ON_OPT = cli_option("--on", default=False,
48
                    action="store_true", dest="on",
49
                    help="Recover from an EPO")
50

    
51
GROUPS_OPT = cli_option("--groups", default=False,
52
                    action="store_true", dest="groups",
53
                    help="Arguments are node groups instead of nodes")
54

    
55
SHOW_MACHINE_OPT = cli_option("-M", "--show-machine-names", default=False,
56
                              action="store_true",
57
                              help="Show machine name for every line in output")
58

    
59
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
60
_EPO_PING_TIMEOUT = 1 # 1 second
61
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
62

    
63

    
64
@UsesRPC
65
def InitCluster(opts, args):
66
  """Initialize the cluster.
67

68
  @param opts: the command line options selected by the user
69
  @type args: list
70
  @param args: should contain only one element, the desired
71
      cluster name
72
  @rtype: int
73
  @return: the desired exit code
74

75
  """
76
  if not opts.lvm_storage and opts.vg_name:
77
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
78
    return 1
79

    
80
  vg_name = opts.vg_name
81
  if opts.lvm_storage and not opts.vg_name:
82
    vg_name = constants.DEFAULT_VG
83

    
84
  if not opts.drbd_storage and opts.drbd_helper:
85
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
86
    return 1
87

    
88
  drbd_helper = opts.drbd_helper
89
  if opts.drbd_storage and not opts.drbd_helper:
90
    drbd_helper = constants.DEFAULT_DRBD_HELPER
91

    
92
  master_netdev = opts.master_netdev
93
  if master_netdev is None:
94
    master_netdev = constants.DEFAULT_BRIDGE
95

    
96
  hvlist = opts.enabled_hypervisors
97
  if hvlist is None:
98
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
99
  hvlist = hvlist.split(",")
100

    
101
  hvparams = dict(opts.hvparams)
102
  beparams = opts.beparams
103
  nicparams = opts.nicparams
104

    
105
  diskparams = dict(opts.diskparams)
106

    
107
  # check the disk template types here, as we cannot rely on the type check done
108
  # by the opcode parameter types
109
  diskparams_keys = set(diskparams.keys())
110
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
111
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
112
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
113
    return 1
114

    
115
  # prepare beparams dict
116
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
117
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
118

    
119
  # prepare nicparams dict
120
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
121
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
122

    
123
  # prepare ndparams dict
124
  if opts.ndparams is None:
125
    ndparams = dict(constants.NDC_DEFAULTS)
126
  else:
127
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
128
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
129

    
130
  # prepare hvparams dict
131
  for hv in constants.HYPER_TYPES:
132
    if hv not in hvparams:
133
      hvparams[hv] = {}
134
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
135
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
136

    
137
  # prepare diskparams dict
138
  for templ in constants.DISK_TEMPLATES:
139
    if templ not in diskparams:
140
      diskparams[templ] = {}
141
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
142
                                         diskparams[templ])
143
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
144

    
145
  # prepare ipolicy dict
146
  ipolicy_raw = CreateIPolicyFromOpts(
147
    ispecs_mem_size=opts.ispecs_mem_size,
148
    ispecs_cpu_count=opts.ispecs_cpu_count,
149
    ispecs_disk_count=opts.ispecs_disk_count,
150
    ispecs_disk_size=opts.ispecs_disk_size,
151
    ispecs_nic_count=opts.ispecs_nic_count,
152
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
153
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
154
    fill_all=True)
155
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
156

    
157
  if opts.candidate_pool_size is None:
158
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
159

    
160
  if opts.mac_prefix is None:
161
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
162

    
163
  uid_pool = opts.uid_pool
164
  if uid_pool is not None:
165
    uid_pool = uidpool.ParseUidPool(uid_pool)
166

    
167
  if opts.prealloc_wipe_disks is None:
168
    opts.prealloc_wipe_disks = False
169

    
170
  external_ip_setup_script = opts.use_external_mip_script
171
  if external_ip_setup_script is None:
172
    external_ip_setup_script = False
173

    
174
  try:
175
    primary_ip_version = int(opts.primary_ip_version)
176
  except (ValueError, TypeError), err:
177
    ToStderr("Invalid primary ip version value: %s" % str(err))
178
    return 1
179

    
180
  master_netmask = opts.master_netmask
181
  try:
182
    if master_netmask is not None:
183
      master_netmask = int(master_netmask)
184
  except (ValueError, TypeError), err:
185
    ToStderr("Invalid master netmask value: %s" % str(err))
186
    return 1
187

    
188
  if opts.disk_state:
189
    disk_state = utils.FlatToDict(opts.disk_state)
190
  else:
191
    disk_state = {}
192

    
193
  hv_state = dict(opts.hv_state)
194

    
195
  bootstrap.InitCluster(cluster_name=args[0],
196
                        secondary_ip=opts.secondary_ip,
197
                        vg_name=vg_name,
198
                        mac_prefix=opts.mac_prefix,
199
                        master_netmask=master_netmask,
200
                        master_netdev=master_netdev,
201
                        file_storage_dir=opts.file_storage_dir,
202
                        shared_file_storage_dir=opts.shared_file_storage_dir,
203
                        enabled_hypervisors=hvlist,
204
                        hvparams=hvparams,
205
                        beparams=beparams,
206
                        nicparams=nicparams,
207
                        ndparams=ndparams,
208
                        diskparams=diskparams,
209
                        ipolicy=ipolicy,
210
                        candidate_pool_size=opts.candidate_pool_size,
211
                        modify_etc_hosts=opts.modify_etc_hosts,
212
                        modify_ssh_setup=opts.modify_ssh_setup,
213
                        maintain_node_health=opts.maintain_node_health,
214
                        drbd_helper=drbd_helper,
215
                        uid_pool=uid_pool,
216
                        default_iallocator=opts.default_iallocator,
217
                        primary_ip_version=primary_ip_version,
218
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
219
                        use_external_mip_script=external_ip_setup_script,
220
                        hv_state=hv_state,
221
                        disk_state=disk_state,
222
                        )
223
  op = opcodes.OpClusterPostInit()
224
  SubmitOpCode(op, opts=opts)
225
  return 0
226

    
227

    
228
@UsesRPC
229
def DestroyCluster(opts, args):
230
  """Destroy the cluster.
231

232
  @param opts: the command line options selected by the user
233
  @type args: list
234
  @param args: should be an empty list
235
  @rtype: int
236
  @return: the desired exit code
237

238
  """
239
  if not opts.yes_do_it:
240
    ToStderr("Destroying a cluster is irreversible. If you really want"
241
             " destroy this cluster, supply the --yes-do-it option.")
242
    return 1
243

    
244
  op = opcodes.OpClusterDestroy()
245
  master = SubmitOpCode(op, opts=opts)
246
  # if we reached this, the opcode didn't fail; we can proceed to
247
  # shutdown all the daemons
248
  bootstrap.FinalizeClusterDestroy(master)
249
  return 0
250

    
251

    
252
def RenameCluster(opts, args):
253
  """Rename the cluster.
254

255
  @param opts: the command line options selected by the user
256
  @type args: list
257
  @param args: should contain only one element, the new cluster name
258
  @rtype: int
259
  @return: the desired exit code
260

261
  """
262
  cl = GetClient()
263

    
264
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
265

    
266
  new_name = args[0]
267
  if not opts.force:
268
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
269
                " connected over the network to the cluster name, the"
270
                " operation is very dangerous as the IP address will be"
271
                " removed from the node and the change may not go through."
272
                " Continue?") % (cluster_name, new_name)
273
    if not AskUser(usertext):
274
      return 1
275

    
276
  op = opcodes.OpClusterRename(name=new_name)
277
  result = SubmitOpCode(op, opts=opts, cl=cl)
278

    
279
  if result:
280
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
281

    
282
  return 0
283

    
284

    
285
def ActivateMasterIp(opts, args):
286
  """Activates the master IP.
287

288
  """
289
  op = opcodes.OpClusterActivateMasterIp()
290
  SubmitOpCode(op)
291
  return 0
292

    
293

    
294
def DeactivateMasterIp(opts, args):
295
  """Deactivates the master IP.
296

297
  """
298
  if not opts.confirm:
299
    usertext = ("This will disable the master IP. All the open connections to"
300
                " the master IP will be closed. To reach the master you will"
301
                " need to use its node IP."
302
                " Continue?")
303
    if not AskUser(usertext):
304
      return 1
305

    
306
  op = opcodes.OpClusterDeactivateMasterIp()
307
  SubmitOpCode(op)
308
  return 0
309

    
310

    
311
def RedistributeConfig(opts, args):
312
  """Forces push of the cluster configuration.
313

314
  @param opts: the command line options selected by the user
315
  @type args: list
316
  @param args: empty list
317
  @rtype: int
318
  @return: the desired exit code
319

320
  """
321
  op = opcodes.OpClusterRedistConf()
322
  SubmitOrSend(op, opts)
323
  return 0
324

    
325

    
326
def ShowClusterVersion(opts, args):
327
  """Write version of ganeti software to the standard output.
328

329
  @param opts: the command line options selected by the user
330
  @type args: list
331
  @param args: should be an empty list
332
  @rtype: int
333
  @return: the desired exit code
334

335
  """
336
  cl = GetClient()
337
  result = cl.QueryClusterInfo()
338
  ToStdout("Software version: %s", result["software_version"])
339
  ToStdout("Internode protocol: %s", result["protocol_version"])
340
  ToStdout("Configuration format: %s", result["config_version"])
341
  ToStdout("OS api version: %s", result["os_api_version"])
342
  ToStdout("Export interface: %s", result["export_version"])
343
  return 0
344

    
345

    
346
def ShowClusterMaster(opts, args):
347
  """Write name of master node to the standard output.
348

349
  @param opts: the command line options selected by the user
350
  @type args: list
351
  @param args: should be an empty list
352
  @rtype: int
353
  @return: the desired exit code
354

355
  """
356
  master = bootstrap.GetMaster()
357
  ToStdout(master)
358
  return 0
359

    
360

    
361
def _PrintGroupedParams(paramsdict, level=1, roman=False):
362
  """Print Grouped parameters (be, nic, disk) by group.
363

364
  @type paramsdict: dict of dicts
365
  @param paramsdict: {group: {param: value, ...}, ...}
366
  @type level: int
367
  @param level: Level of indention
368

369
  """
370
  indent = "  " * level
371
  for item, val in sorted(paramsdict.items()):
372
    if isinstance(val, dict):
373
      ToStdout("%s- %s:", indent, item)
374
      _PrintGroupedParams(val, level=level + 1, roman=roman)
375
    elif roman and isinstance(val, int):
376
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
377
    else:
378
      ToStdout("%s  %s: %s", indent, item, val)
379

    
380

    
381
def ShowClusterConfig(opts, args):
382
  """Shows cluster information.
383

384
  @param opts: the command line options selected by the user
385
  @type args: list
386
  @param args: should be an empty list
387
  @rtype: int
388
  @return: the desired exit code
389

390
  """
391
  cl = GetClient()
392
  result = cl.QueryClusterInfo()
393

    
394
  ToStdout("Cluster name: %s", result["name"])
395
  ToStdout("Cluster UUID: %s", result["uuid"])
396

    
397
  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
398
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
399

    
400
  ToStdout("Master node: %s", result["master"])
401

    
402
  ToStdout("Architecture (this node): %s (%s)",
403
           result["architecture"][0], result["architecture"][1])
404

    
405
  if result["tags"]:
406
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
407
  else:
408
    tags = "(none)"
409

    
410
  ToStdout("Tags: %s", tags)
411

    
412
  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
413
  ToStdout("Enabled hypervisors: %s",
414
           utils.CommaJoin(result["enabled_hypervisors"]))
415

    
416
  ToStdout("Hypervisor parameters:")
417
  _PrintGroupedParams(result["hvparams"])
418

    
419
  ToStdout("OS-specific hypervisor parameters:")
420
  _PrintGroupedParams(result["os_hvp"])
421

    
422
  ToStdout("OS parameters:")
423
  _PrintGroupedParams(result["osparams"])
424

    
425
  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
426
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
427

    
428
  ToStdout("Cluster parameters:")
429
  ToStdout("  - candidate pool size: %s",
430
            compat.TryToRoman(result["candidate_pool_size"],
431
                              convert=opts.roman_integers))
432
  ToStdout("  - master netdev: %s", result["master_netdev"])
433
  ToStdout("  - master netmask: %s", result["master_netmask"])
434
  ToStdout("  - use external master IP address setup script: %s",
435
           result["use_external_mip_script"])
436
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
437
  if result["reserved_lvs"]:
438
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
439
  else:
440
    reserved_lvs = "(none)"
441
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
442
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
443
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
444
  ToStdout("  - shared file storage path: %s",
445
           result["shared_file_storage_dir"])
446
  ToStdout("  - maintenance of node health: %s",
447
           result["maintain_node_health"])
448
  ToStdout("  - uid pool: %s",
449
            uidpool.FormatUidPool(result["uid_pool"],
450
                                  roman=opts.roman_integers))
451
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
452
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
453
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
454
  ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))
455

    
456
  ToStdout("Default node parameters:")
457
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
458

    
459
  ToStdout("Default instance parameters:")
460
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
461

    
462
  ToStdout("Default nic parameters:")
463
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
464

    
465
  ToStdout("Default disk parameters:")
466
  _PrintGroupedParams(result["diskparams"], roman=opts.roman_integers)
467

    
468
  ToStdout("Instance policy - limits for instances:")
469
  for key in constants.IPOLICY_ISPECS:
470
    ToStdout("  - %s", key)
471
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
472
  ToStdout("  - enabled disk templates: %s",
473
           utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS]))
474
  for key in constants.IPOLICY_PARAMETERS:
475
    ToStdout("  - %s: %s", key, result["ipolicy"][key])
476

    
477
  return 0
478

    
479

    
480
def ClusterCopyFile(opts, args):
481
  """Copy a file from master to some nodes.
482

483
  @param opts: the command line options selected by the user
484
  @type args: list
485
  @param args: should contain only one element, the path of
486
      the file to be copied
487
  @rtype: int
488
  @return: the desired exit code
489

490
  """
491
  filename = args[0]
492
  if not os.path.exists(filename):
493
    raise errors.OpPrereqError("No such filename '%s'" % filename,
494
                               errors.ECODE_INVAL)
495

    
496
  cl = GetClient()
497

    
498
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
499

    
500
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
501
                           secondary_ips=opts.use_replication_network,
502
                           nodegroup=opts.nodegroup)
503

    
504
  srun = ssh.SshRunner(cluster_name=cluster_name)
505
  for node in results:
506
    if not srun.CopyFileToNode(node, filename):
507
      ToStderr("Copy of file %s to node %s failed", filename, node)
508

    
509
  return 0
510

    
511

    
512
def RunClusterCommand(opts, args):
513
  """Run a command on some nodes.
514

515
  @param opts: the command line options selected by the user
516
  @type args: list
517
  @param args: should contain the command to be run and its arguments
518
  @rtype: int
519
  @return: the desired exit code
520

521
  """
522
  cl = GetClient()
523

    
524
  command = " ".join(args)
525

    
526
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
527

    
528
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
529
                                                    "master_node"])
530

    
531
  srun = ssh.SshRunner(cluster_name=cluster_name)
532

    
533
  # Make sure master node is at list end
534
  if master_node in nodes:
535
    nodes.remove(master_node)
536
    nodes.append(master_node)
537

    
538
  for name in nodes:
539
    result = srun.Run(name, "root", command)
540
    ToStdout("------------------------------------------------")
541
    if opts.show_machine_names:
542
      for line in result.output.splitlines():
543
        ToStdout("%s: %s", name, line)
544
    else:
545
      ToStdout("node: %s", name)
546
      ToStdout("%s", result.output)
547
    ToStdout("return code = %s", result.exit_code)
548

    
549
  return 0
550

    
551

    
552
def VerifyCluster(opts, args):
553
  """Verify integrity of cluster, performing various test on nodes.
554

555
  @param opts: the command line options selected by the user
556
  @type args: list
557
  @param args: should be an empty list
558
  @rtype: int
559
  @return: the desired exit code
560

561
  """
562
  skip_checks = []
563

    
564
  if opts.skip_nplusone_mem:
565
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
566

    
567
  cl = GetClient()
568

    
569
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
570
                               error_codes=opts.error_codes,
571
                               debug_simulate_errors=opts.simulate_errors,
572
                               skip_checks=skip_checks,
573
                               ignore_errors=opts.ignore_errors,
574
                               group_name=opts.nodegroup)
575
  result = SubmitOpCode(op, cl=cl, opts=opts)
576

    
577
  # Keep track of submitted jobs
578
  jex = JobExecutor(cl=cl, opts=opts)
579

    
580
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
581
    jex.AddJobId(None, status, job_id)
582

    
583
  results = jex.GetResults()
584

    
585
  (bad_jobs, bad_results) = \
586
    map(len,
587
        # Convert iterators to lists
588
        map(list,
589
            # Count errors
590
            map(compat.partial(itertools.ifilterfalse, bool),
591
                # Convert result to booleans in a tuple
592
                zip(*((job_success, len(op_results) == 1 and op_results[0])
593
                      for (job_success, op_results) in results)))))
594

    
595
  if bad_jobs == 0 and bad_results == 0:
596
    rcode = constants.EXIT_SUCCESS
597
  else:
598
    rcode = constants.EXIT_FAILURE
599
    if bad_jobs > 0:
600
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
601

    
602
  return rcode
603

    
604

    
605
def VerifyDisks(opts, args):
606
  """Verify integrity of cluster disks.
607

608
  @param opts: the command line options selected by the user
609
  @type args: list
610
  @param args: should be an empty list
611
  @rtype: int
612
  @return: the desired exit code
613

614
  """
615
  cl = GetClient()
616

    
617
  op = opcodes.OpClusterVerifyDisks()
618

    
619
  result = SubmitOpCode(op, cl=cl, opts=opts)
620

    
621
  # Keep track of submitted jobs
622
  jex = JobExecutor(cl=cl, opts=opts)
623

    
624
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
625
    jex.AddJobId(None, status, job_id)
626

    
627
  retcode = constants.EXIT_SUCCESS
628

    
629
  for (status, result) in jex.GetResults():
630
    if not status:
631
      ToStdout("Job failed: %s", result)
632
      continue
633

    
634
    ((bad_nodes, instances, missing), ) = result
635

    
636
    for node, text in bad_nodes.items():
637
      ToStdout("Error gathering data on node %s: %s",
638
               node, utils.SafeEncode(text[-400:]))
639
      retcode = constants.EXIT_FAILURE
640
      ToStdout("You need to fix these nodes first before fixing instances")
641

    
642
    for iname in instances:
643
      if iname in missing:
644
        continue
645
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
646
      try:
647
        ToStdout("Activating disks for instance '%s'", iname)
648
        SubmitOpCode(op, opts=opts, cl=cl)
649
      except errors.GenericError, err:
650
        nret, msg = FormatError(err)
651
        retcode |= nret
652
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
653

    
654
    if missing:
655
      for iname, ival in missing.iteritems():
656
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
657
        if all_missing:
658
          ToStdout("Instance %s cannot be verified as it lives on"
659
                   " broken nodes", iname)
660
        else:
661
          ToStdout("Instance %s has missing logical volumes:", iname)
662
          ival.sort()
663
          for node, vol in ival:
664
            if node in bad_nodes:
665
              ToStdout("\tbroken node %s /dev/%s", node, vol)
666
            else:
667
              ToStdout("\t%s /dev/%s", node, vol)
668

    
669
      ToStdout("You need to replace or recreate disks for all the above"
670
               " instances if this message persists after fixing broken nodes.")
671
      retcode = constants.EXIT_FAILURE
672

    
673
  return retcode
674

    
675

    
676
def RepairDiskSizes(opts, args):
677
  """Verify sizes of cluster disks.
678

679
  @param opts: the command line options selected by the user
680
  @type args: list
681
  @param args: optional list of instances to restrict check to
682
  @rtype: int
683
  @return: the desired exit code
684

685
  """
686
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
687
  SubmitOpCode(op, opts=opts)
688

    
689

    
690
@UsesRPC
691
def MasterFailover(opts, args):
692
  """Failover the master node.
693

694
  This command, when run on a non-master node, will cause the current
695
  master to cease being master, and the non-master to become new
696
  master.
697

698
  @param opts: the command line options selected by the user
699
  @type args: list
700
  @param args: should be an empty list
701
  @rtype: int
702
  @return: the desired exit code
703

704
  """
705
  if opts.no_voting:
706
    usertext = ("This will perform the failover even if most other nodes"
707
                " are down, or if this node is outdated. This is dangerous"
708
                " as it can lead to a non-consistent cluster. Check the"
709
                " gnt-cluster(8) man page before proceeding. Continue?")
710
    if not AskUser(usertext):
711
      return 1
712

    
713
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
714

    
715

    
716
def MasterPing(opts, args):
717
  """Checks if the master is alive.
718

719
  @param opts: the command line options selected by the user
720
  @type args: list
721
  @param args: should be an empty list
722
  @rtype: int
723
  @return: the desired exit code
724

725
  """
726
  try:
727
    cl = GetClient()
728
    cl.QueryClusterInfo()
729
    return 0
730
  except Exception: # pylint: disable=W0703
731
    return 1
732

    
733

    
734
def SearchTags(opts, args):
735
  """Searches the tags on all the cluster.
736

737
  @param opts: the command line options selected by the user
738
  @type args: list
739
  @param args: should contain only one element, the tag pattern
740
  @rtype: int
741
  @return: the desired exit code
742

743
  """
744
  op = opcodes.OpTagsSearch(pattern=args[0])
745
  result = SubmitOpCode(op, opts=opts)
746
  if not result:
747
    return 1
748
  result = list(result)
749
  result.sort()
750
  for path, tag in result:
751
    ToStdout("%s %s", path, tag)
752

    
753

    
754
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
755
  """Reads and verifies an X509 certificate.
756

757
  @type cert_filename: string
758
  @param cert_filename: the path of the file containing the certificate to
759
                        verify encoded in PEM format
760
  @type verify_private_key: bool
761
  @param verify_private_key: whether to verify the private key in addition to
762
                             the public certificate
763
  @rtype: string
764
  @return: a string containing the PEM-encoded certificate.
765

766
  """
767
  try:
768
    pem = utils.ReadFile(cert_filename)
769
  except IOError, err:
770
    raise errors.X509CertError(cert_filename,
771
                               "Unable to read certificate: %s" % str(err))
772

    
773
  try:
774
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
775
  except Exception, err:
776
    raise errors.X509CertError(cert_filename,
777
                               "Unable to load certificate: %s" % str(err))
778

    
779
  if verify_private_key:
780
    try:
781
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
782
    except Exception, err:
783
      raise errors.X509CertError(cert_filename,
784
                                 "Unable to load private key: %s" % str(err))
785

    
786
  return pem
787

    
788

    
789
def _RenewCrypto(new_cluster_cert, new_rapi_cert, #pylint: disable=R0911
790
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
791
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
792
                 cds_filename, force):
793
  """Renews cluster certificates, keys and secrets.
794

795
  @type new_cluster_cert: bool
796
  @param new_cluster_cert: Whether to generate a new cluster certificate
797
  @type new_rapi_cert: bool
798
  @param new_rapi_cert: Whether to generate a new RAPI certificate
799
  @type rapi_cert_filename: string
800
  @param rapi_cert_filename: Path to file containing new RAPI certificate
801
  @type new_spice_cert: bool
802
  @param new_spice_cert: Whether to generate a new SPICE certificate
803
  @type spice_cert_filename: string
804
  @param spice_cert_filename: Path to file containing new SPICE certificate
805
  @type spice_cacert_filename: string
806
  @param spice_cacert_filename: Path to file containing the certificate of the
807
                                CA that signed the SPICE certificate
808
  @type new_confd_hmac_key: bool
809
  @param new_confd_hmac_key: Whether to generate a new HMAC key
810
  @type new_cds: bool
811
  @param new_cds: Whether to generate a new cluster domain secret
812
  @type cds_filename: string
813
  @param cds_filename: Path to file containing new cluster domain secret
814
  @type force: bool
815
  @param force: Whether to ask user for confirmation
816

817
  """
818
  if new_rapi_cert and rapi_cert_filename:
819
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
820
             " options can be specified at the same time.")
821
    return 1
822

    
823
  if new_cds and cds_filename:
824
    ToStderr("Only one of the --new-cluster-domain-secret and"
825
             " --cluster-domain-secret options can be specified at"
826
             " the same time.")
827
    return 1
828

    
829
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
830
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
831
             " and --spice-ca-certificate must not be used.")
832
    return 1
833

    
834
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
835
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
836
             " specified.")
837
    return 1
838

    
839
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
840
  try:
841
    if rapi_cert_filename:
842
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
843
    if spice_cert_filename:
844
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
845
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
846
  except errors.X509CertError, err:
847
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
848
    return 1
849

    
850
  if cds_filename:
851
    try:
852
      cds = utils.ReadFile(cds_filename)
853
    except Exception, err: # pylint: disable=W0703
854
      ToStderr("Can't load new cluster domain secret from %s: %s" %
855
               (cds_filename, str(err)))
856
      return 1
857
  else:
858
    cds = None
859

    
860
  if not force:
861
    usertext = ("This requires all daemons on all nodes to be restarted and"
862
                " may take some time. Continue?")
863
    if not AskUser(usertext):
864
      return 1
865

    
866
  def _RenewCryptoInner(ctx):
867
    ctx.feedback_fn("Updating certificates and keys")
868
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
869
                                    new_rapi_cert,
870
                                    new_spice_cert,
871
                                    new_confd_hmac_key,
872
                                    new_cds,
873
                                    rapi_cert_pem=rapi_cert_pem,
874
                                    spice_cert_pem=spice_cert_pem,
875
                                    spice_cacert_pem=spice_cacert_pem,
876
                                    cds=cds)
877

    
878
    files_to_copy = []
879

    
880
    if new_cluster_cert:
881
      files_to_copy.append(constants.NODED_CERT_FILE)
882

    
883
    if new_rapi_cert or rapi_cert_pem:
884
      files_to_copy.append(constants.RAPI_CERT_FILE)
885

    
886
    if new_spice_cert or spice_cert_pem:
887
      files_to_copy.append(constants.SPICE_CERT_FILE)
888
      files_to_copy.append(constants.SPICE_CACERT_FILE)
889

    
890
    if new_confd_hmac_key:
891
      files_to_copy.append(constants.CONFD_HMAC_KEY)
892

    
893
    if new_cds or cds:
894
      files_to_copy.append(constants.CLUSTER_DOMAIN_SECRET_FILE)
895

    
896
    if files_to_copy:
897
      for node_name in ctx.nonmaster_nodes:
898
        ctx.feedback_fn("Copying %s to %s" %
899
                        (", ".join(files_to_copy), node_name))
900
        for file_name in files_to_copy:
901
          ctx.ssh.CopyFileToNode(node_name, file_name)
902

    
903
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
904

    
905
  ToStdout("All requested certificates and keys have been replaced."
906
           " Running \"gnt-cluster verify\" now is recommended.")
907

    
908
  return 0
909

    
910

    
911
def RenewCrypto(opts, args):
912
  """Renews cluster certificates, keys and secrets.
913

914
  """
915
  return _RenewCrypto(opts.new_cluster_cert,
916
                      opts.new_rapi_cert,
917
                      opts.rapi_cert,
918
                      opts.new_spice_cert,
919
                      opts.spice_cert,
920
                      opts.spice_cacert,
921
                      opts.new_confd_hmac_key,
922
                      opts.new_cluster_domain_secret,
923
                      opts.cluster_domain_secret,
924
                      opts.force)
925

    
926

    
927
def SetClusterParams(opts, args):
928
  """Modify the cluster.
929

930
  @param opts: the command line options selected by the user
931
  @type args: list
932
  @param args: should be an empty list
933
  @rtype: int
934
  @return: the desired exit code
935

936
  """
937
  if not (not opts.lvm_storage or opts.vg_name or
938
          not opts.drbd_storage or opts.drbd_helper or
939
          opts.enabled_hypervisors or opts.hvparams or
940
          opts.beparams or opts.nicparams or
941
          opts.ndparams or opts.diskparams or
942
          opts.candidate_pool_size is not None or
943
          opts.uid_pool is not None or
944
          opts.maintain_node_health is not None or
945
          opts.add_uids is not None or
946
          opts.remove_uids is not None or
947
          opts.default_iallocator is not None or
948
          opts.reserved_lvs is not None or
949
          opts.master_netdev is not None or
950
          opts.master_netmask is not None or
951
          opts.use_external_mip_script is not None or
952
          opts.prealloc_wipe_disks is not None or
953
          opts.hv_state or
954
          opts.disk_state or
955
          opts.ispecs_mem_size or
956
          opts.ispecs_cpu_count or
957
          opts.ispecs_disk_count or
958
          opts.ispecs_disk_size or
959
          opts.ispecs_nic_count or
960
          opts.ipolicy_disk_templates is not None or
961
          opts.ipolicy_vcpu_ratio is not None):
962
    ToStderr("Please give at least one of the parameters.")
963
    return 1
964

    
965
  vg_name = opts.vg_name
966
  if not opts.lvm_storage and opts.vg_name:
967
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
968
    return 1
969

    
970
  if not opts.lvm_storage:
971
    vg_name = ""
972

    
973
  drbd_helper = opts.drbd_helper
974
  if not opts.drbd_storage and opts.drbd_helper:
975
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
976
    return 1
977

    
978
  if not opts.drbd_storage:
979
    drbd_helper = ""
980

    
981
  hvlist = opts.enabled_hypervisors
982
  if hvlist is not None:
983
    hvlist = hvlist.split(",")
984

    
985
  # a list of (name, dict) we can pass directly to dict() (or [])
986
  hvparams = dict(opts.hvparams)
987
  for hv_params in hvparams.values():
988
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
989

    
990
  diskparams = dict(opts.diskparams)
991

    
992
  for dt_params in diskparams.values():
993
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
994

    
995
  beparams = opts.beparams
996
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
997

    
998
  nicparams = opts.nicparams
999
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1000

    
1001
  ndparams = opts.ndparams
1002
  if ndparams is not None:
1003
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1004

    
1005
  ipolicy = CreateIPolicyFromOpts(
1006
    ispecs_mem_size=opts.ispecs_mem_size,
1007
    ispecs_cpu_count=opts.ispecs_cpu_count,
1008
    ispecs_disk_count=opts.ispecs_disk_count,
1009
    ispecs_disk_size=opts.ispecs_disk_size,
1010
    ispecs_nic_count=opts.ispecs_nic_count,
1011
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1012
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1013
    )
1014

    
1015
  mnh = opts.maintain_node_health
1016

    
1017
  uid_pool = opts.uid_pool
1018
  if uid_pool is not None:
1019
    uid_pool = uidpool.ParseUidPool(uid_pool)
1020

    
1021
  add_uids = opts.add_uids
1022
  if add_uids is not None:
1023
    add_uids = uidpool.ParseUidPool(add_uids)
1024

    
1025
  remove_uids = opts.remove_uids
1026
  if remove_uids is not None:
1027
    remove_uids = uidpool.ParseUidPool(remove_uids)
1028

    
1029
  if opts.reserved_lvs is not None:
1030
    if opts.reserved_lvs == "":
1031
      opts.reserved_lvs = []
1032
    else:
1033
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1034

    
1035
  if opts.master_netmask is not None:
1036
    try:
1037
      opts.master_netmask = int(opts.master_netmask)
1038
    except ValueError:
1039
      ToStderr("The --master-netmask option expects an int parameter.")
1040
      return 1
1041

    
1042
  ext_ip_script = opts.use_external_mip_script
1043

    
1044
  if opts.disk_state:
1045
    disk_state = utils.FlatToDict(opts.disk_state)
1046
  else:
1047
    disk_state = {}
1048

    
1049
  hv_state = dict(opts.hv_state)
1050

    
1051
  op = opcodes.OpClusterSetParams(vg_name=vg_name,
1052
                                  drbd_helper=drbd_helper,
1053
                                  enabled_hypervisors=hvlist,
1054
                                  hvparams=hvparams,
1055
                                  os_hvp=None,
1056
                                  beparams=beparams,
1057
                                  nicparams=nicparams,
1058
                                  ndparams=ndparams,
1059
                                  diskparams=diskparams,
1060
                                  ipolicy=ipolicy,
1061
                                  candidate_pool_size=opts.candidate_pool_size,
1062
                                  maintain_node_health=mnh,
1063
                                  uid_pool=uid_pool,
1064
                                  add_uids=add_uids,
1065
                                  remove_uids=remove_uids,
1066
                                  default_iallocator=opts.default_iallocator,
1067
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
1068
                                  master_netdev=opts.master_netdev,
1069
                                  master_netmask=opts.master_netmask,
1070
                                  reserved_lvs=opts.reserved_lvs,
1071
                                  use_external_mip_script=ext_ip_script,
1072
                                  hv_state=hv_state,
1073
                                  disk_state=disk_state,
1074
                                  )
1075
  SubmitOrSend(op, opts)
1076
  return 0
1077

    
1078

    
1079
def QueueOps(opts, args):
1080
  """Queue operations.
1081

1082
  @param opts: the command line options selected by the user
1083
  @type args: list
1084
  @param args: should contain only one element, the subcommand
1085
  @rtype: int
1086
  @return: the desired exit code
1087

1088
  """
1089
  command = args[0]
1090
  client = GetClient()
1091
  if command in ("drain", "undrain"):
1092
    drain_flag = command == "drain"
1093
    client.SetQueueDrainFlag(drain_flag)
1094
  elif command == "info":
1095
    result = client.QueryConfigValues(["drain_flag"])
1096
    if result[0]:
1097
      val = "set"
1098
    else:
1099
      val = "unset"
1100
    ToStdout("The drain flag is %s" % val)
1101
  else:
1102
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1103
                               errors.ECODE_INVAL)
1104

    
1105
  return 0
1106

    
1107

    
1108
def _ShowWatcherPause(until):
1109
  if until is None or until < time.time():
1110
    ToStdout("The watcher is not paused.")
1111
  else:
1112
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1113

    
1114

    
1115
def WatcherOps(opts, args):
1116
  """Watcher operations.
1117

1118
  @param opts: the command line options selected by the user
1119
  @type args: list
1120
  @param args: should contain only one element, the subcommand
1121
  @rtype: int
1122
  @return: the desired exit code
1123

1124
  """
1125
  command = args[0]
1126
  client = GetClient()
1127

    
1128
  if command == "continue":
1129
    client.SetWatcherPause(None)
1130
    ToStdout("The watcher is no longer paused.")
1131

    
1132
  elif command == "pause":
1133
    if len(args) < 2:
1134
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1135

    
1136
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1137
    _ShowWatcherPause(result)
1138

    
1139
  elif command == "info":
1140
    result = client.QueryConfigValues(["watcher_pause"])
1141
    _ShowWatcherPause(result[0])
1142

    
1143
  else:
1144
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1145
                               errors.ECODE_INVAL)
1146

    
1147
  return 0
1148

    
1149

    
1150
def _OobPower(opts, node_list, power):
1151
  """Puts the node in the list to desired power state.
1152

1153
  @param opts: The command line options selected by the user
1154
  @param node_list: The list of nodes to operate on
1155
  @param power: True if they should be powered on, False otherwise
1156
  @return: The success of the operation (none failed)
1157

1158
  """
1159
  if power:
1160
    command = constants.OOB_POWER_ON
1161
  else:
1162
    command = constants.OOB_POWER_OFF
1163

    
1164
  op = opcodes.OpOobCommand(node_names=node_list,
1165
                            command=command,
1166
                            ignore_status=True,
1167
                            timeout=opts.oob_timeout,
1168
                            power_delay=opts.power_delay)
1169
  result = SubmitOpCode(op, opts=opts)
1170
  errs = 0
1171
  for node_result in result:
1172
    (node_tuple, data_tuple) = node_result
1173
    (_, node_name) = node_tuple
1174
    (data_status, _) = data_tuple
1175
    if data_status != constants.RS_NORMAL:
1176
      assert data_status != constants.RS_UNAVAIL
1177
      errs += 1
1178
      ToStderr("There was a problem changing power for %s, please investigate",
1179
               node_name)
1180

    
1181
  if errs > 0:
1182
    return False
1183

    
1184
  return True
1185

    
1186

    
1187
def _InstanceStart(opts, inst_list, start, no_remember=False):
1188
  """Puts the instances in the list to desired state.
1189

1190
  @param opts: The command line options selected by the user
1191
  @param inst_list: The list of instances to operate on
1192
  @param start: True if they should be started, False for shutdown
1193
  @param no_remember: If the instance state should be remembered
1194
  @return: The success of the operation (none failed)
1195

1196
  """
1197
  if start:
1198
    opcls = opcodes.OpInstanceStartup
1199
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1200
  else:
1201
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1202
                           timeout=opts.shutdown_timeout,
1203
                           no_remember=no_remember)
1204
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1205

    
1206
  jex = JobExecutor(opts=opts)
1207

    
1208
  for inst in inst_list:
1209
    ToStdout("Submit %s of instance %s", text_submit, inst)
1210
    op = opcls(instance_name=inst)
1211
    jex.QueueJob(inst, op)
1212

    
1213
  results = jex.GetResults()
1214
  bad_cnt = len([1 for (success, _) in results if not success])
1215

    
1216
  if bad_cnt == 0:
1217
    ToStdout("All instances have been %s successfully", text_success)
1218
  else:
1219
    ToStderr("There were errors while %s instances:\n"
1220
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1221
             len(results))
1222
    return False
1223

    
1224
  return True
1225

    
1226

    
1227
class _RunWhenNodesReachableHelper:
1228
  """Helper class to make shared internal state sharing easier.
1229

1230
  @ivar success: Indicates if all action_cb calls were successful
1231

1232
  """
1233
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1234
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1235
    """Init the object.
1236

1237
    @param node_list: The list of nodes to be reachable
1238
    @param action_cb: Callback called when a new host is reachable
1239
    @type node2ip: dict
1240
    @param node2ip: Node to ip mapping
1241
    @param port: The port to use for the TCP ping
1242
    @param feedback_fn: The function used for feedback
1243
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1244
    @param _sleep_fn: Function to sleep (for unittest use only)
1245

1246
    """
1247
    self.down = set(node_list)
1248
    self.up = set()
1249
    self.node2ip = node2ip
1250
    self.success = True
1251
    self.action_cb = action_cb
1252
    self.port = port
1253
    self.feedback_fn = feedback_fn
1254
    self._ping_fn = _ping_fn
1255
    self._sleep_fn = _sleep_fn
1256

    
1257
  def __call__(self):
1258
    """When called we run action_cb.
1259

1260
    @raises utils.RetryAgain: When there are still down nodes
1261

1262
    """
1263
    if not self.action_cb(self.up):
1264
      self.success = False
1265

    
1266
    if self.down:
1267
      raise utils.RetryAgain()
1268
    else:
1269
      return self.success
1270

    
1271
  def Wait(self, secs):
1272
    """Checks if a host is up or waits remaining seconds.
1273

1274
    @param secs: The secs remaining
1275

1276
    """
1277
    start = time.time()
1278
    for node in self.down:
1279
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1280
                       live_port_needed=True):
1281
        self.feedback_fn("Node %s became available" % node)
1282
        self.up.add(node)
1283
        self.down -= self.up
1284
        # If we have a node available there is the possibility to run the
1285
        # action callback successfully, therefore we don't wait and return
1286
        return
1287

    
1288
    self._sleep_fn(max(0.0, start + secs - time.time()))
1289

    
1290

    
1291
def _RunWhenNodesReachable(node_list, action_cb, interval):
1292
  """Run action_cb when nodes become reachable.
1293

1294
  @param node_list: The list of nodes to be reachable
1295
  @param action_cb: Callback called when a new host is reachable
1296
  @param interval: The earliest time to retry
1297

1298
  """
1299
  client = GetClient()
1300
  cluster_info = client.QueryClusterInfo()
1301
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1302
    family = netutils.IPAddress.family
1303
  else:
1304
    family = netutils.IP6Address.family
1305

    
1306
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1307
                 for node in node_list)
1308

    
1309
  port = netutils.GetDaemonPort(constants.NODED)
1310
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1311
                                        ToStdout)
1312

    
1313
  try:
1314
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1315
                       wait_fn=helper.Wait)
1316
  except utils.RetryTimeout:
1317
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1318
             " again:\n  - %s", "  - ".join(helper.down))
1319
    return False
1320

    
1321

    
1322
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1323
                          _instance_start_fn=_InstanceStart):
1324
  """Start the instances conditional based on node_states.
1325

1326
  @param opts: The command line options selected by the user
1327
  @param inst_map: A dict of inst -> nodes mapping
1328
  @param nodes_online: A list of nodes online
1329
  @param _instance_start_fn: Callback to start instances (unittest use only)
1330
  @return: Success of the operation on all instances
1331

1332
  """
1333
  start_inst_list = []
1334
  for (inst, nodes) in inst_map.items():
1335
    if not (nodes - nodes_online):
1336
      # All nodes the instance lives on are back online
1337
      start_inst_list.append(inst)
1338

    
1339
  for inst in start_inst_list:
1340
    del inst_map[inst]
1341

    
1342
  if start_inst_list:
1343
    return _instance_start_fn(opts, start_inst_list, True)
1344

    
1345
  return True
1346

    
1347

    
1348
def _EpoOn(opts, full_node_list, node_list, inst_map):
1349
  """Does the actual power on.
1350

1351
  @param opts: The command line options selected by the user
1352
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1353
                         OOB)
1354
  @param node_list: The list of nodes to operate on (all need to support OOB)
1355
  @param inst_map: A dict of inst -> nodes mapping
1356
  @return: The desired exit status
1357

1358
  """
1359
  if node_list and not _OobPower(opts, node_list, False):
1360
    ToStderr("Not all nodes seem to get back up, investigate and start"
1361
             " manually if needed")
1362

    
1363
  # Wait for the nodes to be back up
1364
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1365

    
1366
  ToStdout("Waiting until all nodes are available again")
1367
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1368
    ToStderr("Please investigate and start stopped instances manually")
1369
    return constants.EXIT_FAILURE
1370

    
1371
  return constants.EXIT_SUCCESS
1372

    
1373

    
1374
def _EpoOff(opts, node_list, inst_map):
1375
  """Does the actual power off.
1376

1377
  @param opts: The command line options selected by the user
1378
  @param node_list: The list of nodes to operate on (all need to support OOB)
1379
  @param inst_map: A dict of inst -> nodes mapping
1380
  @return: The desired exit status
1381

1382
  """
1383
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1384
    ToStderr("Please investigate and stop instances manually before continuing")
1385
    return constants.EXIT_FAILURE
1386

    
1387
  if not node_list:
1388
    return constants.EXIT_SUCCESS
1389

    
1390
  if _OobPower(opts, node_list, False):
1391
    return constants.EXIT_SUCCESS
1392
  else:
1393
    return constants.EXIT_FAILURE
1394

    
1395

    
1396
def Epo(opts, args):
1397
  """EPO operations.
1398

1399
  @param opts: the command line options selected by the user
1400
  @type args: list
1401
  @param args: should contain only one element, the subcommand
1402
  @rtype: int
1403
  @return: the desired exit code
1404

1405
  """
1406
  if opts.groups and opts.show_all:
1407
    ToStderr("Only one of --groups or --all are allowed")
1408
    return constants.EXIT_FAILURE
1409
  elif args and opts.show_all:
1410
    ToStderr("Arguments in combination with --all are not allowed")
1411
    return constants.EXIT_FAILURE
1412

    
1413
  client = GetClient()
1414

    
1415
  if opts.groups:
1416
    node_query_list = itertools.chain(*client.QueryGroups(names=args,
1417
                                                          fields=["node_list"],
1418
                                                          use_locking=False))
1419
  else:
1420
    node_query_list = args
1421

    
1422
  result = client.QueryNodes(names=node_query_list,
1423
                             fields=["name", "master", "pinst_list",
1424
                                     "sinst_list", "powered", "offline"],
1425
                             use_locking=False)
1426
  node_list = []
1427
  inst_map = {}
1428
  for (idx, (node, master, pinsts, sinsts, powered,
1429
             offline)) in enumerate(result):
1430
    # Normalize the node_query_list as well
1431
    if not opts.show_all:
1432
      node_query_list[idx] = node
1433
    if not offline:
1434
      for inst in (pinsts + sinsts):
1435
        if inst in inst_map:
1436
          if not master:
1437
            inst_map[inst].add(node)
1438
        elif master:
1439
          inst_map[inst] = set()
1440
        else:
1441
          inst_map[inst] = set([node])
1442

    
1443
    if master and opts.on:
1444
      # We ignore the master for turning on the machines, in fact we are
1445
      # already operating on the master at this point :)
1446
      continue
1447
    elif master and not opts.show_all:
1448
      ToStderr("%s is the master node, please do a master-failover to another"
1449
               " node not affected by the EPO or use --all if you intend to"
1450
               " shutdown the whole cluster", node)
1451
      return constants.EXIT_FAILURE
1452
    elif powered is None:
1453
      ToStdout("Node %s does not support out-of-band handling, it can not be"
1454
               " handled in a fully automated manner", node)
1455
    elif powered == opts.on:
1456
      ToStdout("Node %s is already in desired power state, skipping", node)
1457
    elif not offline or (offline and powered):
1458
      node_list.append(node)
1459

    
1460
  if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
1461
    return constants.EXIT_FAILURE
1462

    
1463
  if opts.on:
1464
    return _EpoOn(opts, node_query_list, node_list, inst_map)
1465
  else:
1466
    return _EpoOff(opts, node_list, inst_map)
1467

    
1468
commands = {
1469
  "init": (
1470
    InitCluster, [ArgHost(min=1, max=1)],
1471
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1472
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1473
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1474
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1475
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1476
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1477
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1478
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1479
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1480
  "destroy": (
1481
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1482
    "", "Destroy cluster"),
1483
  "rename": (
1484
    RenameCluster, [ArgHost(min=1, max=1)],
1485
    [FORCE_OPT, DRY_RUN_OPT],
1486
    "<new_name>",
1487
    "Renames the cluster"),
1488
  "redist-conf": (
1489
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1490
    "", "Forces a push of the configuration file and ssconf files"
1491
    " to the nodes in the cluster"),
1492
  "verify": (
1493
    VerifyCluster, ARGS_NONE,
1494
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1495
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1496
    "", "Does a check on the cluster configuration"),
1497
  "verify-disks": (
1498
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1499
    "", "Does a check on the cluster disk status"),
1500
  "repair-disk-sizes": (
1501
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1502
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1503
  "master-failover": (
1504
    MasterFailover, ARGS_NONE, [NOVOTING_OPT],
1505
    "", "Makes the current node the master"),
1506
  "master-ping": (
1507
    MasterPing, ARGS_NONE, [],
1508
    "", "Checks if the master is alive"),
1509
  "version": (
1510
    ShowClusterVersion, ARGS_NONE, [],
1511
    "", "Shows the cluster version"),
1512
  "getmaster": (
1513
    ShowClusterMaster, ARGS_NONE, [],
1514
    "", "Shows the cluster master"),
1515
  "copyfile": (
1516
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1517
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1518
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1519
  "command": (
1520
    RunClusterCommand, [ArgCommand(min=1)],
1521
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT],
1522
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1523
  "info": (
1524
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1525
    "[--roman]", "Show cluster configuration"),
1526
  "list-tags": (
1527
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1528
  "add-tags": (
1529
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1530
    "tag...", "Add tags to the cluster"),
1531
  "remove-tags": (
1532
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1533
    "tag...", "Remove tags from the cluster"),
1534
  "search-tags": (
1535
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1536
    "Searches the tags on all objects on"
1537
    " the cluster for a given pattern (regex)"),
1538
  "queue": (
1539
    QueueOps,
1540
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1541
    [], "drain|undrain|info", "Change queue properties"),
1542
  "watcher": (
1543
    WatcherOps,
1544
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1545
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1546
    [],
1547
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1548
  "modify": (
1549
    SetClusterParams, ARGS_NONE,
1550
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1551
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1552
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1553
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1554
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1555
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1556
     DISK_STATE_OPT, SUBMIT_OPT] +
1557
    INSTANCE_POLICY_OPTS,
1558
    "[opts...]",
1559
    "Alters the parameters of the cluster"),
1560
  "renew-crypto": (
1561
    RenewCrypto, ARGS_NONE,
1562
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1563
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1564
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1565
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1566
    "[opts...]",
1567
    "Renews cluster certificates, keys and secrets"),
1568
  "epo": (
1569
    Epo, [ArgUnknown()],
1570
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1571
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1572
    "[opts...] [args]",
1573
    "Performs an emergency power-off on given args"),
1574
  "activate-master-ip": (
1575
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1576
  "deactivate-master-ip": (
1577
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1578
    "Deactivates the master IP"),
1579
  }
1580

    
1581

    
1582
#: dictionary with aliases for commands
1583
aliases = {
1584
  "masterfailover": "master-failover",
1585
  "show": "info",
1586
}
1587

    
1588

    
1589
def Main():
1590
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1591
                     aliases=aliases)