Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ d5b031dc

History | View | Annotate | Download (52 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
import os.path
30
import time
31
import OpenSSL
32
import itertools
33

    
34
from ganeti.cli import *
35
from ganeti import opcodes
36
from ganeti import constants
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import bootstrap
40
from ganeti import ssh
41
from ganeti import objects
42
from ganeti import uidpool
43
from ganeti import compat
44
from ganeti import netutils
45
from ganeti import pathutils
46

    
47

    
48
ON_OPT = cli_option("--on", default=False,
49
                    action="store_true", dest="on",
50
                    help="Recover from an EPO")
51

    
52
GROUPS_OPT = cli_option("--groups", default=False,
53
                        action="store_true", dest="groups",
54
                        help="Arguments are node groups instead of nodes")
55

    
56
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
57
                            help="Override interactive check for --no-voting",
58
                            default=False, action="store_true")
59

    
60
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
61
_EPO_PING_TIMEOUT = 1 # 1 second
62
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
63

    
64

    
65
@UsesRPC
66
def InitCluster(opts, args):
67
  """Initialize the cluster.
68

69
  @param opts: the command line options selected by the user
70
  @type args: list
71
  @param args: should contain only one element, the desired
72
      cluster name
73
  @rtype: int
74
  @return: the desired exit code
75

76
  """
77
  if not opts.lvm_storage and opts.vg_name:
78
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
79
    return 1
80

    
81
  vg_name = opts.vg_name
82
  if opts.lvm_storage and not opts.vg_name:
83
    vg_name = constants.DEFAULT_VG
84

    
85
  if not opts.drbd_storage and opts.drbd_helper:
86
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
87
    return 1
88

    
89
  drbd_helper = opts.drbd_helper
90
  if opts.drbd_storage and not opts.drbd_helper:
91
    drbd_helper = constants.DEFAULT_DRBD_HELPER
92

    
93
  master_netdev = opts.master_netdev
94
  if master_netdev is None:
95
    master_netdev = constants.DEFAULT_BRIDGE
96

    
97
  hvlist = opts.enabled_hypervisors
98
  if hvlist is None:
99
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
100
  hvlist = hvlist.split(",")
101

    
102
  hvparams = dict(opts.hvparams)
103
  beparams = opts.beparams
104
  nicparams = opts.nicparams
105

    
106
  diskparams = dict(opts.diskparams)
107

    
108
  # check the disk template types here, as we cannot rely on the type check done
109
  # by the opcode parameter types
110
  diskparams_keys = set(diskparams.keys())
111
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
112
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
113
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
114
    return 1
115

    
116
  # prepare beparams dict
117
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
118
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
119

    
120
  # prepare nicparams dict
121
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
122
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
123

    
124
  # prepare ndparams dict
125
  if opts.ndparams is None:
126
    ndparams = dict(constants.NDC_DEFAULTS)
127
  else:
128
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
129
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
130

    
131
  # prepare hvparams dict
132
  for hv in constants.HYPER_TYPES:
133
    if hv not in hvparams:
134
      hvparams[hv] = {}
135
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
136
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
137

    
138
  # prepare diskparams dict
139
  for templ in constants.DISK_TEMPLATES:
140
    if templ not in diskparams:
141
      diskparams[templ] = {}
142
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
143
                                         diskparams[templ])
144
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
145

    
146
  # prepare ipolicy dict
147
  ipolicy_raw = CreateIPolicyFromOpts(
148
    ispecs_mem_size=opts.ispecs_mem_size,
149
    ispecs_cpu_count=opts.ispecs_cpu_count,
150
    ispecs_disk_count=opts.ispecs_disk_count,
151
    ispecs_disk_size=opts.ispecs_disk_size,
152
    ispecs_nic_count=opts.ispecs_nic_count,
153
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
154
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
155
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
156
    fill_all=True)
157
  ipolicy = objects.FillIPolicy(constants.IPOLICY_DEFAULTS, ipolicy_raw)
158

    
159
  if opts.candidate_pool_size is None:
160
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
161

    
162
  if opts.mac_prefix is None:
163
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
164

    
165
  uid_pool = opts.uid_pool
166
  if uid_pool is not None:
167
    uid_pool = uidpool.ParseUidPool(uid_pool)
168

    
169
  if opts.prealloc_wipe_disks is None:
170
    opts.prealloc_wipe_disks = False
171

    
172
  external_ip_setup_script = opts.use_external_mip_script
173
  if external_ip_setup_script is None:
174
    external_ip_setup_script = False
175

    
176
  try:
177
    primary_ip_version = int(opts.primary_ip_version)
178
  except (ValueError, TypeError), err:
179
    ToStderr("Invalid primary ip version value: %s" % str(err))
180
    return 1
181

    
182
  master_netmask = opts.master_netmask
183
  try:
184
    if master_netmask is not None:
185
      master_netmask = int(master_netmask)
186
  except (ValueError, TypeError), err:
187
    ToStderr("Invalid master netmask value: %s" % str(err))
188
    return 1
189

    
190
  if opts.disk_state:
191
    disk_state = utils.FlatToDict(opts.disk_state)
192
  else:
193
    disk_state = {}
194

    
195
  hv_state = dict(opts.hv_state)
196

    
197
  bootstrap.InitCluster(cluster_name=args[0],
198
                        secondary_ip=opts.secondary_ip,
199
                        vg_name=vg_name,
200
                        mac_prefix=opts.mac_prefix,
201
                        master_netmask=master_netmask,
202
                        master_netdev=master_netdev,
203
                        file_storage_dir=opts.file_storage_dir,
204
                        shared_file_storage_dir=opts.shared_file_storage_dir,
205
                        enabled_hypervisors=hvlist,
206
                        hvparams=hvparams,
207
                        beparams=beparams,
208
                        nicparams=nicparams,
209
                        ndparams=ndparams,
210
                        diskparams=diskparams,
211
                        ipolicy=ipolicy,
212
                        candidate_pool_size=opts.candidate_pool_size,
213
                        modify_etc_hosts=opts.modify_etc_hosts,
214
                        modify_ssh_setup=opts.modify_ssh_setup,
215
                        maintain_node_health=opts.maintain_node_health,
216
                        drbd_helper=drbd_helper,
217
                        uid_pool=uid_pool,
218
                        default_iallocator=opts.default_iallocator,
219
                        primary_ip_version=primary_ip_version,
220
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
221
                        use_external_mip_script=external_ip_setup_script,
222
                        hv_state=hv_state,
223
                        disk_state=disk_state,
224
                        )
225
  op = opcodes.OpClusterPostInit()
226
  SubmitOpCode(op, opts=opts)
227
  return 0
228

    
229

    
230
@UsesRPC
231
def DestroyCluster(opts, args):
232
  """Destroy the cluster.
233

234
  @param opts: the command line options selected by the user
235
  @type args: list
236
  @param args: should be an empty list
237
  @rtype: int
238
  @return: the desired exit code
239

240
  """
241
  if not opts.yes_do_it:
242
    ToStderr("Destroying a cluster is irreversible. If you really want"
243
             " destroy this cluster, supply the --yes-do-it option.")
244
    return 1
245

    
246
  op = opcodes.OpClusterDestroy()
247
  master = SubmitOpCode(op, opts=opts)
248
  # if we reached this, the opcode didn't fail; we can proceed to
249
  # shutdown all the daemons
250
  bootstrap.FinalizeClusterDestroy(master)
251
  return 0
252

    
253

    
254
def RenameCluster(opts, args):
255
  """Rename the cluster.
256

257
  @param opts: the command line options selected by the user
258
  @type args: list
259
  @param args: should contain only one element, the new cluster name
260
  @rtype: int
261
  @return: the desired exit code
262

263
  """
264
  cl = GetClient()
265

    
266
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
267

    
268
  new_name = args[0]
269
  if not opts.force:
270
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
271
                " connected over the network to the cluster name, the"
272
                " operation is very dangerous as the IP address will be"
273
                " removed from the node and the change may not go through."
274
                " Continue?") % (cluster_name, new_name)
275
    if not AskUser(usertext):
276
      return 1
277

    
278
  op = opcodes.OpClusterRename(name=new_name)
279
  result = SubmitOpCode(op, opts=opts, cl=cl)
280

    
281
  if result:
282
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
283

    
284
  return 0
285

    
286

    
287
def ActivateMasterIp(opts, args):
288
  """Activates the master IP.
289

290
  """
291
  op = opcodes.OpClusterActivateMasterIp()
292
  SubmitOpCode(op)
293
  return 0
294

    
295

    
296
def DeactivateMasterIp(opts, args):
297
  """Deactivates the master IP.
298

299
  """
300
  if not opts.confirm:
301
    usertext = ("This will disable the master IP. All the open connections to"
302
                " the master IP will be closed. To reach the master you will"
303
                " need to use its node IP."
304
                " Continue?")
305
    if not AskUser(usertext):
306
      return 1
307

    
308
  op = opcodes.OpClusterDeactivateMasterIp()
309
  SubmitOpCode(op)
310
  return 0
311

    
312

    
313
def RedistributeConfig(opts, args):
314
  """Forces push of the cluster configuration.
315

316
  @param opts: the command line options selected by the user
317
  @type args: list
318
  @param args: empty list
319
  @rtype: int
320
  @return: the desired exit code
321

322
  """
323
  op = opcodes.OpClusterRedistConf()
324
  SubmitOrSend(op, opts)
325
  return 0
326

    
327

    
328
def ShowClusterVersion(opts, args):
329
  """Write version of ganeti software to the standard output.
330

331
  @param opts: the command line options selected by the user
332
  @type args: list
333
  @param args: should be an empty list
334
  @rtype: int
335
  @return: the desired exit code
336

337
  """
338
  cl = GetClient(query=True)
339
  result = cl.QueryClusterInfo()
340
  ToStdout("Software version: %s", result["software_version"])
341
  ToStdout("Internode protocol: %s", result["protocol_version"])
342
  ToStdout("Configuration format: %s", result["config_version"])
343
  ToStdout("OS api version: %s", result["os_api_version"])
344
  ToStdout("Export interface: %s", result["export_version"])
345
  return 0
346

    
347

    
348
def ShowClusterMaster(opts, args):
349
  """Write name of master node to the standard output.
350

351
  @param opts: the command line options selected by the user
352
  @type args: list
353
  @param args: should be an empty list
354
  @rtype: int
355
  @return: the desired exit code
356

357
  """
358
  master = bootstrap.GetMaster()
359
  ToStdout(master)
360
  return 0
361

    
362

    
363
def _PrintGroupedParams(paramsdict, level=1, roman=False):
364
  """Print Grouped parameters (be, nic, disk) by group.
365

366
  @type paramsdict: dict of dicts
367
  @param paramsdict: {group: {param: value, ...}, ...}
368
  @type level: int
369
  @param level: Level of indention
370

371
  """
372
  indent = "  " * level
373
  for item, val in sorted(paramsdict.items()):
374
    if isinstance(val, dict):
375
      ToStdout("%s- %s:", indent, item)
376
      _PrintGroupedParams(val, level=level + 1, roman=roman)
377
    elif roman and isinstance(val, int):
378
      ToStdout("%s  %s: %s", indent, item, compat.TryToRoman(val))
379
    else:
380
      ToStdout("%s  %s: %s", indent, item, val)
381

    
382

    
383
def ShowClusterConfig(opts, args):
384
  """Shows cluster information.
385

386
  @param opts: the command line options selected by the user
387
  @type args: list
388
  @param args: should be an empty list
389
  @rtype: int
390
  @return: the desired exit code
391

392
  """
393
  cl = GetClient(query=True)
394
  result = cl.QueryClusterInfo()
395

    
396
  ToStdout("Cluster name: %s", result["name"])
397
  ToStdout("Cluster UUID: %s", result["uuid"])
398

    
399
  ToStdout("Creation time: %s", utils.FormatTime(result["ctime"]))
400
  ToStdout("Modification time: %s", utils.FormatTime(result["mtime"]))
401

    
402
  ToStdout("Master node: %s", result["master"])
403

    
404
  ToStdout("Architecture (this node): %s (%s)",
405
           result["architecture"][0], result["architecture"][1])
406

    
407
  if result["tags"]:
408
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
409
  else:
410
    tags = "(none)"
411

    
412
  ToStdout("Tags: %s", tags)
413

    
414
  ToStdout("Default hypervisor: %s", result["default_hypervisor"])
415
  ToStdout("Enabled hypervisors: %s",
416
           utils.CommaJoin(result["enabled_hypervisors"]))
417

    
418
  ToStdout("Hypervisor parameters:")
419
  _PrintGroupedParams(result["hvparams"])
420

    
421
  ToStdout("OS-specific hypervisor parameters:")
422
  _PrintGroupedParams(result["os_hvp"])
423

    
424
  ToStdout("OS parameters:")
425
  _PrintGroupedParams(result["osparams"])
426

    
427
  ToStdout("Hidden OSes: %s", utils.CommaJoin(result["hidden_os"]))
428
  ToStdout("Blacklisted OSes: %s", utils.CommaJoin(result["blacklisted_os"]))
429

    
430
  ToStdout("Cluster parameters:")
431
  ToStdout("  - candidate pool size: %s",
432
            compat.TryToRoman(result["candidate_pool_size"],
433
                              convert=opts.roman_integers))
434
  ToStdout("  - master netdev: %s", result["master_netdev"])
435
  ToStdout("  - master netmask: %s", result["master_netmask"])
436
  ToStdout("  - use external master IP address setup script: %s",
437
           result["use_external_mip_script"])
438
  ToStdout("  - lvm volume group: %s", result["volume_group_name"])
439
  if result["reserved_lvs"]:
440
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
441
  else:
442
    reserved_lvs = "(none)"
443
  ToStdout("  - lvm reserved volumes: %s", reserved_lvs)
444
  ToStdout("  - drbd usermode helper: %s", result["drbd_usermode_helper"])
445
  ToStdout("  - file storage path: %s", result["file_storage_dir"])
446
  ToStdout("  - shared file storage path: %s",
447
           result["shared_file_storage_dir"])
448
  ToStdout("  - maintenance of node health: %s",
449
           result["maintain_node_health"])
450
  ToStdout("  - uid pool: %s",
451
            uidpool.FormatUidPool(result["uid_pool"],
452
                                  roman=opts.roman_integers))
453
  ToStdout("  - default instance allocator: %s", result["default_iallocator"])
454
  ToStdout("  - primary ip version: %d", result["primary_ip_version"])
455
  ToStdout("  - preallocation wipe disks: %s", result["prealloc_wipe_disks"])
456
  ToStdout("  - OS search path: %s", utils.CommaJoin(pathutils.OS_SEARCH_PATH))
457

    
458
  ToStdout("Default node parameters:")
459
  _PrintGroupedParams(result["ndparams"], roman=opts.roman_integers)
460

    
461
  ToStdout("Default instance parameters:")
462
  _PrintGroupedParams(result["beparams"], roman=opts.roman_integers)
463

    
464
  ToStdout("Default nic parameters:")
465
  _PrintGroupedParams(result["nicparams"], roman=opts.roman_integers)
466

    
467
  ToStdout("Default disk parameters:")
468
  _PrintGroupedParams(result["diskparams"], roman=opts.roman_integers)
469

    
470
  ToStdout("Instance policy - limits for instances:")
471
  for key in constants.IPOLICY_ISPECS:
472
    ToStdout("  - %s", key)
473
    _PrintGroupedParams(result["ipolicy"][key], roman=opts.roman_integers)
474
  ToStdout("  - enabled disk templates: %s",
475
           utils.CommaJoin(result["ipolicy"][constants.IPOLICY_DTS]))
476
  for key in constants.IPOLICY_PARAMETERS:
477
    ToStdout("  - %s: %s", key, result["ipolicy"][key])
478

    
479
  return 0
480

    
481

    
482
def ClusterCopyFile(opts, args):
483
  """Copy a file from master to some nodes.
484

485
  @param opts: the command line options selected by the user
486
  @type args: list
487
  @param args: should contain only one element, the path of
488
      the file to be copied
489
  @rtype: int
490
  @return: the desired exit code
491

492
  """
493
  filename = args[0]
494
  if not os.path.exists(filename):
495
    raise errors.OpPrereqError("No such filename '%s'" % filename,
496
                               errors.ECODE_INVAL)
497

    
498
  cl = GetClient()
499

    
500
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
501

    
502
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
503
                           secondary_ips=opts.use_replication_network,
504
                           nodegroup=opts.nodegroup)
505

    
506
  srun = ssh.SshRunner(cluster_name)
507
  for node in results:
508
    if not srun.CopyFileToNode(node, filename):
509
      ToStderr("Copy of file %s to node %s failed", filename, node)
510

    
511
  return 0
512

    
513

    
514
def RunClusterCommand(opts, args):
515
  """Run a command on some nodes.
516

517
  @param opts: the command line options selected by the user
518
  @type args: list
519
  @param args: should contain the command to be run and its arguments
520
  @rtype: int
521
  @return: the desired exit code
522

523
  """
524
  cl = GetClient()
525

    
526
  command = " ".join(args)
527

    
528
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
529

    
530
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
531
                                                    "master_node"])
532

    
533
  srun = ssh.SshRunner(cluster_name=cluster_name)
534

    
535
  # Make sure master node is at list end
536
  if master_node in nodes:
537
    nodes.remove(master_node)
538
    nodes.append(master_node)
539

    
540
  for name in nodes:
541
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
542

    
543
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
544
      # Do not output anything for successful commands
545
      continue
546

    
547
    ToStdout("------------------------------------------------")
548
    if opts.show_machine_names:
549
      for line in result.output.splitlines():
550
        ToStdout("%s: %s", name, line)
551
    else:
552
      ToStdout("node: %s", name)
553
      ToStdout("%s", result.output)
554
    ToStdout("return code = %s", result.exit_code)
555

    
556
  return 0
557

    
558

    
559
def VerifyCluster(opts, args):
560
  """Verify integrity of cluster, performing various test on nodes.
561

562
  @param opts: the command line options selected by the user
563
  @type args: list
564
  @param args: should be an empty list
565
  @rtype: int
566
  @return: the desired exit code
567

568
  """
569
  skip_checks = []
570

    
571
  if opts.skip_nplusone_mem:
572
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
573

    
574
  cl = GetClient()
575

    
576
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
577
                               error_codes=opts.error_codes,
578
                               debug_simulate_errors=opts.simulate_errors,
579
                               skip_checks=skip_checks,
580
                               ignore_errors=opts.ignore_errors,
581
                               group_name=opts.nodegroup)
582
  result = SubmitOpCode(op, cl=cl, opts=opts)
583

    
584
  # Keep track of submitted jobs
585
  jex = JobExecutor(cl=cl, opts=opts)
586

    
587
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
588
    jex.AddJobId(None, status, job_id)
589

    
590
  results = jex.GetResults()
591

    
592
  (bad_jobs, bad_results) = \
593
    map(len,
594
        # Convert iterators to lists
595
        map(list,
596
            # Count errors
597
            map(compat.partial(itertools.ifilterfalse, bool),
598
                # Convert result to booleans in a tuple
599
                zip(*((job_success, len(op_results) == 1 and op_results[0])
600
                      for (job_success, op_results) in results)))))
601

    
602
  if bad_jobs == 0 and bad_results == 0:
603
    rcode = constants.EXIT_SUCCESS
604
  else:
605
    rcode = constants.EXIT_FAILURE
606
    if bad_jobs > 0:
607
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
608

    
609
  return rcode
610

    
611

    
612
def VerifyDisks(opts, args):
613
  """Verify integrity of cluster disks.
614

615
  @param opts: the command line options selected by the user
616
  @type args: list
617
  @param args: should be an empty list
618
  @rtype: int
619
  @return: the desired exit code
620

621
  """
622
  cl = GetClient()
623

    
624
  op = opcodes.OpClusterVerifyDisks()
625

    
626
  result = SubmitOpCode(op, cl=cl, opts=opts)
627

    
628
  # Keep track of submitted jobs
629
  jex = JobExecutor(cl=cl, opts=opts)
630

    
631
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
632
    jex.AddJobId(None, status, job_id)
633

    
634
  retcode = constants.EXIT_SUCCESS
635

    
636
  for (status, result) in jex.GetResults():
637
    if not status:
638
      ToStdout("Job failed: %s", result)
639
      continue
640

    
641
    ((bad_nodes, instances, missing), ) = result
642

    
643
    for node, text in bad_nodes.items():
644
      ToStdout("Error gathering data on node %s: %s",
645
               node, utils.SafeEncode(text[-400:]))
646
      retcode = constants.EXIT_FAILURE
647
      ToStdout("You need to fix these nodes first before fixing instances")
648

    
649
    for iname in instances:
650
      if iname in missing:
651
        continue
652
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
653
      try:
654
        ToStdout("Activating disks for instance '%s'", iname)
655
        SubmitOpCode(op, opts=opts, cl=cl)
656
      except errors.GenericError, err:
657
        nret, msg = FormatError(err)
658
        retcode |= nret
659
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
660

    
661
    if missing:
662
      for iname, ival in missing.iteritems():
663
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
664
        if all_missing:
665
          ToStdout("Instance %s cannot be verified as it lives on"
666
                   " broken nodes", iname)
667
        else:
668
          ToStdout("Instance %s has missing logical volumes:", iname)
669
          ival.sort()
670
          for node, vol in ival:
671
            if node in bad_nodes:
672
              ToStdout("\tbroken node %s /dev/%s", node, vol)
673
            else:
674
              ToStdout("\t%s /dev/%s", node, vol)
675

    
676
      ToStdout("You need to replace or recreate disks for all the above"
677
               " instances if this message persists after fixing broken nodes.")
678
      retcode = constants.EXIT_FAILURE
679
    elif not instances:
680
      ToStdout("No disks need to be activated.")
681

    
682
  return retcode
683

    
684

    
685
def RepairDiskSizes(opts, args):
686
  """Verify sizes of cluster disks.
687

688
  @param opts: the command line options selected by the user
689
  @type args: list
690
  @param args: optional list of instances to restrict check to
691
  @rtype: int
692
  @return: the desired exit code
693

694
  """
695
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
696
  SubmitOpCode(op, opts=opts)
697

    
698

    
699
@UsesRPC
700
def MasterFailover(opts, args):
701
  """Failover the master node.
702

703
  This command, when run on a non-master node, will cause the current
704
  master to cease being master, and the non-master to become new
705
  master.
706

707
  @param opts: the command line options selected by the user
708
  @type args: list
709
  @param args: should be an empty list
710
  @rtype: int
711
  @return: the desired exit code
712

713
  """
714
  if opts.no_voting and not opts.yes_do_it:
715
    usertext = ("This will perform the failover even if most other nodes"
716
                " are down, or if this node is outdated. This is dangerous"
717
                " as it can lead to a non-consistent cluster. Check the"
718
                " gnt-cluster(8) man page before proceeding. Continue?")
719
    if not AskUser(usertext):
720
      return 1
721

    
722
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
723

    
724

    
725
def MasterPing(opts, args):
726
  """Checks if the master is alive.
727

728
  @param opts: the command line options selected by the user
729
  @type args: list
730
  @param args: should be an empty list
731
  @rtype: int
732
  @return: the desired exit code
733

734
  """
735
  try:
736
    cl = GetClient()
737
    cl.QueryClusterInfo()
738
    return 0
739
  except Exception: # pylint: disable=W0703
740
    return 1
741

    
742

    
743
def SearchTags(opts, args):
744
  """Searches the tags on all the cluster.
745

746
  @param opts: the command line options selected by the user
747
  @type args: list
748
  @param args: should contain only one element, the tag pattern
749
  @rtype: int
750
  @return: the desired exit code
751

752
  """
753
  op = opcodes.OpTagsSearch(pattern=args[0])
754
  result = SubmitOpCode(op, opts=opts)
755
  if not result:
756
    return 1
757
  result = list(result)
758
  result.sort()
759
  for path, tag in result:
760
    ToStdout("%s %s", path, tag)
761

    
762

    
763
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
764
  """Reads and verifies an X509 certificate.
765

766
  @type cert_filename: string
767
  @param cert_filename: the path of the file containing the certificate to
768
                        verify encoded in PEM format
769
  @type verify_private_key: bool
770
  @param verify_private_key: whether to verify the private key in addition to
771
                             the public certificate
772
  @rtype: string
773
  @return: a string containing the PEM-encoded certificate.
774

775
  """
776
  try:
777
    pem = utils.ReadFile(cert_filename)
778
  except IOError, err:
779
    raise errors.X509CertError(cert_filename,
780
                               "Unable to read certificate: %s" % str(err))
781

    
782
  try:
783
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
784
  except Exception, err:
785
    raise errors.X509CertError(cert_filename,
786
                               "Unable to load certificate: %s" % str(err))
787

    
788
  if verify_private_key:
789
    try:
790
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
791
    except Exception, err:
792
      raise errors.X509CertError(cert_filename,
793
                                 "Unable to load private key: %s" % str(err))
794

    
795
  return pem
796

    
797

    
798
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
799
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
800
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
801
                 cds_filename, force):
802
  """Renews cluster certificates, keys and secrets.
803

804
  @type new_cluster_cert: bool
805
  @param new_cluster_cert: Whether to generate a new cluster certificate
806
  @type new_rapi_cert: bool
807
  @param new_rapi_cert: Whether to generate a new RAPI certificate
808
  @type rapi_cert_filename: string
809
  @param rapi_cert_filename: Path to file containing new RAPI certificate
810
  @type new_spice_cert: bool
811
  @param new_spice_cert: Whether to generate a new SPICE certificate
812
  @type spice_cert_filename: string
813
  @param spice_cert_filename: Path to file containing new SPICE certificate
814
  @type spice_cacert_filename: string
815
  @param spice_cacert_filename: Path to file containing the certificate of the
816
                                CA that signed the SPICE certificate
817
  @type new_confd_hmac_key: bool
818
  @param new_confd_hmac_key: Whether to generate a new HMAC key
819
  @type new_cds: bool
820
  @param new_cds: Whether to generate a new cluster domain secret
821
  @type cds_filename: string
822
  @param cds_filename: Path to file containing new cluster domain secret
823
  @type force: bool
824
  @param force: Whether to ask user for confirmation
825

826
  """
827
  if new_rapi_cert and rapi_cert_filename:
828
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
829
             " options can be specified at the same time.")
830
    return 1
831

    
832
  if new_cds and cds_filename:
833
    ToStderr("Only one of the --new-cluster-domain-secret and"
834
             " --cluster-domain-secret options can be specified at"
835
             " the same time.")
836
    return 1
837

    
838
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
839
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
840
             " and --spice-ca-certificate must not be used.")
841
    return 1
842

    
843
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
844
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
845
             " specified.")
846
    return 1
847

    
848
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
849
  try:
850
    if rapi_cert_filename:
851
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
852
    if spice_cert_filename:
853
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
854
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
855
  except errors.X509CertError, err:
856
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
857
    return 1
858

    
859
  if cds_filename:
860
    try:
861
      cds = utils.ReadFile(cds_filename)
862
    except Exception, err: # pylint: disable=W0703
863
      ToStderr("Can't load new cluster domain secret from %s: %s" %
864
               (cds_filename, str(err)))
865
      return 1
866
  else:
867
    cds = None
868

    
869
  if not force:
870
    usertext = ("This requires all daemons on all nodes to be restarted and"
871
                " may take some time. Continue?")
872
    if not AskUser(usertext):
873
      return 1
874

    
875
  def _RenewCryptoInner(ctx):
876
    ctx.feedback_fn("Updating certificates and keys")
877
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
878
                                    new_rapi_cert,
879
                                    new_spice_cert,
880
                                    new_confd_hmac_key,
881
                                    new_cds,
882
                                    rapi_cert_pem=rapi_cert_pem,
883
                                    spice_cert_pem=spice_cert_pem,
884
                                    spice_cacert_pem=spice_cacert_pem,
885
                                    cds=cds)
886

    
887
    files_to_copy = []
888

    
889
    if new_cluster_cert:
890
      files_to_copy.append(pathutils.NODED_CERT_FILE)
891

    
892
    if new_rapi_cert or rapi_cert_pem:
893
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
894

    
895
    if new_spice_cert or spice_cert_pem:
896
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
897
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
898

    
899
    if new_confd_hmac_key:
900
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
901

    
902
    if new_cds or cds:
903
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
904

    
905
    if files_to_copy:
906
      for node_name in ctx.nonmaster_nodes:
907
        ctx.feedback_fn("Copying %s to %s" %
908
                        (", ".join(files_to_copy), node_name))
909
        for file_name in files_to_copy:
910
          ctx.ssh.CopyFileToNode(node_name, file_name)
911

    
912
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
913

    
914
  ToStdout("All requested certificates and keys have been replaced."
915
           " Running \"gnt-cluster verify\" now is recommended.")
916

    
917
  return 0
918

    
919

    
920
def RenewCrypto(opts, args):
921
  """Renews cluster certificates, keys and secrets.
922

923
  """
924
  return _RenewCrypto(opts.new_cluster_cert,
925
                      opts.new_rapi_cert,
926
                      opts.rapi_cert,
927
                      opts.new_spice_cert,
928
                      opts.spice_cert,
929
                      opts.spice_cacert,
930
                      opts.new_confd_hmac_key,
931
                      opts.new_cluster_domain_secret,
932
                      opts.cluster_domain_secret,
933
                      opts.force)
934

    
935

    
936
def SetClusterParams(opts, args):
937
  """Modify the cluster.
938

939
  @param opts: the command line options selected by the user
940
  @type args: list
941
  @param args: should be an empty list
942
  @rtype: int
943
  @return: the desired exit code
944

945
  """
946
  if not (not opts.lvm_storage or opts.vg_name or
947
          not opts.drbd_storage or opts.drbd_helper or
948
          opts.enabled_hypervisors or opts.hvparams or
949
          opts.beparams or opts.nicparams or
950
          opts.ndparams or opts.diskparams or
951
          opts.candidate_pool_size is not None or
952
          opts.uid_pool is not None or
953
          opts.maintain_node_health is not None or
954
          opts.add_uids is not None or
955
          opts.remove_uids is not None or
956
          opts.default_iallocator is not None or
957
          opts.reserved_lvs is not None or
958
          opts.master_netdev is not None or
959
          opts.master_netmask is not None or
960
          opts.use_external_mip_script is not None or
961
          opts.prealloc_wipe_disks is not None or
962
          opts.hv_state or
963
          opts.disk_state or
964
          opts.ispecs_mem_size or
965
          opts.ispecs_cpu_count or
966
          opts.ispecs_disk_count or
967
          opts.ispecs_disk_size or
968
          opts.ispecs_nic_count or
969
          opts.ipolicy_disk_templates is not None or
970
          opts.ipolicy_vcpu_ratio is not None or
971
          opts.ipolicy_spindle_ratio is not None):
972
    ToStderr("Please give at least one of the parameters.")
973
    return 1
974

    
975
  vg_name = opts.vg_name
976
  if not opts.lvm_storage and opts.vg_name:
977
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
978
    return 1
979

    
980
  if not opts.lvm_storage:
981
    vg_name = ""
982

    
983
  drbd_helper = opts.drbd_helper
984
  if not opts.drbd_storage and opts.drbd_helper:
985
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
986
    return 1
987

    
988
  if not opts.drbd_storage:
989
    drbd_helper = ""
990

    
991
  hvlist = opts.enabled_hypervisors
992
  if hvlist is not None:
993
    hvlist = hvlist.split(",")
994

    
995
  # a list of (name, dict) we can pass directly to dict() (or [])
996
  hvparams = dict(opts.hvparams)
997
  for hv_params in hvparams.values():
998
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
999

    
1000
  diskparams = dict(opts.diskparams)
1001

    
1002
  for dt_params in diskparams.values():
1003
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1004

    
1005
  beparams = opts.beparams
1006
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1007

    
1008
  nicparams = opts.nicparams
1009
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1010

    
1011
  ndparams = opts.ndparams
1012
  if ndparams is not None:
1013
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1014

    
1015
  ipolicy = CreateIPolicyFromOpts(
1016
    ispecs_mem_size=opts.ispecs_mem_size,
1017
    ispecs_cpu_count=opts.ispecs_cpu_count,
1018
    ispecs_disk_count=opts.ispecs_disk_count,
1019
    ispecs_disk_size=opts.ispecs_disk_size,
1020
    ispecs_nic_count=opts.ispecs_nic_count,
1021
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1022
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1023
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1024
    )
1025

    
1026
  mnh = opts.maintain_node_health
1027

    
1028
  uid_pool = opts.uid_pool
1029
  if uid_pool is not None:
1030
    uid_pool = uidpool.ParseUidPool(uid_pool)
1031

    
1032
  add_uids = opts.add_uids
1033
  if add_uids is not None:
1034
    add_uids = uidpool.ParseUidPool(add_uids)
1035

    
1036
  remove_uids = opts.remove_uids
1037
  if remove_uids is not None:
1038
    remove_uids = uidpool.ParseUidPool(remove_uids)
1039

    
1040
  if opts.reserved_lvs is not None:
1041
    if opts.reserved_lvs == "":
1042
      opts.reserved_lvs = []
1043
    else:
1044
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1045

    
1046
  if opts.master_netmask is not None:
1047
    try:
1048
      opts.master_netmask = int(opts.master_netmask)
1049
    except ValueError:
1050
      ToStderr("The --master-netmask option expects an int parameter.")
1051
      return 1
1052

    
1053
  ext_ip_script = opts.use_external_mip_script
1054

    
1055
  if opts.disk_state:
1056
    disk_state = utils.FlatToDict(opts.disk_state)
1057
  else:
1058
    disk_state = {}
1059

    
1060
  hv_state = dict(opts.hv_state)
1061

    
1062
  op = opcodes.OpClusterSetParams(vg_name=vg_name,
1063
                                  drbd_helper=drbd_helper,
1064
                                  enabled_hypervisors=hvlist,
1065
                                  hvparams=hvparams,
1066
                                  os_hvp=None,
1067
                                  beparams=beparams,
1068
                                  nicparams=nicparams,
1069
                                  ndparams=ndparams,
1070
                                  diskparams=diskparams,
1071
                                  ipolicy=ipolicy,
1072
                                  candidate_pool_size=opts.candidate_pool_size,
1073
                                  maintain_node_health=mnh,
1074
                                  uid_pool=uid_pool,
1075
                                  add_uids=add_uids,
1076
                                  remove_uids=remove_uids,
1077
                                  default_iallocator=opts.default_iallocator,
1078
                                  prealloc_wipe_disks=opts.prealloc_wipe_disks,
1079
                                  master_netdev=opts.master_netdev,
1080
                                  master_netmask=opts.master_netmask,
1081
                                  reserved_lvs=opts.reserved_lvs,
1082
                                  use_external_mip_script=ext_ip_script,
1083
                                  hv_state=hv_state,
1084
                                  disk_state=disk_state,
1085
                                  )
1086
  SubmitOrSend(op, opts)
1087
  return 0
1088

    
1089

    
1090
def QueueOps(opts, args):
1091
  """Queue operations.
1092

1093
  @param opts: the command line options selected by the user
1094
  @type args: list
1095
  @param args: should contain only one element, the subcommand
1096
  @rtype: int
1097
  @return: the desired exit code
1098

1099
  """
1100
  command = args[0]
1101
  client = GetClient()
1102
  if command in ("drain", "undrain"):
1103
    drain_flag = command == "drain"
1104
    client.SetQueueDrainFlag(drain_flag)
1105
  elif command == "info":
1106
    result = client.QueryConfigValues(["drain_flag"])
1107
    if result[0]:
1108
      val = "set"
1109
    else:
1110
      val = "unset"
1111
    ToStdout("The drain flag is %s" % val)
1112
  else:
1113
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1114
                               errors.ECODE_INVAL)
1115

    
1116
  return 0
1117

    
1118

    
1119
def _ShowWatcherPause(until):
1120
  if until is None or until < time.time():
1121
    ToStdout("The watcher is not paused.")
1122
  else:
1123
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1124

    
1125

    
1126
def WatcherOps(opts, args):
1127
  """Watcher operations.
1128

1129
  @param opts: the command line options selected by the user
1130
  @type args: list
1131
  @param args: should contain only one element, the subcommand
1132
  @rtype: int
1133
  @return: the desired exit code
1134

1135
  """
1136
  command = args[0]
1137
  client = GetClient()
1138

    
1139
  if command == "continue":
1140
    client.SetWatcherPause(None)
1141
    ToStdout("The watcher is no longer paused.")
1142

    
1143
  elif command == "pause":
1144
    if len(args) < 2:
1145
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1146

    
1147
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1148
    _ShowWatcherPause(result)
1149

    
1150
  elif command == "info":
1151
    result = client.QueryConfigValues(["watcher_pause"])
1152
    _ShowWatcherPause(result[0])
1153

    
1154
  else:
1155
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1156
                               errors.ECODE_INVAL)
1157

    
1158
  return 0
1159

    
1160

    
1161
def _OobPower(opts, node_list, power):
1162
  """Puts the node in the list to desired power state.
1163

1164
  @param opts: The command line options selected by the user
1165
  @param node_list: The list of nodes to operate on
1166
  @param power: True if they should be powered on, False otherwise
1167
  @return: The success of the operation (none failed)
1168

1169
  """
1170
  if power:
1171
    command = constants.OOB_POWER_ON
1172
  else:
1173
    command = constants.OOB_POWER_OFF
1174

    
1175
  op = opcodes.OpOobCommand(node_names=node_list,
1176
                            command=command,
1177
                            ignore_status=True,
1178
                            timeout=opts.oob_timeout,
1179
                            power_delay=opts.power_delay)
1180
  result = SubmitOpCode(op, opts=opts)
1181
  errs = 0
1182
  for node_result in result:
1183
    (node_tuple, data_tuple) = node_result
1184
    (_, node_name) = node_tuple
1185
    (data_status, _) = data_tuple
1186
    if data_status != constants.RS_NORMAL:
1187
      assert data_status != constants.RS_UNAVAIL
1188
      errs += 1
1189
      ToStderr("There was a problem changing power for %s, please investigate",
1190
               node_name)
1191

    
1192
  if errs > 0:
1193
    return False
1194

    
1195
  return True
1196

    
1197

    
1198
def _InstanceStart(opts, inst_list, start, no_remember=False):
1199
  """Puts the instances in the list to desired state.
1200

1201
  @param opts: The command line options selected by the user
1202
  @param inst_list: The list of instances to operate on
1203
  @param start: True if they should be started, False for shutdown
1204
  @param no_remember: If the instance state should be remembered
1205
  @return: The success of the operation (none failed)
1206

1207
  """
1208
  if start:
1209
    opcls = opcodes.OpInstanceStartup
1210
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1211
  else:
1212
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1213
                           timeout=opts.shutdown_timeout,
1214
                           no_remember=no_remember)
1215
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1216

    
1217
  jex = JobExecutor(opts=opts)
1218

    
1219
  for inst in inst_list:
1220
    ToStdout("Submit %s of instance %s", text_submit, inst)
1221
    op = opcls(instance_name=inst)
1222
    jex.QueueJob(inst, op)
1223

    
1224
  results = jex.GetResults()
1225
  bad_cnt = len([1 for (success, _) in results if not success])
1226

    
1227
  if bad_cnt == 0:
1228
    ToStdout("All instances have been %s successfully", text_success)
1229
  else:
1230
    ToStderr("There were errors while %s instances:\n"
1231
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1232
             len(results))
1233
    return False
1234

    
1235
  return True
1236

    
1237

    
1238
class _RunWhenNodesReachableHelper:
1239
  """Helper class to make shared internal state sharing easier.
1240

1241
  @ivar success: Indicates if all action_cb calls were successful
1242

1243
  """
1244
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1245
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1246
    """Init the object.
1247

1248
    @param node_list: The list of nodes to be reachable
1249
    @param action_cb: Callback called when a new host is reachable
1250
    @type node2ip: dict
1251
    @param node2ip: Node to ip mapping
1252
    @param port: The port to use for the TCP ping
1253
    @param feedback_fn: The function used for feedback
1254
    @param _ping_fn: Function to check reachabilty (for unittest use only)
1255
    @param _sleep_fn: Function to sleep (for unittest use only)
1256

1257
    """
1258
    self.down = set(node_list)
1259
    self.up = set()
1260
    self.node2ip = node2ip
1261
    self.success = True
1262
    self.action_cb = action_cb
1263
    self.port = port
1264
    self.feedback_fn = feedback_fn
1265
    self._ping_fn = _ping_fn
1266
    self._sleep_fn = _sleep_fn
1267

    
1268
  def __call__(self):
1269
    """When called we run action_cb.
1270

1271
    @raises utils.RetryAgain: When there are still down nodes
1272

1273
    """
1274
    if not self.action_cb(self.up):
1275
      self.success = False
1276

    
1277
    if self.down:
1278
      raise utils.RetryAgain()
1279
    else:
1280
      return self.success
1281

    
1282
  def Wait(self, secs):
1283
    """Checks if a host is up or waits remaining seconds.
1284

1285
    @param secs: The secs remaining
1286

1287
    """
1288
    start = time.time()
1289
    for node in self.down:
1290
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1291
                       live_port_needed=True):
1292
        self.feedback_fn("Node %s became available" % node)
1293
        self.up.add(node)
1294
        self.down -= self.up
1295
        # If we have a node available there is the possibility to run the
1296
        # action callback successfully, therefore we don't wait and return
1297
        return
1298

    
1299
    self._sleep_fn(max(0.0, start + secs - time.time()))
1300

    
1301

    
1302
def _RunWhenNodesReachable(node_list, action_cb, interval):
1303
  """Run action_cb when nodes become reachable.
1304

1305
  @param node_list: The list of nodes to be reachable
1306
  @param action_cb: Callback called when a new host is reachable
1307
  @param interval: The earliest time to retry
1308

1309
  """
1310
  client = GetClient()
1311
  cluster_info = client.QueryClusterInfo()
1312
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1313
    family = netutils.IPAddress.family
1314
  else:
1315
    family = netutils.IP6Address.family
1316

    
1317
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1318
                 for node in node_list)
1319

    
1320
  port = netutils.GetDaemonPort(constants.NODED)
1321
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1322
                                        ToStdout)
1323

    
1324
  try:
1325
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1326
                       wait_fn=helper.Wait)
1327
  except utils.RetryTimeout:
1328
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1329
             " again:\n  - %s", "  - ".join(helper.down))
1330
    return False
1331

    
1332

    
1333
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1334
                          _instance_start_fn=_InstanceStart):
1335
  """Start the instances conditional based on node_states.
1336

1337
  @param opts: The command line options selected by the user
1338
  @param inst_map: A dict of inst -> nodes mapping
1339
  @param nodes_online: A list of nodes online
1340
  @param _instance_start_fn: Callback to start instances (unittest use only)
1341
  @return: Success of the operation on all instances
1342

1343
  """
1344
  start_inst_list = []
1345
  for (inst, nodes) in inst_map.items():
1346
    if not (nodes - nodes_online):
1347
      # All nodes the instance lives on are back online
1348
      start_inst_list.append(inst)
1349

    
1350
  for inst in start_inst_list:
1351
    del inst_map[inst]
1352

    
1353
  if start_inst_list:
1354
    return _instance_start_fn(opts, start_inst_list, True)
1355

    
1356
  return True
1357

    
1358

    
1359
def _EpoOn(opts, full_node_list, node_list, inst_map):
1360
  """Does the actual power on.
1361

1362
  @param opts: The command line options selected by the user
1363
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1364
                         OOB)
1365
  @param node_list: The list of nodes to operate on (all need to support OOB)
1366
  @param inst_map: A dict of inst -> nodes mapping
1367
  @return: The desired exit status
1368

1369
  """
1370
  if node_list and not _OobPower(opts, node_list, False):
1371
    ToStderr("Not all nodes seem to get back up, investigate and start"
1372
             " manually if needed")
1373

    
1374
  # Wait for the nodes to be back up
1375
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1376

    
1377
  ToStdout("Waiting until all nodes are available again")
1378
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1379
    ToStderr("Please investigate and start stopped instances manually")
1380
    return constants.EXIT_FAILURE
1381

    
1382
  return constants.EXIT_SUCCESS
1383

    
1384

    
1385
def _EpoOff(opts, node_list, inst_map):
1386
  """Does the actual power off.
1387

1388
  @param opts: The command line options selected by the user
1389
  @param node_list: The list of nodes to operate on (all need to support OOB)
1390
  @param inst_map: A dict of inst -> nodes mapping
1391
  @return: The desired exit status
1392

1393
  """
1394
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1395
    ToStderr("Please investigate and stop instances manually before continuing")
1396
    return constants.EXIT_FAILURE
1397

    
1398
  if not node_list:
1399
    return constants.EXIT_SUCCESS
1400

    
1401
  if _OobPower(opts, node_list, False):
1402
    return constants.EXIT_SUCCESS
1403
  else:
1404
    return constants.EXIT_FAILURE
1405

    
1406

    
1407
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1408
        _confirm_fn=ConfirmOperation,
1409
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1410
  """EPO operations.
1411

1412
  @param opts: the command line options selected by the user
1413
  @type args: list
1414
  @param args: should contain only one element, the subcommand
1415
  @rtype: int
1416
  @return: the desired exit code
1417

1418
  """
1419
  if opts.groups and opts.show_all:
1420
    _stderr_fn("Only one of --groups or --all are allowed")
1421
    return constants.EXIT_FAILURE
1422
  elif args and opts.show_all:
1423
    _stderr_fn("Arguments in combination with --all are not allowed")
1424
    return constants.EXIT_FAILURE
1425

    
1426
  if cl is None:
1427
    cl = GetClient()
1428

    
1429
  if opts.groups:
1430
    node_query_list = \
1431
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1432
  else:
1433
    node_query_list = args
1434

    
1435
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1436
                                           "sinst_list", "powered", "offline"],
1437
                         False)
1438

    
1439
  all_nodes = map(compat.fst, result)
1440
  node_list = []
1441
  inst_map = {}
1442
  for (node, master, pinsts, sinsts, powered, offline) in result:
1443
    if not offline:
1444
      for inst in (pinsts + sinsts):
1445
        if inst in inst_map:
1446
          if not master:
1447
            inst_map[inst].add(node)
1448
        elif master:
1449
          inst_map[inst] = set()
1450
        else:
1451
          inst_map[inst] = set([node])
1452

    
1453
    if master and opts.on:
1454
      # We ignore the master for turning on the machines, in fact we are
1455
      # already operating on the master at this point :)
1456
      continue
1457
    elif master and not opts.show_all:
1458
      _stderr_fn("%s is the master node, please do a master-failover to another"
1459
                 " node not affected by the EPO or use --all if you intend to"
1460
                 " shutdown the whole cluster", node)
1461
      return constants.EXIT_FAILURE
1462
    elif powered is None:
1463
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1464
                 " handled in a fully automated manner", node)
1465
    elif powered == opts.on:
1466
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1467
    elif not offline or (offline and powered):
1468
      node_list.append(node)
1469

    
1470
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1471
    return constants.EXIT_FAILURE
1472

    
1473
  if opts.on:
1474
    return _on_fn(opts, all_nodes, node_list, inst_map)
1475
  else:
1476
    return _off_fn(opts, node_list, inst_map)
1477

    
1478

    
1479
commands = {
1480
  "init": (
1481
    InitCluster, [ArgHost(min=1, max=1)],
1482
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1483
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1484
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1485
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1486
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1487
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1488
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1489
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT] + INSTANCE_POLICY_OPTS,
1490
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1491
  "destroy": (
1492
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1493
    "", "Destroy cluster"),
1494
  "rename": (
1495
    RenameCluster, [ArgHost(min=1, max=1)],
1496
    [FORCE_OPT, DRY_RUN_OPT],
1497
    "<new_name>",
1498
    "Renames the cluster"),
1499
  "redist-conf": (
1500
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1501
    "", "Forces a push of the configuration file and ssconf files"
1502
    " to the nodes in the cluster"),
1503
  "verify": (
1504
    VerifyCluster, ARGS_NONE,
1505
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1506
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1507
    "", "Does a check on the cluster configuration"),
1508
  "verify-disks": (
1509
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1510
    "", "Does a check on the cluster disk status"),
1511
  "repair-disk-sizes": (
1512
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1513
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1514
  "master-failover": (
1515
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1516
    "", "Makes the current node the master"),
1517
  "master-ping": (
1518
    MasterPing, ARGS_NONE, [],
1519
    "", "Checks if the master is alive"),
1520
  "version": (
1521
    ShowClusterVersion, ARGS_NONE, [],
1522
    "", "Shows the cluster version"),
1523
  "getmaster": (
1524
    ShowClusterMaster, ARGS_NONE, [],
1525
    "", "Shows the cluster master"),
1526
  "copyfile": (
1527
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1528
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1529
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1530
  "command": (
1531
    RunClusterCommand, [ArgCommand(min=1)],
1532
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1533
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1534
  "info": (
1535
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1536
    "[--roman]", "Show cluster configuration"),
1537
  "list-tags": (
1538
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1539
  "add-tags": (
1540
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1541
    "tag...", "Add tags to the cluster"),
1542
  "remove-tags": (
1543
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1544
    "tag...", "Remove tags from the cluster"),
1545
  "search-tags": (
1546
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1547
    "Searches the tags on all objects on"
1548
    " the cluster for a given pattern (regex)"),
1549
  "queue": (
1550
    QueueOps,
1551
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1552
    [], "drain|undrain|info", "Change queue properties"),
1553
  "watcher": (
1554
    WatcherOps,
1555
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1556
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1557
    [],
1558
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1559
  "modify": (
1560
    SetClusterParams, ARGS_NONE,
1561
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1562
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1563
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1564
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1565
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1566
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1567
     DISK_STATE_OPT, SUBMIT_OPT] +
1568
    INSTANCE_POLICY_OPTS,
1569
    "[opts...]",
1570
    "Alters the parameters of the cluster"),
1571
  "renew-crypto": (
1572
    RenewCrypto, ARGS_NONE,
1573
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1574
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1575
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1576
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1577
    "[opts...]",
1578
    "Renews cluster certificates, keys and secrets"),
1579
  "epo": (
1580
    Epo, [ArgUnknown()],
1581
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1582
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1583
    "[opts...] [args]",
1584
    "Performs an emergency power-off on given args"),
1585
  "activate-master-ip": (
1586
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1587
  "deactivate-master-ip": (
1588
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1589
    "Deactivates the master IP"),
1590
  }
1591

    
1592

    
1593
#: dictionary with aliases for commands
1594
aliases = {
1595
  "masterfailover": "master-failover",
1596
  "show": "info",
1597
}
1598

    
1599

    
1600
def Main():
1601
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1602
                     aliases=aliases)