# lib/client/gnt_cluster.py @ 96897af7
#
#

# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Cluster related commands"""

# pylint: disable=W0401,W0613,W0614,C0103
# W0401: Wildcard import ganeti.cli
# W0613: Unused argument, since all functions follow the same API
# W0614: Unused import %s from wildcard import (since we need cli)
# C0103: Invalid name gnt-cluster

import os.path
import time
import OpenSSL
import itertools

from ganeti.cli import *
from ganeti import opcodes
from ganeti import constants
from ganeti import errors
from ganeti import utils
from ganeti import bootstrap
from ganeti import ssh
from ganeti import objects
from ganeti import uidpool
from ganeti import compat
from ganeti import netutils


ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

GROUPS_OPT = cli_option("--groups", default=False,
                    action="store_true", dest="groups",
                    help="Arguments are node groups instead of nodes")

SHOW_MACHINE_OPT = cli_option("-M", "--show-machine-names", default=False,
                              action="store_true",
                              help="Show machine name for every line in output")

_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes


@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  if not opts.lvm_storage and opts.vg_name:
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
    return 1

  vg_name = opts.vg_name
  if opts.lvm_storage and not opts.vg_name:
    vg_name = constants.DEFAULT_VG

  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  drbd_helper = opts.drbd_helper
  if opts.drbd_storage and not opts.drbd_helper:
    drbd_helper = constants.DEFAULT_DRBD_HELPER

  master_netdev = opts.master_netdev
  if master_netdev is None:
    master_netdev = constants.DEFAULT_BRIDGE

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy_raw = objects.CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    fill_all=True)
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)

  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        )
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
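
# A typical invocation of the "init" command defined in the commands dict at
# the end of this file; illustrative only, the hypervisor list and cluster
# name are placeholders:
#   gnt-cluster init --enabled-hypervisors=kvm cluster.example.com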


@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not opts.yes_do_it:
    ToStderr("Destroying a cluster is irreversible. If you really want to"
             " destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master)
  return 0
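
# Illustrative only; the --yes-do-it safety option checked above is mandatory:
#   gnt-cluster destroy --yes-do-it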


def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])

  new_name = args[0]
  if not opts.force:
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (cluster_name, new_name)
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterRename(name=new_name)
  result = SubmitOpCode(op, opts=opts, cl=cl)

  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0
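
# Illustrative only; the new name is a placeholder (see the "<new_name>" usage
# string in the "rename" entry of the commands dict below):
#   gnt-cluster rename cluster2.example.com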


def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  op = opcodes.OpClusterActivateMasterIp()
  SubmitOpCode(op)
  return 0


def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  if not opts.confirm:
    usertext = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(usertext):
      return 1

  op = opcodes.OpClusterDeactivateMasterIp()
  SubmitOpCode(op)
  return 0


def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRedistConf()
  SubmitOrSend(op, opts)
  return 0


def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()
  ToStdout("Software version: %s", result["software_version"])
  ToStdout("Internode protocol: %s", result["protocol_version"])
  ToStdout("Configuration format: %s", result["config_version"])
  ToStdout("OS api version: %s", result["os_api_version"])
  ToStdout("Export interface: %s", result["export_version"])
  return 0


def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  master = bootstrap.GetMaster()
  ToStdout(master)
  return 0

def _PrintGroupedParams(paramsdict, level=1, roman=False):
  """Print grouped parameters (be, nic, disk) by group.

  @type paramsdict: dict of dicts
  @param paramsdict: {group: {param: value, ...}, ...}
  @type level: int
  @param level: Level of indentation
  @type roman: bool
  @param roman: Whether to print integer values as roman numerals

  """
  indent = "  " * level
  for item, val in sorted(paramsdict.items()):
    if isinstance(val, dict):
      ToStdout("%s- %s:", indent, item)
      _PrintGroupedParams(val, level=level + 1, roman=roman)
    elif roman and isinstance(val, int):
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
    else:
      ToStdout("%s  %s: %s", indent, item, val)


def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  result = cl.QueryClusterInfo()

  ToStdout("Cluster name: %s", result["name"])
  ToStdout("Cluster UUID: %s", result["uuid"])

  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))

  ToStdout("Master node: %s", result["master"])

  ToStdout("Architecture (this node): %s (%s)",
           result["architecture"][0], result["architecture"][1])

  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"

  ToStdout("Tags: %s", tags)

  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
  ToStdout("Enabled hypervisors: %s",
           utils.CommaJoin(result["enabled_hypervisors"]))

  ToStdout("Hypervisor parameters:")
  _PrintGroupedParams(result["hvparams"])

  ToStdout("OS-specific hypervisor parameters:")
  _PrintGroupedParams(result["os_hvp"])

  ToStdout("OS parameters:")
  _PrintGroupedParams(result["osparams"])

  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))

  ToStdout("Cluster parameters:")
  ToStdout("  - candidate pool size: %s",
            compat.TryToRoman(result["candidate_pool_size"],
                              convert=opts.roman_integers))
  ToStdout("  - master netdev: %s", result["master_netdev"])
  ToStdout("  - master netmask: %s", result["master_netmask"])
  ToStdout("  - use external master IP address setup script: %s",
           result["use_external_mip_script"])
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
  ToStdout("  - shared file storage path: %s",
           result["shared_file_storage_dir"])
  ToStdout("  - maintenance of node health: %s",
           result["maintain_node_health"])
  ToStdout("  - uid pool: %s",
            uidpool.FormatUidPool(result["uid_pool"],
                                  roman=opts.roman_integers))
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
  ToStdout("  - OS search path: %s", utils.CommaJoin(constants.OS_SEARCH_PATH))

  ToStdout("Default node parameters:")
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)

  ToStdout("Default instance parameters:")
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)

  ToStdout("Default nic parameters:")
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)

  ToStdout("Instance policy - limits for instances:")
  for key in constants.IPOLICY_ISPECS:
    ToStdout("  - %s", key)
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
  ToStdout("  - enabled disk templates: %s",
           utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS]))
  for key in constants.IPOLICY_PARAMETERS:
    ToStdout("  - %s: %s", key, result["ipolicy"][key])

  return 0
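
# Illustrative only; --roman comes from ROMAN_OPT in the "info" entry of the
# commands dict below and sets opts.roman_integers used above:
#   gnt-cluster info --roman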


def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = args[0]
  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()

  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
                           secondary_ips=opts.use_replication_network,
                           nodegroup=opts.nodegroup)

  srun = ssh.SshRunner(cluster_name=cluster_name)
  for node in results:
    if not srun.CopyFileToNode(node, filename):
      ToStderr("Copy of file %s to node %s failed", filename, node)

  return 0
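
# Illustrative only; the node name and file path are placeholders ("-n"
# selects target nodes, see the "copyfile" usage string below):
#   gnt-cluster copyfile -n node1.example.com /etc/hosts.allow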


def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)

  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
                                                    "master_node"])

  srun = ssh.SshRunner(cluster_name=cluster_name)

  # Make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for name in nodes:
    result = srun.Run(name, "root", command)
    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      for line in result.output.splitlines():
        ToStdout("%s: %s", name, line)
    else:
      ToStdout("node: %s", name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
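
# Illustrative only; the node name is a placeholder. -M/--show-machine-names
# (SHOW_MACHINE_OPT above) prefixes every output line with the node name:
#   gnt-cluster command -n node1.example.com -M "uptime"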


def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various tests on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
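
# Illustrative only; a plain run submits OpClusterVerify and then the
# per-group verification jobs whose failures are counted above:
#   gnt-cluster verify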


def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE

  return retcode


def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)


@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.no_voting:
    usertext = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(usertext):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)
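
# Illustrative only; run on the node that should become the new master. The
# no-voting path above is only taken when the corresponding option
# (NOVOTING_OPT in the "master-failover" entry below) is given:
#   gnt-cluster master-failover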


def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    cl = GetClient()
    cl.QueryClusterInfo()
    return 0
  except Exception: # pylint: disable=W0703
    return 1


def SearchTags(opts, args):
  """Searches the tags on all objects in the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1
  result = list(result)
  result.sort()
  for path, tag in result:
    ToStdout("%s %s", path, tag)


def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception, err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem


def _RenewCrypto(new_cluster_cert, new_rapi_cert, #pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to ask user for confirmation

  """
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    ctx.feedback_fn("Updating certificates and keys")
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    files_to_copy = []

    if new_cluster_cert:
      files_to_copy.append(constants.NODED_CERT_FILE)

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(constants.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(constants.SPICE_CERT_FILE)
      files_to_copy.append(constants.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(constants.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(constants.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        ctx.feedback_fn("Copying %s to %s" %
                        (", ".join(files_to_copy), node_name))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  return 0
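
# Illustrative only; the flag names below are the ones referenced in the
# conflict-check error messages above:
#   gnt-cluster renew-crypto --new-rapi-certificate --new-cluster-domain-secret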


def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  """
  return _RenewCrypto(opts.new_cluster_cert,
                      opts.new_rapi_cert,
                      opts.rapi_cert,
                      opts.new_spice_cert,
                      opts.spice_cert,
                      opts.spice_cacert,
                      opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret,
                      opts.force)


def SetClusterParams(opts, args):
  """Modify the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if not (not opts.lvm_storage or opts.vg_name or
          not opts.drbd_storage or opts.drbd_helper or
          opts.enabled_hypervisors or opts.hvparams or
          opts.beparams or opts.nicparams or
          opts.ndparams or opts.diskparams or
          opts.candidate_pool_size is not None or
          opts.uid_pool is not None or
          opts.maintain_node_health is not None or
          opts.add_uids is not None or
          opts.remove_uids is not None or
          opts.default_iallocator is not None or
          opts.reserved_lvs is not None or
          opts.master_netdev is not None or
          opts.master_netmask is not None or
          opts.use_external_mip_script is not None or
          opts.prealloc_wipe_disks is not None or
          opts.hv_state or
          opts.disk_state or
          opts.ispecs_mem_size or
          opts.ispecs_cpu_count or
          opts.ispecs_disk_count or
          opts.ispecs_disk_size or
          opts.ispecs_nic_count or
          opts.ipolicy_disk_templates is not None or
          opts.ipolicy_vcpu_ratio is not None):
    ToStderr("Please give at least one of the parameters.")
    return 1

  vg_name = opts.vg_name
  if not opts.lvm_storage and opts.vg_name:
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
    return 1

  if not opts.lvm_storage:
    vg_name = ""

  drbd_helper = opts.drbd_helper
  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  if not opts.drbd_storage:
    drbd_helper = ""

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  # a list of (name, dict) we can pass directly to dict() (or [])
  hvparams = dict(opts.hvparams)
  for hv_params in hvparams.values():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)

  for dt_params in diskparams.values():
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = objects.CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    )

  mnh = opts.maintain_node_health

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  ext_ip_script = opts.use_external_mip_script

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  op = opcodes.OpClusterSetParams(vg_name=vg_name,
                                  drbd_helper=drbd_helper,
                                  enabled_hypervisors=hvlist,
                                  hvparams=hvparams,
                                  os_hvp=None,
                                  beparams=beparams,
                                  nicparams=nicparams,
                                  ndparams=ndparams,
                                  diskparams=diskparams,
                                  ipolicy=ipolicy,
                                  candidate_pool_size=opts.candidate_pool_size,
                                  maintain_node_health=mnh,
                                  uid_pool=uid_pool,
                                  add_uids=add_uids,
                                  remove_uids=remove_uids,
                                  default_iallocator=opts.default_iallocator,
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
                                  master_netdev=opts.master_netdev,
                                  master_netmask=opts.master_netmask,
                                  reserved_lvs=opts.reserved_lvs,
                                  use_external_mip_script=ext_ip_script,
                                  hv_state=hv_state,
                                  disk_state=disk_state,
                                  )
  SubmitOpCode(op, opts=opts)
  return 0
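
# Illustrative only; the volume group name is a placeholder (--vg-name and
# --no-lvm-storage are the conflicting options checked above):
#   gnt-cluster modify --vg-name=xenvg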


def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()
  if command in ("drain", "undrain"):
    drain_flag = command == "drain"
    client.SetQueueDrainFlag(drain_flag)
  elif command == "info":
    result = client.QueryConfigValues(["drain_flag"])
    if result[0]:
      val = "set"
    else:
      val = "unset"
    ToStdout("The drain flag is %s" % val)
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
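
# Illustrative only; the subcommands match the ArgChoice list in the "queue"
# entry of the commands dict below:
#   gnt-cluster queue drain
#   gnt-cluster queue info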


def _ShowWatcherPause(until):
  if until is None or until < time.time():
    ToStdout("The watcher is not paused.")
  else:
    ToStdout("The watcher is paused until %s.", time.ctime(until))


def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  command = args[0]
  client = GetClient()

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")

  elif command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
    _ShowWatcherPause(result)

  elif command == "info":
    result = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(result[0])

  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
                               errors.ECODE_INVAL)

  return 0
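
# Illustrative only; "4h" is one of the timespecs suggested by the "watcher"
# entry of the commands dict below:
#   gnt-cluster watcher pause 4h
#   gnt-cluster watcher continue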


def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  if power:
    command = constants.OOB_POWER_ON
  else:
    command = constants.OOB_POWER_OFF

  op = opcodes.OpOobCommand(node_names=node_list,
                            command=command,
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)
  result = SubmitOpCode(op, opts=opts)
  errs = 0
  for node_result in result:
    (node_tuple, data_tuple) = node_result
    (_, node_name) = node_tuple
    (data_status, _) = data_tuple
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      errs += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  if errs > 0:
    return False

  return True


def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    text_submit, text_success, text_failed = ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")

  jex = JobExecutor(opts=opts)

  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    op = opcls(instance_name=inst)
    jex.QueueJob(inst, op)

  results = jex.GetResults()
  bad_cnt = len([1 for (success, _) in results if not success])

  if bad_cnt == 0:
    ToStdout("All instances have been %s successfully", text_success)
  else:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  return True


class _RunWhenNodesReachableHelper:
  """Helper class to make shared internal state sharing easier.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachability (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    self._sleep_fn(max(0.0, start + secs - time.time()))


def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
                 for node in node_list)

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n  - %s", "  - ".join(helper.down))
    return False


def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditionally, based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  start_inst_list = []
  for (inst, nodes) in inst_map.items():
    if not (nodes - nodes_online):
      # All nodes the instance lives on are back online
      start_inst_list.append(inst)

  for inst in start_inst_list:
    del inst_map[inst]

  if start_inst_list:
    return _instance_start_fn(opts, start_inst_list, True)

  return True


def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if node_list and not _OobPower(opts, node_list, False):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS


def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if not node_list:
    return constants.EXIT_SUCCESS

  if _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS
  else:
    return constants.EXIT_FAILURE


def Epo(opts, args):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    ToStderr("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    ToStderr("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  client = GetClient()

  if opts.groups:
    node_query_list = itertools.chain(*client.QueryGroups(names=args,
                                                          fields=["node_list"],
                                                          use_locking=False))
  else:
    node_query_list = args

  result = client.QueryNodes(names=node_query_list,
                             fields=["name", "master", "pinst_list",
                                     "sinst_list", "powered", "offline"],
                             use_locking=False)
  node_list = []
  inst_map = {}
  for (idx, (node, master, pinsts, sinsts, powered,
             offline)) in enumerate(result):
    # Normalize the node_query_list as well
    if not opts.show_all:
      node_query_list[idx] = node
    if not offline:
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      ToStderr("%s is the master node, please do a master-failover to another"
               " node not affected by the EPO or use --all if you intend to"
               " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      ToStdout("Node %s does not support out-of-band handling, it can not be"
               " handled in a fully automated manner", node)
    elif powered == opts.on:
      ToStdout("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not opts.force and not ConfirmOperation(node_query_list, "nodes", "epo"):
    return constants.EXIT_FAILURE

  if opts.on:
    return _EpoOn(opts, node_query_list, node_list, inst_map)
  else:
    return _EpoOff(opts, node_list, inst_map)
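
# Illustrative only; "group1" is a placeholder. --all, --on and --groups are
# the options handled above:
#   gnt-cluster epo --all                  (power off the whole cluster)
#   gnt-cluster epo --on --groups group1   (recover the nodes of one group)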

commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT] +
    INSTANCE_POLICY_OPTS,
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  }


#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}


def Main():
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)