Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ c270ee07

History | View | Annotate | Download (51.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45
from ganeti import pathutils
46

    
47

    
48
ON_OPT = cli_option("--on", default=False,
49
                    action="store_true", dest="on",
50
                    help="Recover from an EPO")
51

    
52
GROUPS_OPT = cli_option("--groups", default=False,
53
                        action="store_true", dest="groups",
54
                        help="Arguments are node groups instead of nodes")
55

    
56
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
57
                            help="Override interactive check for --no-voting",
58
                            default=False, action="store_true")
59

    
60
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
61
_EPO_PING_TIMEOUT = 1 # 1 second
62
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
63

    
64

    
65
@UsesRPC
66
def InitCluster(opts, args):
67
  """Initialize the cluster.
68

69
  @param opts: the command line options selected by the user
70
  @type args: list
71
  @param args: should contain only one element, the desired
72
      cluster name
73
  @rtype: int
74
  @return: the desired exit code
75

76
  """
77
  if not opts.lvm_storage and opts.vg_name:
78
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
79
    return 1
80

    
81
  vg_name = opts.vg_name
82
  if opts.lvm_storage and not opts.vg_name:
83
    vg_name = constants.DEFAULT_VG
84

    
85
  if not opts.drbd_storage and opts.drbd_helper:
86
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
87
    return 1
88

    
89
  drbd_helper = opts.drbd_helper
90
  if opts.drbd_storage and not opts.drbd_helper:
91
    drbd_helper = constants.DEFAULT_DRBD_HELPER
92

    
93
  master_netdev = opts.master_netdev
94
  if master_netdev is None:
95
    master_netdev = constants.DEFAULT_BRIDGE
96

    
97
  hvlist = opts.enabled_hypervisors
98
  if hvlist is None:
99
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
100
  hvlist = hvlist.split(",")
101

    
102
  hvparams = dict(opts.hvparams)
103
  beparams = opts.beparams
104
  nicparams = opts.nicparams
105

    
106
  diskparams = dict(opts.diskparams)
107

    
108
  # check the disk template types here, as we cannot rely on the type check done
109
  # by the opcode parameter types
110
  diskparams_keys = set(diskparams.keys())
111
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
112
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
113
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
114
    return 1
115

    
116
  # prepare beparams dict
117
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
118
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
119

    
120
  # prepare nicparams dict
121
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
122
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
123

    
124
  # prepare ndparams dict
125
  if opts.ndparams is None:
126
    ndparams = dict(constants.NDC_DEFAULTS)
127
  else:
128
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
129
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
130

    
131
  # prepare hvparams dict
132
  for hv in constants.HYPER_TYPES:
133
    if hv not in hvparams:
134
      hvparams[hv] = {}
135
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
136
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
137

    
138
  # prepare diskparams dict
139
  for templ in constants.DISK_TEMPLATES:
140
    if templ not in diskparams:
141
      diskparams[templ] = {}
142
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
143
                                         diskparams[templ])
144
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
145

    
146
  # prepare ipolicy dict
147
  ipolicy_raw = CreateIPolicyFromOpts(
148
    ispecs_mem_size=opts.ispecs_mem_size,
149
    ispecs_cpu_count=opts.ispecs_cpu_count,
150
    ispecs_disk_count=opts.ispecs_disk_count,
151
    ispecs_disk_size=opts.ispecs_disk_size,
152
    ispecs_nic_count=opts.ispecs_nic_count,
153
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
154
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
155
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
156
    fill_all=True)
157
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
158

    
159
  if opts.candidate_pool_size is None:
160
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
161

    
162
  if opts.mac_prefix is None:
163
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
164

    
165
  uid_pool = opts.uid_pool
166
  if uid_pool is not None:
167
    uid_pool = uidpool.ParseUidPool(uid_pool)
168

    
169
  if opts.prealloc_wipe_disks is None:
170
    opts.prealloc_wipe_disks = False
171

    
172
  external_ip_setup_script = opts.use_external_mip_script
173
  if external_ip_setup_script is None:
174
    external_ip_setup_script = False
175

    
176
  try:
177
    primary_ip_version = int(opts.primary_ip_version)
178
  except (ValueError, TypeError), err:
179
    ToStderr("Invalid primary ip version value: %s" % str(err))
180
    return 1
181

    
182
  master_netmask = opts.master_netmask
183
  try:
184
    if master_netmask is not None:
185
      master_netmask = int(master_netmask)
186
  except (ValueError, TypeError), err:
187
    ToStderr("Invalid master netmask value: %s" % str(err))
188
    return 1
189

    
190
  if opts.disk_state:
191
    disk_state = utils.FlatToDict(opts.disk_state)
192
  else:
193
    disk_state = {}
194

    
195
  hv_state = dict(opts.hv_state)
196

    
197
  bootstrap.InitCluster(cluster_name=args[0],
198
                        secondary_ip=opts.secondary_ip,
199
                        vg_name=vg_name,
200
                        mac_prefix=opts.mac_prefix,
201
                        master_netmask=master_netmask,
202
                        master_netdev=master_netdev,
203
                        file_storage_dir=opts.file_storage_dir,
204
                        shared_file_storage_dir=opts.shared_file_storage_dir,
205
                        enabled_hypervisors=hvlist,
206
                        hvparams=hvparams,
207
                        beparams=beparams,
208
                        nicparams=nicparams,
209
                        ndparams=ndparams,
210
                        diskparams=diskparams,
211
                        ipolicy=ipolicy,
212
                        candidate_pool_size=opts.candidate_pool_size,
213
                        modify_etc_hosts=opts.modify_etc_hosts,
214
                        modify_ssh_setup=opts.modify_ssh_setup,
215
                        maintain_node_health=opts.maintain_node_health,
216
                        drbd_helper=drbd_helper,
217
                        uid_pool=uid_pool,
218
                        default_iallocator=opts.default_iallocator,
219
                        primary_ip_version=primary_ip_version,
220
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
221
                        use_external_mip_script=external_ip_setup_script,
222
                        hv_state=hv_state,
223
                        disk_state=disk_state,
224
                        )
225
  op = opcodes.OpClusterPostInit()
226
  SubmitOpCode(op, opts=opts)
227
  return 0
228

    
229

    
230
@UsesRPC
231
def DestroyCluster(opts, args):
232
  """Destroy the cluster.
233

234
  @param opts: the command line options selected by the user
235
  @type args: list
236
  @param args: should be an empty list
237
  @rtype: int
238
  @return: the desired exit code
239

240
  """
241
  if not opts.yes_do_it:
242
    ToStderr("Destroying a cluster is irreversible. If you really want"
243
             " destroy this cluster, supply the --yes-do-it option.")
244
    return 1
245

    
246
  op = opcodes.OpClusterDestroy()
247
  master = SubmitOpCode(op, opts=opts)
248
  # if we reached this, the opcode didn't fail; we can proceed to
249
  # shutdown all the daemons
250
  bootstrap.FinalizeClusterDestroy(master)
251
  return 0
252

    
253

    
254
def RenameCluster(opts, args):
255
  """Rename the cluster.
256

257
  @param opts: the command line options selected by the user
258
  @type args: list
259
  @param args: should contain only one element, the new cluster name
260
  @rtype: int
261
  @return: the desired exit code
262

263
  """
264
  cl = GetClient()
265

    
266
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
267

    
268
  new_name = args[0]
269
  if not opts.force:
270
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
271
                " connected over the network to the cluster name, the"
272
                " operation is very dangerous as the IP address will be"
273
                " removed from the node and the change may not go through."
274
                " Continue?") % (cluster_name, new_name)
275
    if not AskUser(usertext):
276
      return 1
277

    
278
  op = opcodes.OpClusterRename(name=new_name)
279
  result = SubmitOpCode(op, opts=opts, cl=cl)
280

    
281
  if result:
282
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
283

    
284
  return 0
285

    
286

    
287
def ActivateMasterIp(opts, args):
288
  """Activates the master IP.
289

290
  """
291
  op = opcodes.OpClusterActivateMasterIp()
292
  SubmitOpCode(op)
293
  return 0
294

    
295

    
296
def DeactivateMasterIp(opts, args):
297
  """Deactivates the master IP.
298

299
  """
300
  if not opts.confirm:
301
    usertext = ("This will disable the master IP. All the open connections to"
302
                " the master IP will be closed. To reach the master you will"
303
                " need to use its node IP."
304
                " Continue?")
305
    if not AskUser(usertext):
306
      return 1
307

    
308
  op = opcodes.OpClusterDeactivateMasterIp()
309
  SubmitOpCode(op)
310
  return 0
311

    
312

    
313
def RedistributeConfig(opts, args):
314
  """Forces push of the cluster configuration.
315

316
  @param opts: the command line options selected by the user
317
  @type args: list
318
  @param args: empty list
319
  @rtype: int
320
  @return: the desired exit code
321

322
  """
323
  op = opcodes.OpClusterRedistConf()
324
  SubmitOrSend(op, opts)
325
  return 0
326

    
327

    
328
def ShowClusterVersion(opts, args):
329
  """Write version of ganeti software to the standard output.
330

331
  @param opts: the command line options selected by the user
332
  @type args: list
333
  @param args: should be an empty list
334
  @rtype: int
335
  @return: the desired exit code
336

337
  """
338
  cl = GetClient(query=True)
339
  result = cl.QueryClusterInfo()
340
  ToStdout("Software version: %s", result["software_version"])
341
  ToStdout("Internode protocol: %s", result["protocol_version"])
342
  ToStdout("Configuration format: %s", result["config_version"])
343
  ToStdout("OS api version: %s", result["os_api_version"])
344
  ToStdout("Export interface: %s", result["export_version"])
345
  return 0
346

    
347

    
348
def ShowClusterMaster(opts, args):
349
  """Write name of master node to the standard output.
350

351
  @param opts: the command line options selected by the user
352
  @type args: list
353
  @param args: should be an empty list
354
  @rtype: int
355
  @return: the desired exit code
356

357
  """
358
  master = bootstrap.GetMaster()
359
  ToStdout(master)
360
  return 0
361

    
362

    
363
def _PrintGroupedParams(paramsdict, level=1, roman=False):
364
  """Print Grouped parameters (be, nic, disk) by group.
365

366
  @type paramsdict: dict of dicts
367
  @param paramsdict: {group: {param: value, ...}, ...}
368
  @type level: int
369
  @param level: Level of indention
370

371
  """
372
  indent = "  " * level
373
  for item, val in sorted(paramsdict.items()):
374
    if isinstance(val, dict):
375
      ToStdout("%s- %s:", indent, item)
376
      _PrintGroupedParams(val, level=level + 1, roman=roman)
377
    elif roman and isinstance(val, int):
378
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
379
    else:
380
      ToStdout("%s  %s: %s", indent, item, val)
381

    
382

    
383
def ShowClusterConfig(opts, args):
384
  """Shows cluster information.
385

386
  @param opts: the command line options selected by the user
387
  @type args: list
388
  @param args: should be an empty list
389
  @rtype: int
390
  @return: the desired exit code
391

392
  """
393
  cl = GetClient(query=True)
394
  result = cl.QueryClusterInfo()
395

    
396
  ToStdout("Cluster name: %s", result["name"])
397
  ToStdout("Cluster UUID: %s", result["uuid"])
398

    
399
  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
400
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
401

    
402
  ToStdout("Master node: %s", result["master"])
403

    
404
  ToStdout("Architecture (this node): %s (%s)",
405
           result["architecture"][0], result["architecture"][1])
406

    
407
  if result["tags"]:
408
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
409
  else:
410
    tags = "(none)"
411

    
412
  ToStdout("Tags: %s", tags)
413

    
414
  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
415
  ToStdout("Enabled hypervisors: %s",
416
           utils.CommaJoin(result["enabled_hypervisors"]))
417

    
418
  ToStdout("Hypervisor parameters:")
419
  _PrintGroupedParams(result["hvparams"])
420

    
421
  ToStdout("OS-specific hypervisor parameters:")
422
  _PrintGroupedParams(result["os_hvp"])
423

    
424
  ToStdout("OS parameters:")
425
  _PrintGroupedParams(result["osparams"])
426

    
427
  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
428
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
429

    
430
  ToStdout("Cluster parameters:")
431
  ToStdout("  - candidate pool size: %s",
432
            compat.TryToRoman(result["candidate_pool_size"],
433
                              convert=opts.roman_integers))
434
  ToStdout("  - master netdev: %s", result["master_netdev"])
435
  ToStdout("  - master netmask: %s", result["master_netmask"])
436
  ToStdout("  - use external master IP address setup script: %s",
437
           result["use_external_mip_script"])
438
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
439
  if result["reserved_lvs"]:
440
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
441
  else:
442
    reserved_lvs = "(none)"
443
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
444
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
445
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
446
  ToStdout("  - shared file storage path: %s",
447
           result["shared_file_storage_dir"])
448
  ToStdout("  - maintenance of node health: %s",
449
           result["maintain_node_health"])
450
  ToStdout("  - uid pool: %s", uidpool.FormatUidPool(result["uid_pool"]))
451
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
452
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
453
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
454
  ToStdout("  - OS search path: %s", utils.CommaJoin(pathutils.OS_SEARCH_PATH))
455
  ToStdout("  - ExtStorage Providers search path: %s",
456
           utils.CommaJoin(pathutils.ES_SEARCH_PATH))
457
  ToStdout("  - enabled storage types: %s",
458
           utils.CommaJoin(result["enabled_storage_types"]))
459

    
460
  ToStdout("Default node parameters:")
461
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
462

    
463
  ToStdout("Default instance parameters:")
464
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
465

    
466
  ToStdout("Default nic parameters:")
467
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
468

    
469
  ToStdout("Default disk parameters:")
470
  _PrintGroupedParams(result["diskparams"], roman=opts.roman_integers)
471

    
472
  ToStdout("Instance policy - limits for instances:")
473
  for key in constants.IPOLICY_ISPECS:
474
    ToStdout("  - %s", key)
475
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
476
  ToStdout("  - enabled disk templates: %s",
477
           utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS]))
478
  for key in constants.IPOLICY_PARAMETERS:
479
    ToStdout("  - %s: %s", key, result["ipolicy"][key])
480

    
481
  return 0
482

    
483

    
484
def ClusterCopyFile(opts, args):
485
  """Copy a file from master to some nodes.
486

487
  @param opts: the command line options selected by the user
488
  @type args: list
489
  @param args: should contain only one element, the path of
490
      the file to be copied
491
  @rtype: int
492
  @return: the desired exit code
493

494
  """
495
  filename = args[0]
496
  if not os.path.exists(filename):
497
    raise errors.OpPrereqError("No such filename '%s'" % filename,
498
                               errors.ECODE_INVAL)
499

    
500
  cl = GetClient()
501

    
502
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
503

    
504
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
505
                           secondary_ips=opts.use_replication_network,
506
                           nodegroup=opts.nodegroup)
507

    
508
  srun = ssh.SshRunner(cluster_name)
509
  for node in results:
510
    if not srun.CopyFileToNode(node, filename):
511
      ToStderr("Copy of file %s to node %s failed", filename, node)
512

    
513
  return 0
514

    
515

    
516
def RunClusterCommand(opts, args):
517
  """Run a command on some nodes.
518

519
  @param opts: the command line options selected by the user
520
  @type args: list
521
  @param args: should contain the command to be run and its arguments
522
  @rtype: int
523
  @return: the desired exit code
524

525
  """
526
  cl = GetClient()
527

    
528
  command = " ".join(args)
529

    
530
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
531

    
532
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
533
                                                    "master_node"])
534

    
535
  srun = ssh.SshRunner(cluster_name=cluster_name)
536

    
537
  # Make sure master node is at list end
538
  if master_node in nodes:
539
    nodes.remove(master_node)
540
    nodes.append(master_node)
541

    
542
  for name in nodes:
543
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
544

    
545
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
546
      # Do not output anything for successful commands
547
      continue
548

    
549
    ToStdout("------------------------------------------------")
550
    if opts.show_machine_names:
551
      for line in result.output.splitlines():
552
        ToStdout("%s: %s", name, line)
553
    else:
554
      ToStdout("node: %s", name)
555
      ToStdout("%s", result.output)
556
    ToStdout("return code = %s", result.exit_code)
557

    
558
  return 0
559

    
560

    
561
def VerifyCluster(opts, args):
562
  """Verify integrity of cluster, performing various test on nodes.
563

564
  @param opts: the command line options selected by the user
565
  @type args: list
566
  @param args: should be an empty list
567
  @rtype: int
568
  @return: the desired exit code
569

570
  """
571
  skip_checks = []
572

    
573
  if opts.skip_nplusone_mem:
574
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
575

    
576
  cl = GetClient()
577

    
578
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
579
                               error_codes=opts.error_codes,
580
                               debug_simulate_errors=opts.simulate_errors,
581
                               skip_checks=skip_checks,
582
                               ignore_errors=opts.ignore_errors,
583
                               group_name=opts.nodegroup)
584
  result = SubmitOpCode(op, cl=cl, opts=opts)
585

    
586
  # Keep track of submitted jobs
587
  jex = JobExecutor(cl=cl, opts=opts)
588

    
589
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
590
    jex.AddJobId(None, status, job_id)
591

    
592
  results = jex.GetResults()
593

    
594
  (bad_jobs, bad_results) = \
595
    map(len,
596
        # Convert iterators to lists
597
        map(list,
598
            # Count errors
599
            map(compat.partial(itertools.ifilterfalse, bool),
600
                # Convert result to booleans in a tuple
601
                zip(*((job_success, len(op_results) == 1 and op_results[0])
602
                      for (job_success, op_results) in results)))))
603

    
604
  if bad_jobs == 0 and bad_results == 0:
605
    rcode = constants.EXIT_SUCCESS
606
  else:
607
    rcode = constants.EXIT_FAILURE
608
    if bad_jobs > 0:
609
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
610

    
611
  return rcode
612

    
613

    
614
def VerifyDisks(opts, args):
615
  """Verify integrity of cluster disks.
616

617
  @param opts: the command line options selected by the user
618
  @type args: list
619
  @param args: should be an empty list
620
  @rtype: int
621
  @return: the desired exit code
622

623
  """
624
  cl = GetClient()
625

    
626
  op = opcodes.OpClusterVerifyDisks()
627

    
628
  result = SubmitOpCode(op, cl=cl, opts=opts)
629

    
630
  # Keep track of submitted jobs
631
  jex = JobExecutor(cl=cl, opts=opts)
632

    
633
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
634
    jex.AddJobId(None, status, job_id)
635

    
636
  retcode = constants.EXIT_SUCCESS
637

    
638
  for (status, result) in jex.GetResults():
639
    if not status:
640
      ToStdout("Job failed: %s", result)
641
      continue
642

    
643
    ((bad_nodes, instances, missing), ) = result
644

    
645
    for node, text in bad_nodes.items():
646
      ToStdout("Error gathering data on node %s: %s",
647
               node, utils.SafeEncode(text[-400:]))
648
      retcode = constants.EXIT_FAILURE
649
      ToStdout("You need to fix these nodes first before fixing instances")
650

    
651
    for iname in instances:
652
      if iname in missing:
653
        continue
654
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
655
      try:
656
        ToStdout("Activating disks for instance '%s'", iname)
657
        SubmitOpCode(op, opts=opts, cl=cl)
658
      except errors.GenericError, err:
659
        nret, msg = FormatError(err)
660
        retcode |= nret
661
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
662

    
663
    if missing:
664
      for iname, ival in missing.iteritems():
665
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
666
        if all_missing:
667
          ToStdout("Instance %s cannot be verified as it lives on"
668
                   " broken nodes", iname)
669
        else:
670
          ToStdout("Instance %s has missing logical volumes:", iname)
671
          ival.sort()
672
          for node, vol in ival:
673
            if node in bad_nodes:
674
              ToStdout("\tbroken node %s /dev/%s", node, vol)
675
            else:
676
              ToStdout("\t%s /dev/%s", node, vol)
677

    
678
      ToStdout("You need to replace or recreate disks for all the above"
679
               " instances if this message persists after fixing broken nodes.")
680
      retcode = constants.EXIT_FAILURE
681
    elif not instances:
682
      ToStdout("No disks need to be activated.")
683

    
684
  return retcode
685

    
686

    
687
def RepairDiskSizes(opts, args):
688
  """Verify sizes of cluster disks.
689

690
  @param opts: the command line options selected by the user
691
  @type args: list
692
  @param args: optional list of instances to restrict check to
693
  @rtype: int
694
  @return: the desired exit code
695

696
  """
697
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
698
  SubmitOpCode(op, opts=opts)
699

    
700

    
701
@UsesRPC
702
def MasterFailover(opts, args):
703
  """Failover the master node.
704

705
  This command, when run on a non-master node, will cause the current
706
  master to cease being master, and the non-master to become new
707
  master.
708

709
  @param opts: the command line options selected by the user
710
  @type args: list
711
  @param args: should be an empty list
712
  @rtype: int
713
  @return: the desired exit code
714

715
  """
716
  if opts.no_voting and not opts.yes_do_it:
717
    usertext = ("This will perform the failover even if most other nodes"
718
                " are down, or if this node is outdated. This is dangerous"
719
                " as it can lead to a non-consistent cluster. Check the"
720
                " gnt-cluster(8) man page before proceeding. Continue?")
721
    if not AskUser(usertext):
722
      return 1
723

    
724
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
725

    
726

    
727
def MasterPing(opts, args):
728
  """Checks if the master is alive.
729

730
  @param opts: the command line options selected by the user
731
  @type args: list
732
  @param args: should be an empty list
733
  @rtype: int
734
  @return: the desired exit code
735

736
  """
737
  try:
738
    cl = GetClient()
739
    cl.QueryClusterInfo()
740
    return 0
741
  except Exception: # pylint: disable=W0703
742
    return 1
743

    
744

    
745
def SearchTags(opts, args):
746
  """Searches the tags on all the cluster.
747

748
  @param opts: the command line options selected by the user
749
  @type args: list
750
  @param args: should contain only one element, the tag pattern
751
  @rtype: int
752
  @return: the desired exit code
753

754
  """
755
  op = opcodes.OpTagsSearch(pattern=args[0])
756
  result = SubmitOpCode(op, opts=opts)
757
  if not result:
758
    return 1
759
  result = list(result)
760
  result.sort()
761
  for path, tag in result:
762
    ToStdout("%s %s", path, tag)
763

    
764

    
765
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
766
  """Reads and verifies an X509 certificate.
767

768
  @type cert_filename: string
769
  @param cert_filename: the path of the file containing the certificate to
770
                        verify encoded in PEM format
771
  @type verify_private_key: bool
772
  @param verify_private_key: whether to verify the private key in addition to
773
                             the public certificate
774
  @rtype: string
775
  @return: a string containing the PEM-encoded certificate.
776

777
  """
778
  try:
779
    pem = utils.ReadFile(cert_filename)
780
  except IOError, err:
781
    raise errors.X509CertError(cert_filename,
782
                               "Unable to read certificate: %s" % str(err))
783

    
784
  try:
785
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
786
  except Exception, err:
787
    raise errors.X509CertError(cert_filename,
788
                               "Unable to load certificate: %s" % str(err))
789

    
790
  if verify_private_key:
791
    try:
792
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
793
    except Exception, err:
794
      raise errors.X509CertError(cert_filename,
795
                                 "Unable to load private key: %s" % str(err))
796

    
797
  return pem
798

    
799

    
800
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
801
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
802
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
803
                 cds_filename, force):
804
  """Renews cluster certificates, keys and secrets.
805

806
  @type new_cluster_cert: bool
807
  @param new_cluster_cert: Whether to generate a new cluster certificate
808
  @type new_rapi_cert: bool
809
  @param new_rapi_cert: Whether to generate a new RAPI certificate
810
  @type rapi_cert_filename: string
811
  @param rapi_cert_filename: Path to file containing new RAPI certificate
812
  @type new_spice_cert: bool
813
  @param new_spice_cert: Whether to generate a new SPICE certificate
814
  @type spice_cert_filename: string
815
  @param spice_cert_filename: Path to file containing new SPICE certificate
816
  @type spice_cacert_filename: string
817
  @param spice_cacert_filename: Path to file containing the certificate of the
818
                                CA that signed the SPICE certificate
819
  @type new_confd_hmac_key: bool
820
  @param new_confd_hmac_key: Whether to generate a new HMAC key
821
  @type new_cds: bool
822
  @param new_cds: Whether to generate a new cluster domain secret
823
  @type cds_filename: string
824
  @param cds_filename: Path to file containing new cluster domain secret
825
  @type force: bool
826
  @param force: Whether to ask user for confirmation
827

828
  """
829
  if new_rapi_cert and rapi_cert_filename:
830
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
831
             " options can be specified at the same time.")
832
    return 1
833

    
834
  if new_cds and cds_filename:
835
    ToStderr("Only one of the --new-cluster-domain-secret and"
836
             " --cluster-domain-secret options can be specified at"
837
             " the same time.")
838
    return 1
839

    
840
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
841
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
842
             " and --spice-ca-certificate must not be used.")
843
    return 1
844

    
845
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
846
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
847
             " specified.")
848
    return 1
849

    
850
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
851
  try:
852
    if rapi_cert_filename:
853
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
854
    if spice_cert_filename:
855
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
856
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
857
  except errors.X509CertError, err:
858
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
859
    return 1
860

    
861
  if cds_filename:
862
    try:
863
      cds = utils.ReadFile(cds_filename)
864
    except Exception, err: # pylint: disable=W0703
865
      ToStderr("Can't load new cluster domain secret from %s: %s" %
866
               (cds_filename, str(err)))
867
      return 1
868
  else:
869
    cds = None
870

    
871
  if not force:
872
    usertext = ("This requires all daemons on all nodes to be restarted and"
873
                " may take some time. Continue?")
874
    if not AskUser(usertext):
875
      return 1
876

    
877
  def _RenewCryptoInner(ctx):
878
    ctx.feedback_fn("Updating certificates and keys")
879
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
880
                                    new_rapi_cert,
881
                                    new_spice_cert,
882
                                    new_confd_hmac_key,
883
                                    new_cds,
884
                                    rapi_cert_pem=rapi_cert_pem,
885
                                    spice_cert_pem=spice_cert_pem,
886
                                    spice_cacert_pem=spice_cacert_pem,
887
                                    cds=cds)
888

    
889
    files_to_copy = []
890

    
891
    if new_cluster_cert:
892
      files_to_copy.append(pathutils.NODED_CERT_FILE)
893

    
894
    if new_rapi_cert or rapi_cert_pem:
895
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
896

    
897
    if new_spice_cert or spice_cert_pem:
898
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
899
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
900

    
901
    if new_confd_hmac_key:
902
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
903

    
904
    if new_cds or cds:
905
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
906

    
907
    if files_to_copy:
908
      for node_name in ctx.nonmaster_nodes:
909
        ctx.feedback_fn("Copying %s to %s" %
910
                        (", ".join(files_to_copy), node_name))
911
        for file_name in files_to_copy:
912
          ctx.ssh.CopyFileToNode(node_name, file_name)
913

    
914
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
915

    
916
  ToStdout("All requested certificates and keys have been replaced."
917
           " Running \"gnt-cluster verify\" now is recommended.")
918

    
919
  return 0
920

    
921

    
922
def RenewCrypto(opts, args):
923
  """Renews cluster certificates, keys and secrets.
924

925
  """
926
  return _RenewCrypto(opts.new_cluster_cert,
927
                      opts.new_rapi_cert,
928
                      opts.rapi_cert,
929
                      opts.new_spice_cert,
930
                      opts.spice_cert,
931
                      opts.spice_cacert,
932
                      opts.new_confd_hmac_key,
933
                      opts.new_cluster_domain_secret,
934
                      opts.cluster_domain_secret,
935
                      opts.force)
936

    
937

    
938
def SetClusterParams(opts, args):
939
  """Modify the cluster.
940

941
  @param opts: the command line options selected by the user
942
  @type args: list
943
  @param args: should be an empty list
944
  @rtype: int
945
  @return: the desired exit code
946

947
  """
948
  if not (not opts.lvm_storage or opts.vg_name or
949
          not opts.drbd_storage or opts.drbd_helper or
950
          opts.enabled_hypervisors or opts.hvparams or
951
          opts.beparams or opts.nicparams or
952
          opts.ndparams or opts.diskparams or
953
          opts.candidate_pool_size is not None or
954
          opts.uid_pool is not None or
955
          opts.maintain_node_health is not None or
956
          opts.add_uids is not None or
957
          opts.remove_uids is not None or
958
          opts.default_iallocator is not None or
959
          opts.reserved_lvs is not None or
960
          opts.master_netdev is not None or
961
          opts.master_netmask is not None or
962
          opts.use_external_mip_script is not None or
963
          opts.prealloc_wipe_disks is not None or
964
          opts.hv_state or
965
          opts.enabled_storage_types or
966
          opts.disk_state or
967
          opts.ispecs_mem_size or
968
          opts.ispecs_cpu_count or
969
          opts.ispecs_disk_count or
970
          opts.ispecs_disk_size or
971
          opts.ispecs_nic_count or
972
          opts.ipolicy_disk_templates is not None or
973
          opts.ipolicy_vcpu_ratio is not None or
974
          opts.ipolicy_spindle_ratio is not None):
975
    ToStderr("Please give at least one of the parameters.")
976
    return 1
977

    
978
  vg_name = opts.vg_name
979
  if not opts.lvm_storage and opts.vg_name:
980
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
981
    return 1
982

    
983
  if not opts.lvm_storage:
984
    vg_name = ""
985

    
986
  drbd_helper = opts.drbd_helper
987
  if not opts.drbd_storage and opts.drbd_helper:
988
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
989
    return 1
990

    
991
  if not opts.drbd_storage:
992
    drbd_helper = ""
993

    
994
  hvlist = opts.enabled_hypervisors
995
  if hvlist is not None:
996
    hvlist = hvlist.split(",")
997

    
998
  enabled_storage_types = opts.enabled_storage_types
999
  if enabled_storage_types is not None:
1000
    enabled_storage_types = enabled_storage_types.split(",")
1001

    
1002
  # a list of (name, dict) we can pass directly to dict() (or [])
1003
  hvparams = dict(opts.hvparams)
1004
  for hv_params in hvparams.values():
1005
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1006

    
1007
  diskparams = dict(opts.diskparams)
1008

    
1009
  for dt_params in diskparams.values():
1010
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1011

    
1012
  beparams = opts.beparams
1013
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1014

    
1015
  nicparams = opts.nicparams
1016
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1017

    
1018
  ndparams = opts.ndparams
1019
  if ndparams is not None:
1020
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1021

    
1022
  ipolicy = CreateIPolicyFromOpts(
1023
    ispecs_mem_size=opts.ispecs_mem_size,
1024
    ispecs_cpu_count=opts.ispecs_cpu_count,
1025
    ispecs_disk_count=opts.ispecs_disk_count,
1026
    ispecs_disk_size=opts.ispecs_disk_size,
1027
    ispecs_nic_count=opts.ispecs_nic_count,
1028
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1029
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1030
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1031
    )
1032

    
1033
  mnh = opts.maintain_node_health
1034

    
1035
  uid_pool = opts.uid_pool
1036
  if uid_pool is not None:
1037
    uid_pool = uidpool.ParseUidPool(uid_pool)
1038

    
1039
  add_uids = opts.add_uids
1040
  if add_uids is not None:
1041
    add_uids = uidpool.ParseUidPool(add_uids)
1042

    
1043
  remove_uids = opts.remove_uids
1044
  if remove_uids is not None:
1045
    remove_uids = uidpool.ParseUidPool(remove_uids)
1046

    
1047
  if opts.reserved_lvs is not None:
1048
    if opts.reserved_lvs == "":
1049
      opts.reserved_lvs = []
1050
    else:
1051
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1052

    
1053
  if opts.master_netmask is not None:
1054
    try:
1055
      opts.master_netmask = int(opts.master_netmask)
1056
    except ValueError:
1057
      ToStderr("The --master-netmask option expects an int parameter.")
1058
      return 1
1059

    
1060
  ext_ip_script = opts.use_external_mip_script
1061

    
1062
  if opts.disk_state:
1063
    disk_state = utils.FlatToDict(opts.disk_state)
1064
  else:
1065
    disk_state = {}
1066

    
1067
  hv_state = dict(opts.hv_state)
1068

    
1069
  op = opcodes.OpClusterSetParams(
1070
    vg_name=vg_name,
1071
    drbd_helper=drbd_helper,
1072
    enabled_hypervisors=hvlist,
1073
    hvparams=hvparams,
1074
    os_hvp=None,
1075
    beparams=beparams,
1076
    nicparams=nicparams,
1077
    ndparams=ndparams,
1078
    diskparams=diskparams,
1079
    ipolicy=ipolicy,
1080
    candidate_pool_size=opts.candidate_pool_size,
1081
    maintain_node_health=mnh,
1082
    uid_pool=uid_pool,
1083
    add_uids=add_uids,
1084
    remove_uids=remove_uids,
1085
    default_iallocator=opts.default_iallocator,
1086
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1087
    master_netdev=opts.master_netdev,
1088
    master_netmask=opts.master_netmask,
1089
    reserved_lvs=opts.reserved_lvs,
1090
    use_external_mip_script=ext_ip_script,
1091
    hv_state=hv_state,
1092
    disk_state=disk_state,
1093
    enabled_storage_types=enabled_storage_types,
1094
    )
1095
  SubmitOrSend(op, opts)
1096
  return 0
1097

    
1098

    
1099
def QueueOps(opts, args):
1100
  """Queue operations.
1101

1102
  @param opts: the command line options selected by the user
1103
  @type args: list
1104
  @param args: should contain only one element, the subcommand
1105
  @rtype: int
1106
  @return: the desired exit code
1107

1108
  """
1109
  command = args[0]
1110
  client = GetClient()
1111
  if command in ("drain", "undrain"):
1112
    drain_flag = command == "drain"
1113
    client.SetQueueDrainFlag(drain_flag)
1114
  elif command == "info":
1115
    result = client.QueryConfigValues(["drain_flag"])
1116
    if result[0]:
1117
      val = "set"
1118
    else:
1119
      val = "unset"
1120
    ToStdout("The drain flag is %s" % val)
1121
  else:
1122
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1123
                               errors.ECODE_INVAL)
1124

    
1125
  return 0
1126

    
1127

    
1128
def _ShowWatcherPause(until):
1129
  if until is None or until < time.time():
1130
    ToStdout("The watcher is not paused.")
1131
  else:
1132
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1133

    
1134

    
1135
def WatcherOps(opts, args):
1136
  """Watcher operations.
1137

1138
  @param opts: the command line options selected by the user
1139
  @type args: list
1140
  @param args: should contain only one element, the subcommand
1141
  @rtype: int
1142
  @return: the desired exit code
1143

1144
  """
1145
  command = args[0]
1146
  client = GetClient()
1147

    
1148
  if command == "continue":
1149
    client.SetWatcherPause(None)
1150
    ToStdout("The watcher is no longer paused.")
1151

    
1152
  elif command == "pause":
1153
    if len(args) < 2:
1154
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1155

    
1156
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1157
    _ShowWatcherPause(result)
1158

    
1159
  elif command == "info":
1160
    result = client.QueryConfigValues(["watcher_pause"])
1161
    _ShowWatcherPause(result[0])
1162

    
1163
  else:
1164
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1165
                               errors.ECODE_INVAL)
1166

    
1167
  return 0
1168

    
1169

    
1170
def _OobPower(opts, node_list, power):
1171
  """Puts the node in the list to desired power state.
1172

1173
  @param opts: The command line options selected by the user
1174
  @param node_list: The list of nodes to operate on
1175
  @param power: True if they should be powered on, False otherwise
1176
  @return: The success of the operation (none failed)
1177

1178
  """
1179
  if power:
1180
    command = constants.OOB_POWER_ON
1181
  else:
1182
    command = constants.OOB_POWER_OFF
1183

    
1184
  op = opcodes.OpOobCommand(node_names=node_list,
1185
                            command=command,
1186
                            ignore_status=True,
1187
                            timeout=opts.oob_timeout,
1188
                            power_delay=opts.power_delay)
1189
  result = SubmitOpCode(op, opts=opts)
1190
  errs = 0
1191
  for node_result in result:
1192
    (node_tuple, data_tuple) = node_result
1193
    (_, node_name) = node_tuple
1194
    (data_status, _) = data_tuple
1195
    if data_status != constants.RS_NORMAL:
1196
      assert data_status != constants.RS_UNAVAIL
1197
      errs += 1
1198
      ToStderr("There was a problem changing power for %s, please investigate",
1199
               node_name)
1200

    
1201
  if errs > 0:
1202
    return False
1203

    
1204
  return True
1205

    
1206

    
1207
def _InstanceStart(opts, inst_list, start, no_remember=False):
1208
  """Puts the instances in the list to desired state.
1209

1210
  @param opts: The command line options selected by the user
1211
  @param inst_list: The list of instances to operate on
1212
  @param start: True if they should be started, False for shutdown
1213
  @param no_remember: If the instance state should be remembered
1214
  @return: The success of the operation (none failed)
1215

1216
  """
1217
  if start:
1218
    opcls = opcodes.OpInstanceStartup
1219
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1220
  else:
1221
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1222
                           timeout=opts.shutdown_timeout,
1223
                           no_remember=no_remember)
1224
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1225

    
1226
  jex = JobExecutor(opts=opts)
1227

    
1228
  for inst in inst_list:
1229
    ToStdout("Submit %s of instance %s", text_submit, inst)
1230
    op = opcls(instance_name=inst)
1231
    jex.QueueJob(inst, op)
1232

    
1233
  results = jex.GetResults()
1234
  bad_cnt = len([1 for (success, _) in results if not success])
1235

    
1236
  if bad_cnt == 0:
1237
    ToStdout("All instances have been %s successfully", text_success)
1238
  else:
1239
    ToStderr("There were errors while %s instances:\n"
1240
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1241
             len(results))
1242
    return False
1243

    
1244
  return True
1245

    
1246

    
1247
class _RunWhenNodesReachableHelper:
1248
  """Helper class to make shared internal state sharing easier.
1249

1250
  @ivar success: Indicates if all action_cb calls were successful
1251

1252
  """
1253
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1254
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1255
    """Init the object.
1256

1257
    @param node_list: The list of nodes to be reachable
1258
    @param action_cb: Callback called when a new host is reachable
1259
    @type node2ip: dict
1260
    @param node2ip: Node to ip mapping
1261
    @param port: The port to use for the TCP ping
1262
    @param feedback_fn: The function used for feedback
1263
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1264
    @param _sleep_fn: Function to sleep (for unittest use only)
1265

1266
    """
1267
    self.down = set(node_list)
1268
    self.up = set()
1269
    self.node2ip = node2ip
1270
    self.success = True
1271
    self.action_cb = action_cb
1272
    self.port = port
1273
    self.feedback_fn = feedback_fn
1274
    self._ping_fn = _ping_fn
1275
    self._sleep_fn = _sleep_fn
1276

    
1277
  def __call__(self):
1278
    """When called we run action_cb.
1279

1280
    @raises utils.RetryAgain: When there are still down nodes
1281

1282
    """
1283
    if not self.action_cb(self.up):
1284
      self.success = False
1285

    
1286
    if self.down:
1287
      raise utils.RetryAgain()
1288
    else:
1289
      return self.success
1290

    
1291
  def Wait(self, secs):
1292
    """Checks if a host is up or waits remaining seconds.
1293

1294
    @param secs: The secs remaining
1295

1296
    """
1297
    start = time.time()
1298
    for node in self.down:
1299
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1300
                       live_port_needed=True):
1301
        self.feedback_fn("Node %s became available" % node)
1302
        self.up.add(node)
1303
        self.down -= self.up
1304
        # If we have a node available there is the possibility to run the
1305
        # action callback successfully, therefore we don't wait and return
1306
        return
1307

    
1308
    self._sleep_fn(max(0.0, start + secs - time.time()))
1309

    
1310

    
1311
def _RunWhenNodesReachable(node_list, action_cb, interval):
1312
  """Run action_cb when nodes become reachable.
1313

1314
  @param node_list: The list of nodes to be reachable
1315
  @param action_cb: Callback called when a new host is reachable
1316
  @param interval: The earliest time to retry
1317

1318
  """
1319
  client = GetClient()
1320
  cluster_info = client.QueryClusterInfo()
1321
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1322
    family = netutils.IPAddress.family
1323
  else:
1324
    family = netutils.IP6Address.family
1325

    
1326
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1327
                 for node in node_list)
1328

    
1329
  port = netutils.GetDaemonPort(constants.NODED)
1330
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1331
                                        ToStdout)
1332

    
1333
  try:
1334
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1335
                       wait_fn=helper.Wait)
1336
  except utils.RetryTimeout:
1337
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1338
             " again:\n  - %s", "  - ".join(helper.down))
1339
    return False
1340

    
1341

    
1342
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1343
                          _instance_start_fn=_InstanceStart):
1344
  """Start the instances conditional based on node_states.
1345

1346
  @param opts: The command line options selected by the user
1347
  @param inst_map: A dict of inst -> nodes mapping
1348
  @param nodes_online: A list of nodes online
1349
  @param _instance_start_fn: Callback to start instances (unittest use only)
1350
  @return: Success of the operation on all instances
1351

1352
  """
1353
  start_inst_list = []
1354
  for (inst, nodes) in inst_map.items():
1355
    if not (nodes - nodes_online):
1356
      # All nodes the instance lives on are back online
1357
      start_inst_list.append(inst)
1358

    
1359
  for inst in start_inst_list:
1360
    del inst_map[inst]
1361

    
1362
  if start_inst_list:
1363
    return _instance_start_fn(opts, start_inst_list, True)
1364

    
1365
  return True
1366

    
1367

    
1368
def _EpoOn(opts, full_node_list, node_list, inst_map):
1369
  """Does the actual power on.
1370

1371
  @param opts: The command line options selected by the user
1372
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1373
                         OOB)
1374
  @param node_list: The list of nodes to operate on (all need to support OOB)
1375
  @param inst_map: A dict of inst -> nodes mapping
1376
  @return: The desired exit status
1377

1378
  """
1379
  if node_list and not _OobPower(opts, node_list, False):
1380
    ToStderr("Not all nodes seem to get back up, investigate and start"
1381
             " manually if needed")
1382

    
1383
  # Wait for the nodes to be back up
1384
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1385

    
1386
  ToStdout("Waiting until all nodes are available again")
1387
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1388
    ToStderr("Please investigate and start stopped instances manually")
1389
    return constants.EXIT_FAILURE
1390

    
1391
  return constants.EXIT_SUCCESS
1392

    
1393

    
1394
def _EpoOff(opts, node_list, inst_map):
1395
  """Does the actual power off.
1396

1397
  @param opts: The command line options selected by the user
1398
  @param node_list: The list of nodes to operate on (all need to support OOB)
1399
  @param inst_map: A dict of inst -> nodes mapping
1400
  @return: The desired exit status
1401

1402
  """
1403
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1404
    ToStderr("Please investigate and stop instances manually before continuing")
1405
    return constants.EXIT_FAILURE
1406

    
1407
  if not node_list:
1408
    return constants.EXIT_SUCCESS
1409

    
1410
  if _OobPower(opts, node_list, False):
1411
    return constants.EXIT_SUCCESS
1412
  else:
1413
    return constants.EXIT_FAILURE
1414

    
1415

    
1416
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1417
        _confirm_fn=ConfirmOperation,
1418
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1419
  """EPO operations.
1420

1421
  @param opts: the command line options selected by the user
1422
  @type args: list
1423
  @param args: should contain only one element, the subcommand
1424
  @rtype: int
1425
  @return: the desired exit code
1426

1427
  """
1428
  if opts.groups and opts.show_all:
1429
    _stderr_fn("Only one of --groups or --all are allowed")
1430
    return constants.EXIT_FAILURE
1431
  elif args and opts.show_all:
1432
    _stderr_fn("Arguments in combination with --all are not allowed")
1433
    return constants.EXIT_FAILURE
1434

    
1435
  if cl is None:
1436
    cl = GetClient()
1437

    
1438
  if opts.groups:
1439
    node_query_list = \
1440
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1441
  else:
1442
    node_query_list = args
1443

    
1444
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1445
                                           "sinst_list", "powered", "offline"],
1446
                         False)
1447

    
1448
  all_nodes = map(compat.fst, result)
1449
  node_list = []
1450
  inst_map = {}
1451
  for (node, master, pinsts, sinsts, powered, offline) in result:
1452
    if not offline:
1453
      for inst in (pinsts + sinsts):
1454
        if inst in inst_map:
1455
          if not master:
1456
            inst_map[inst].add(node)
1457
        elif master:
1458
          inst_map[inst] = set()
1459
        else:
1460
          inst_map[inst] = set([node])
1461

    
1462
    if master and opts.on:
1463
      # We ignore the master for turning on the machines, in fact we are
1464
      # already operating on the master at this point :)
1465
      continue
1466
    elif master and not opts.show_all:
1467
      _stderr_fn("%s is the master node, please do a master-failover to another"
1468
                 " node not affected by the EPO or use --all if you intend to"
1469
                 " shutdown the whole cluster", node)
1470
      return constants.EXIT_FAILURE
1471
    elif powered is None:
1472
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1473
                 " handled in a fully automated manner", node)
1474
    elif powered == opts.on:
1475
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1476
    elif not offline or (offline and powered):
1477
      node_list.append(node)
1478

    
1479
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1480
    return constants.EXIT_FAILURE
1481

    
1482
  if opts.on:
1483
    return _on_fn(opts, all_nodes, node_list, inst_map)
1484
  else:
1485
    return _off_fn(opts, node_list, inst_map)
1486

    
1487

    
1488
commands = {
1489
  "init": (
1490
    InitCluster, [ArgHost(min=1, max=1)],
1491
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1492
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1493
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1494
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1495
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1496
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1497
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1498
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1499
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1500
  "destroy": (
1501
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1502
    "", "Destroy cluster"),
1503
  "rename": (
1504
    RenameCluster, [ArgHost(min=1, max=1)],
1505
    [FORCE_OPT, DRY_RUN_OPT],
1506
    "<new_name>",
1507
    "Renames the cluster"),
1508
  "redist-conf": (
1509
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1510
    "", "Forces a push of the configuration file and ssconf files"
1511
    " to the nodes in the cluster"),
1512
  "verify": (
1513
    VerifyCluster, ARGS_NONE,
1514
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1515
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1516
    "", "Does a check on the cluster configuration"),
1517
  "verify-disks": (
1518
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1519
    "", "Does a check on the cluster disk status"),
1520
  "repair-disk-sizes": (
1521
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1522
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1523
  "master-failover": (
1524
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1525
    "", "Makes the current node the master"),
1526
  "master-ping": (
1527
    MasterPing, ARGS_NONE, [],
1528
    "", "Checks if the master is alive"),
1529
  "version": (
1530
    ShowClusterVersion, ARGS_NONE, [],
1531
    "", "Shows the cluster version"),
1532
  "getmaster": (
1533
    ShowClusterMaster, ARGS_NONE, [],
1534
    "", "Shows the cluster master"),
1535
  "copyfile": (
1536
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1537
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1538
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1539
  "command": (
1540
    RunClusterCommand, [ArgCommand(min=1)],
1541
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1542
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1543
  "info": (
1544
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1545
    "[--roman]", "Show cluster configuration"),
1546
  "list-tags": (
1547
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1548
  "add-tags": (
1549
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1550
    "tag...", "Add tags to the cluster"),
1551
  "remove-tags": (
1552
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1553
    "tag...", "Remove tags from the cluster"),
1554
  "search-tags": (
1555
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1556
    "Searches the tags on all objects on"
1557
    " the cluster for a given pattern (regex)"),
1558
  "queue": (
1559
    QueueOps,
1560
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1561
    [], "drain|undrain|info", "Change queue properties"),
1562
  "watcher": (
1563
    WatcherOps,
1564
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1565
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1566
    [],
1567
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1568
  "modify": (
1569
    SetClusterParams, ARGS_NONE,
1570
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1571
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1572
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1573
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1574
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1575
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1576
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_STORAGE_TYPES_OPT] +
1577
    INSTANCE_POLICY_OPTS,
1578
    "[opts...]",
1579
    "Alters the parameters of the cluster"),
1580
  "renew-crypto": (
1581
    RenewCrypto, ARGS_NONE,
1582
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1583
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1584
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1585
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1586
    "[opts...]",
1587
    "Renews cluster certificates, keys and secrets"),
1588
  "epo": (
1589
    Epo, [ArgUnknown()],
1590
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1591
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1592
    "[opts...] [args]",
1593
    "Performs an emergency power-off on given args"),
1594
  "activate-master-ip": (
1595
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1596
  "deactivate-master-ip": (
1597
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1598
    "Deactivates the master IP"),
1599
  }
1600

    
1601

    
1602
#: dictionary with aliases for commands
1603
aliases = {
1604
  "masterfailover": "master-failover",
1605
  "show": "info",
1606
}
1607

    
1608

    
1609
def Main():
1610
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1611
                     aliases=aliases)