lib/client/gnt_cluster.py @ ea9d3b40

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os.path
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import pathutils
47

    
48

    
49
ON_OPT = cli_option("--on", default=False,
50
                    action="store_true", dest="on",
51
                    help="Recover from an EPO")
52

    
53
GROUPS_OPT = cli_option("--groups", default=False,
54
                        action="store_true", dest="groups",
55
                        help="Arguments are node groups instead of nodes")
56

    
57
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
58
                            help="Override interactive check for --no-voting",
59
                            default=False, action="store_true")
60

    
61
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
62
_EPO_PING_TIMEOUT = 1 # 1 second
63
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
64

    
65

    
66
@UsesRPC
67
def InitCluster(opts, args):
68
  """Initialize the cluster.
69

70
  @param opts: the command line options selected by the user
71
  @type args: list
72
  @param args: should contain only one element, the desired
73
      cluster name
74
  @rtype: int
75
  @return: the desired exit code
76

77
  """
78
  if not opts.lvm_storage and opts.vg_name:
79
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
80
    return 1
81

    
82
  vg_name = opts.vg_name
83
  if opts.lvm_storage and not opts.vg_name:
84
    vg_name = constants.DEFAULT_VG
85

    
86
  if not opts.drbd_storage and opts.drbd_helper:
87
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
88
    return 1
89

    
90
  drbd_helper = opts.drbd_helper
91
  if opts.drbd_storage and not opts.drbd_helper:
92
    drbd_helper = constants.DEFAULT_DRBD_HELPER
93

    
94
  master_netdev = opts.master_netdev
95
  if master_netdev is None:
96
    master_netdev = constants.DEFAULT_BRIDGE
97

    
98
  hvlist = opts.enabled_hypervisors
99
  if hvlist is None:
100
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
101
  hvlist = hvlist.split(",")
102

    
103
  hvparams = dict(opts.hvparams)
104
  beparams = opts.beparams
105
  nicparams = opts.nicparams
106

    
107
  diskparams = dict(opts.diskparams)
108

    
109
  # check the disk template types here, as we cannot rely on the type check done
110
  # by the opcode parameter types
111
  diskparams_keys = set(diskparams.keys())
112
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
113
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
114
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
115
    return 1
116

    
117
  # prepare beparams dict
118
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
119
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
120

    
121
  # prepare nicparams dict
122
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
123
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
124

    
125
  # prepare ndparams dict
126
  if opts.ndparams is None:
127
    ndparams = dict(constants.NDC_DEFAULTS)
128
  else:
129
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
130
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
131

    
132
  # prepare hvparams dict
133
  for hv in constants.HYPER_TYPES:
134
    if hv not in hvparams:
135
      hvparams[hv] = {}
136
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
137
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
138

    
139
  # prepare diskparams dict
140
  for templ in constants.DISK_TEMPLATES:
141
    if templ not in diskparams:
142
      diskparams[templ] = {}
143
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
144
                                         diskparams[templ])
145
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
146

    
147
  # prepare ipolicy dict
148
  ipolicy = CreateIPolicyFromOpts(
149
    ispecs_mem_size=opts.ispecs_mem_size,
150
    ispecs_cpu_count=opts.ispecs_cpu_count,
151
    ispecs_disk_count=opts.ispecs_disk_count,
152
    ispecs_disk_size=opts.ispecs_disk_size,
153
    ispecs_nic_count=opts.ispecs_nic_count,
154
    minmax_ispecs=opts.ipolicy_bounds_specs,
155
    std_ispecs=opts.ipolicy_std_specs,
156
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
157
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
158
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
159
    fill_all=True)
160

    
161
  if opts.candidate_pool_size is None:
162
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
163

    
164
  if opts.mac_prefix is None:
165
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
166

    
167
  uid_pool = opts.uid_pool
168
  if uid_pool is not None:
169
    uid_pool = uidpool.ParseUidPool(uid_pool)
170

    
171
  if opts.prealloc_wipe_disks is None:
172
    opts.prealloc_wipe_disks = False
173

    
174
  external_ip_setup_script = opts.use_external_mip_script
175
  if external_ip_setup_script is None:
176
    external_ip_setup_script = False
177

    
178
  try:
179
    primary_ip_version = int(opts.primary_ip_version)
180
  except (ValueError, TypeError), err:
181
    ToStderr("Invalid primary ip version value: %s" % str(err))
182
    return 1
183

    
184
  master_netmask = opts.master_netmask
185
  try:
186
    if master_netmask is not None:
187
      master_netmask = int(master_netmask)
188
  except (ValueError, TypeError), err:
189
    ToStderr("Invalid master netmask value: %s" % str(err))
190
    return 1
191

    
192
  if opts.disk_state:
193
    disk_state = utils.FlatToDict(opts.disk_state)
194
  else:
195
    disk_state = {}
196

    
197
  hv_state = dict(opts.hv_state)
198

    
199
  enabled_disk_templates = opts.enabled_disk_templates
200
  if enabled_disk_templates:
201
    enabled_disk_templates = enabled_disk_templates.split(",")
202
  else:
203
    enabled_disk_templates = list(constants.DEFAULT_ENABLED_DISK_TEMPLATES)
204

    
205
  bootstrap.InitCluster(cluster_name=args[0],
206
                        secondary_ip=opts.secondary_ip,
207
                        vg_name=vg_name,
208
                        mac_prefix=opts.mac_prefix,
209
                        master_netmask=master_netmask,
210
                        master_netdev=master_netdev,
211
                        file_storage_dir=opts.file_storage_dir,
212
                        shared_file_storage_dir=opts.shared_file_storage_dir,
213
                        enabled_hypervisors=hvlist,
214
                        hvparams=hvparams,
215
                        beparams=beparams,
216
                        nicparams=nicparams,
217
                        ndparams=ndparams,
218
                        diskparams=diskparams,
219
                        ipolicy=ipolicy,
220
                        candidate_pool_size=opts.candidate_pool_size,
221
                        modify_etc_hosts=opts.modify_etc_hosts,
222
                        modify_ssh_setup=opts.modify_ssh_setup,
223
                        maintain_node_health=opts.maintain_node_health,
224
                        drbd_helper=drbd_helper,
225
                        uid_pool=uid_pool,
226
                        default_iallocator=opts.default_iallocator,
227
                        primary_ip_version=primary_ip_version,
228
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
229
                        use_external_mip_script=external_ip_setup_script,
230
                        hv_state=hv_state,
231
                        disk_state=disk_state,
232
                        enabled_disk_templates=enabled_disk_templates,
233
                        )
234
  op = opcodes.OpClusterPostInit()
235
  SubmitOpCode(op, opts=opts)
236
  return 0
237

    
238

    
239
@UsesRPC
240
def DestroyCluster(opts, args):
241
  """Destroy the cluster.
242

243
  @param opts: the command line options selected by the user
244
  @type args: list
245
  @param args: should be an empty list
246
  @rtype: int
247
  @return: the desired exit code
248

249
  """
250
  if not opts.yes_do_it:
251
    ToStderr("Destroying a cluster is irreversible. If you really want"
252
             " destroy this cluster, supply the --yes-do-it option.")
253
    return 1
254

    
255
  op = opcodes.OpClusterDestroy()
256
  master = SubmitOpCode(op, opts=opts)
257
  # if we reached this, the opcode didn't fail; we can proceed to
258
  # shutdown all the daemons
259
  bootstrap.FinalizeClusterDestroy(master)
260
  return 0
261

    
262

    
263
def RenameCluster(opts, args):
264
  """Rename the cluster.
265

266
  @param opts: the command line options selected by the user
267
  @type args: list
268
  @param args: should contain only one element, the new cluster name
269
  @rtype: int
270
  @return: the desired exit code
271

272
  """
273
  cl = GetClient()
274

    
275
  (cluster_name, ) = cl.QueryConfigValues(["cluster_name"])
276

    
277
  new_name = args[0]
278
  if not opts.force:
279
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
280
                " connected over the network to the cluster name, the"
281
                " operation is very dangerous as the IP address will be"
282
                " removed from the node and the change may not go through."
283
                " Continue?") % (cluster_name, new_name)
284
    if not AskUser(usertext):
285
      return 1
286

    
287
  op = opcodes.OpClusterRename(name=new_name)
288
  result = SubmitOpCode(op, opts=opts, cl=cl)
289

    
290
  if result:
291
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)
292

    
293
  return 0
294

    
295

    
296
def ActivateMasterIp(opts, args):
297
  """Activates the master IP.
298

299
  """
300
  op = opcodes.OpClusterActivateMasterIp()
301
  SubmitOpCode(op)
302
  return 0
303

    
304

    
305
def DeactivateMasterIp(opts, args):
306
  """Deactivates the master IP.
307

308
  """
309
  if not opts.confirm:
310
    usertext = ("This will disable the master IP. All the open connections to"
311
                " the master IP will be closed. To reach the master you will"
312
                " need to use its node IP."
313
                " Continue?")
314
    if not AskUser(usertext):
315
      return 1
316

    
317
  op = opcodes.OpClusterDeactivateMasterIp()
318
  SubmitOpCode(op)
319
  return 0
320

    
321

    
322
def RedistributeConfig(opts, args):
323
  """Forces push of the cluster configuration.
324

325
  @param opts: the command line options selected by the user
326
  @type args: list
327
  @param args: empty list
328
  @rtype: int
329
  @return: the desired exit code
330

331
  """
332
  op = opcodes.OpClusterRedistConf()
333
  SubmitOrSend(op, opts)
334
  return 0
335

    
336

    
337
def ShowClusterVersion(opts, args):
338
  """Write version of ganeti software to the standard output.
339

340
  @param opts: the command line options selected by the user
341
  @type args: list
342
  @param args: should be an empty list
343
  @rtype: int
344
  @return: the desired exit code
345

346
  """
347
  cl = GetClient(query=True)
348
  result = cl.QueryClusterInfo()
349
  ToStdout("Software version: %s", result["software_version"])
350
  ToStdout("Internode protocol: %s", result["protocol_version"])
351
  ToStdout("Configuration format: %s", result["config_version"])
352
  ToStdout("OS api version: %s", result["os_api_version"])
353
  ToStdout("Export interface: %s", result["export_version"])
354
  return 0
355

    
356

    
357
def ShowClusterMaster(opts, args):
358
  """Write name of master node to the standard output.
359

360
  @param opts: the command line options selected by the user
361
  @type args: list
362
  @param args: should be an empty list
363
  @rtype: int
364
  @return: the desired exit code
365

366
  """
367
  master = bootstrap.GetMaster()
368
  ToStdout(master)
369
  return 0
370

    
371

    
372
def _FormatGroupedParams(paramsdict, roman=False):
373
  """Format Grouped parameters (be, nic, disk) by group.
374

375
  @type paramsdict: dict of dicts
376
  @param paramsdict: {group: {param: value, ...}, ...}
377
  @rtype: dict of dicts
378
  @return: copy of the input dictionaries with strings as values
379

380
  """
381
  ret = {}
382
  for (item, val) in paramsdict.items():
383
    if isinstance(val, dict):
384
      ret[item] = _FormatGroupedParams(val, roman=roman)
385
    elif roman and isinstance(val, int):
386
      ret[item] = compat.TryToRoman(val)
387
    else:
388
      ret[item] = str(val)
389
  return ret
390

    
391

    
392
def ShowClusterConfig(opts, args):
393
  """Shows cluster information.
394

395
  @param opts: the command line options selected by the user
396
  @type args: list
397
  @param args: should be an empty list
398
  @rtype: int
399
  @return: the desired exit code
400

401
  """
402
  cl = GetClient(query=True)
403
  result = cl.QueryClusterInfo()
404

    
405
  if result["tags"]:
406
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
407
  else:
408
    tags = "(none)"
409
  if result["reserved_lvs"]:
410
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
411
  else:
412
    reserved_lvs = "(none)"
413

    
414
  info = [
415
    ("Cluster name", result["name"]),
416
    ("Cluster UUID", result["uuid"]),
417

    
418
    ("Creation time", utils.FormatTime(result["ctime"])),
419
    ("Modification time", utils.FormatTime(result["mtime"])),
420

    
421
    ("Master node", result["master"]),
422

    
423
    ("Architecture (this node)",
424
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),
425

    
426
    ("Tags", tags),
427

    
428
    ("Default hypervisor", result["default_hypervisor"]),
429
    ("Enabled hypervisors",
430
     utils.CommaJoin(result["enabled_hypervisors"])),
431

    
432
    ("Hypervisor parameters", _FormatGroupedParams(result["hvparams"])),
433

    
434
    ("OS-specific hypervisor parameters",
435
     _FormatGroupedParams(result["os_hvp"])),
436

    
437
    ("OS parameters", _FormatGroupedParams(result["osparams"])),
438

    
439
    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
440
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),
441

    
442
    ("Cluster parameters", [
443
      ("candidate pool size",
444
       compat.TryToRoman(result["candidate_pool_size"],
445
                         convert=opts.roman_integers)),
446
      ("master netdev", result["master_netdev"]),
447
      ("master netmask", result["master_netmask"]),
448
      ("use external master IP address setup script",
449
       result["use_external_mip_script"]),
450
      ("lvm volume group", result["volume_group_name"]),
451
      ("lvm reserved volumes", reserved_lvs),
452
      ("drbd usermode helper", result["drbd_usermode_helper"]),
453
      ("file storage path", result["file_storage_dir"]),
454
      ("shared file storage path", result["shared_file_storage_dir"]),
455
      ("maintenance of node health", result["maintain_node_health"]),
456
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
457
      ("default instance allocator", result["default_iallocator"]),
458
      ("primary ip version", result["primary_ip_version"]),
459
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
460
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
461
      ("ExtStorage Providers search path",
462
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
463
      ("enabled disk templates",
464
       utils.CommaJoin(result["enabled_disk_templates"])),
465
      ]),
466

    
467
    ("Default node parameters",
468
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),
469

    
470
    ("Default instance parameters",
471
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),
472

    
473
    ("Default nic parameters",
474
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),
475

    
476
    ("Default disk parameters",
477
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),
478

    
479
    ("Instance policy - limits for instances",
480
     FormatPolicyInfo(result["ipolicy"], None, True)),
481
    ]
482

    
483
  PrintGenericInfo(info)
484
  return 0
485

    
486

    
487
def ClusterCopyFile(opts, args):
488
  """Copy a file from master to some nodes.
489

490
  @param opts: the command line options selected by the user
491
  @type args: list
492
  @param args: should contain only one element, the path of
493
      the file to be copied
494
  @rtype: int
495
  @return: the desired exit code
496

497
  """
498
  filename = args[0]
499
  if not os.path.exists(filename):
500
    raise errors.OpPrereqError("No such filename '%s'" % filename,
501
                               errors.ECODE_INVAL)
502

    
503
  cl = GetClient()
504

    
505
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]
506

    
507
  results = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
508
                           secondary_ips=opts.use_replication_network,
509
                           nodegroup=opts.nodegroup)
510

    
511
  srun = ssh.SshRunner(cluster_name)
512
  for node in results:
513
    if not srun.CopyFileToNode(node, filename):
514
      ToStderr("Copy of file %s to node %s failed", filename, node)
515

    
516
  return 0
517

    
518

    
519
def RunClusterCommand(opts, args):
520
  """Run a command on some nodes.
521

522
  @param opts: the command line options selected by the user
523
  @type args: list
524
  @param args: should contain the command to be run and its arguments
525
  @rtype: int
526
  @return: the desired exit code
527

528
  """
529
  cl = GetClient()
530

    
531
  command = " ".join(args)
532

    
533
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
534

    
535
  cluster_name, master_node = cl.QueryConfigValues(["cluster_name",
536
                                                    "master_node"])
537

    
538
  srun = ssh.SshRunner(cluster_name=cluster_name)
539

    
540
  # Make sure master node is at list end
541
  if master_node in nodes:
542
    nodes.remove(master_node)
543
    nodes.append(master_node)
544

    
545
  for name in nodes:
546
    result = srun.Run(name, constants.SSH_LOGIN_USER, command)
547

    
548
    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
549
      # Do not output anything for successful commands
550
      continue
551

    
552
    ToStdout("------------------------------------------------")
553
    if opts.show_machine_names:
554
      for line in result.output.splitlines():
555
        ToStdout("%s: %s", name, line)
556
    else:
557
      ToStdout("node: %s", name)
558
      ToStdout("%s", result.output)
559
    ToStdout("return code = %s", result.exit_code)
560

    
561
  return 0
562

    
563

    
564
def VerifyCluster(opts, args):
565
  """Verify integrity of cluster, performing various test on nodes.
566

567
  @param opts: the command line options selected by the user
568
  @type args: list
569
  @param args: should be an empty list
570
  @rtype: int
571
  @return: the desired exit code
572

573
  """
574
  skip_checks = []
575

    
576
  if opts.skip_nplusone_mem:
577
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)
578

    
579
  cl = GetClient()
580

    
581
  op = opcodes.OpClusterVerify(verbose=opts.verbose,
582
                               error_codes=opts.error_codes,
583
                               debug_simulate_errors=opts.simulate_errors,
584
                               skip_checks=skip_checks,
585
                               ignore_errors=opts.ignore_errors,
586
                               group_name=opts.nodegroup)
587
  result = SubmitOpCode(op, cl=cl, opts=opts)
588

    
589
  # Keep track of submitted jobs
590
  jex = JobExecutor(cl=cl, opts=opts)
591

    
592
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
593
    jex.AddJobId(None, status, job_id)
594

    
595
  results = jex.GetResults()
596

    
597
  (bad_jobs, bad_results) = \
598
    map(len,
599
        # Convert iterators to lists
600
        map(list,
601
            # Count errors
602
            map(compat.partial(itertools.ifilterfalse, bool),
603
                # Convert result to booleans in a tuple
604
                zip(*((job_success, len(op_results) == 1 and op_results[0])
605
                      for (job_success, op_results) in results)))))
606

    
607
  if bad_jobs == 0 and bad_results == 0:
608
    rcode = constants.EXIT_SUCCESS
609
  else:
610
    rcode = constants.EXIT_FAILURE
611
    if bad_jobs > 0:
612
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)
613

    
614
  return rcode
615

    
616

    
617
def VerifyDisks(opts, args):
618
  """Verify integrity of cluster disks.
619

620
  @param opts: the command line options selected by the user
621
  @type args: list
622
  @param args: should be an empty list
623
  @rtype: int
624
  @return: the desired exit code
625

626
  """
627
  cl = GetClient()
628

    
629
  op = opcodes.OpClusterVerifyDisks()
630

    
631
  result = SubmitOpCode(op, cl=cl, opts=opts)
632

    
633
  # Keep track of submitted jobs
634
  jex = JobExecutor(cl=cl, opts=opts)
635

    
636
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
637
    jex.AddJobId(None, status, job_id)
638

    
639
  retcode = constants.EXIT_SUCCESS
640

    
641
  for (status, result) in jex.GetResults():
642
    if not status:
643
      ToStdout("Job failed: %s", result)
644
      continue
645

    
646
    ((bad_nodes, instances, missing), ) = result
647

    
648
    for node, text in bad_nodes.items():
649
      ToStdout("Error gathering data on node %s: %s",
650
               node, utils.SafeEncode(text[-400:]))
651
      retcode = constants.EXIT_FAILURE
652
      ToStdout("You need to fix these nodes first before fixing instances")
653

    
654
    for iname in instances:
655
      if iname in missing:
656
        continue
657
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
658
      try:
659
        ToStdout("Activating disks for instance '%s'", iname)
660
        SubmitOpCode(op, opts=opts, cl=cl)
661
      except errors.GenericError, err:
662
        nret, msg = FormatError(err)
663
        retcode |= nret
664
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
665

    
666
    if missing:
667
      for iname, ival in missing.iteritems():
668
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
669
        if all_missing:
670
          ToStdout("Instance %s cannot be verified as it lives on"
671
                   " broken nodes", iname)
672
        else:
673
          ToStdout("Instance %s has missing logical volumes:", iname)
674
          ival.sort()
675
          for node, vol in ival:
676
            if node in bad_nodes:
677
              ToStdout("\tbroken node %s /dev/%s", node, vol)
678
            else:
679
              ToStdout("\t%s /dev/%s", node, vol)
680

    
681
      ToStdout("You need to replace or recreate disks for all the above"
682
               " instances if this message persists after fixing broken nodes.")
683
      retcode = constants.EXIT_FAILURE
684
    elif not instances:
685
      ToStdout("No disks need to be activated.")
686

    
687
  return retcode
688

    
689

    
690
def RepairDiskSizes(opts, args):
691
  """Verify sizes of cluster disks.
692

693
  @param opts: the command line options selected by the user
694
  @type args: list
695
  @param args: optional list of instances to restrict check to
696
  @rtype: int
697
  @return: the desired exit code
698

699
  """
700
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
701
  SubmitOpCode(op, opts=opts)
702

    
703

    
704
@UsesRPC
705
def MasterFailover(opts, args):
706
  """Failover the master node.
707

708
  This command, when run on a non-master node, will cause the current
709
  master to cease being master, and the non-master to become the new
710
  master.
711

712
  @param opts: the command line options selected by the user
713
  @type args: list
714
  @param args: should be an empty list
715
  @rtype: int
716
  @return: the desired exit code
717

718
  """
719
  if opts.no_voting and not opts.yes_do_it:
720
    usertext = ("This will perform the failover even if most other nodes"
721
                " are down, or if this node is outdated. This is dangerous"
722
                " as it can lead to a non-consistent cluster. Check the"
723
                " gnt-cluster(8) man page before proceeding. Continue?")
724
    if not AskUser(usertext):
725
      return 1
726

    
727
  return bootstrap.MasterFailover(no_voting=opts.no_voting)
728

    
729

    
730
def MasterPing(opts, args):
731
  """Checks if the master is alive.
732

733
  @param opts: the command line options selected by the user
734
  @type args: list
735
  @param args: should be an empty list
736
  @rtype: int
737
  @return: the desired exit code
738

739
  """
740
  try:
741
    cl = GetClient()
742
    cl.QueryClusterInfo()
743
    return 0
744
  except Exception: # pylint: disable=W0703
745
    return 1
746

    
747

    
748
def SearchTags(opts, args):
749
  """Searches the tags on all the cluster.
750

751
  @param opts: the command line options selected by the user
752
  @type args: list
753
  @param args: should contain only one element, the tag pattern
754
  @rtype: int
755
  @return: the desired exit code
756

757
  """
758
  op = opcodes.OpTagsSearch(pattern=args[0])
759
  result = SubmitOpCode(op, opts=opts)
760
  if not result:
761
    return 1
762
  result = list(result)
763
  result.sort()
764
  for path, tag in result:
765
    ToStdout("%s %s", path, tag)
766

    
767

    
768
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
769
  """Reads and verifies an X509 certificate.
770

771
  @type cert_filename: string
772
  @param cert_filename: the path of the file containing the certificate to
773
                        verify encoded in PEM format
774
  @type verify_private_key: bool
775
  @param verify_private_key: whether to verify the private key in addition to
776
                             the public certificate
777
  @rtype: string
778
  @return: a string containing the PEM-encoded certificate.
779

780
  """
781
  try:
782
    pem = utils.ReadFile(cert_filename)
783
  except IOError, err:
784
    raise errors.X509CertError(cert_filename,
785
                               "Unable to read certificate: %s" % str(err))
786

    
787
  try:
788
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
789
  except Exception, err:
790
    raise errors.X509CertError(cert_filename,
791
                               "Unable to load certificate: %s" % str(err))
792

    
793
  if verify_private_key:
794
    try:
795
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
796
    except Exception, err:
797
      raise errors.X509CertError(cert_filename,
798
                                 "Unable to load private key: %s" % str(err))
799

    
800
  return pem
801

    
802

    
803
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
804
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
805
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
806
                 cds_filename, force):
807
  """Renews cluster certificates, keys and secrets.
808

809
  @type new_cluster_cert: bool
810
  @param new_cluster_cert: Whether to generate a new cluster certificate
811
  @type new_rapi_cert: bool
812
  @param new_rapi_cert: Whether to generate a new RAPI certificate
813
  @type rapi_cert_filename: string
814
  @param rapi_cert_filename: Path to file containing new RAPI certificate
815
  @type new_spice_cert: bool
816
  @param new_spice_cert: Whether to generate a new SPICE certificate
817
  @type spice_cert_filename: string
818
  @param spice_cert_filename: Path to file containing new SPICE certificate
819
  @type spice_cacert_filename: string
820
  @param spice_cacert_filename: Path to file containing the certificate of the
821
                                CA that signed the SPICE certificate
822
  @type new_confd_hmac_key: bool
823
  @param new_confd_hmac_key: Whether to generate a new HMAC key
824
  @type new_cds: bool
825
  @param new_cds: Whether to generate a new cluster domain secret
826
  @type cds_filename: string
827
  @param cds_filename: Path to file containing new cluster domain secret
828
  @type force: bool
829
  @param force: Whether to ask user for confirmation
830

831
  """
832
  if new_rapi_cert and rapi_cert_filename:
833
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
834
             " options can be specified at the same time.")
835
    return 1
836

    
837
  if new_cds and cds_filename:
838
    ToStderr("Only one of the --new-cluster-domain-secret and"
839
             " --cluster-domain-secret options can be specified at"
840
             " the same time.")
841
    return 1
842

    
843
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
844
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
845
             " and --spice-ca-certificate must not be used.")
846
    return 1
847

    
848
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
849
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
850
             " specified.")
851
    return 1
852

    
853
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
854
  try:
855
    if rapi_cert_filename:
856
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
857
    if spice_cert_filename:
858
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
859
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
860
  except errors.X509CertError, err:
861
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
862
    return 1
863

    
864
  if cds_filename:
865
    try:
866
      cds = utils.ReadFile(cds_filename)
867
    except Exception, err: # pylint: disable=W0703
868
      ToStderr("Can't load new cluster domain secret from %s: %s" %
869
               (cds_filename, str(err)))
870
      return 1
871
  else:
872
    cds = None
873

    
874
  if not force:
875
    usertext = ("This requires all daemons on all nodes to be restarted and"
876
                " may take some time. Continue?")
877
    if not AskUser(usertext):
878
      return 1
879

    
880
  def _RenewCryptoInner(ctx):
881
    ctx.feedback_fn("Updating certificates and keys")
882
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
883
                                    new_rapi_cert,
884
                                    new_spice_cert,
885
                                    new_confd_hmac_key,
886
                                    new_cds,
887
                                    rapi_cert_pem=rapi_cert_pem,
888
                                    spice_cert_pem=spice_cert_pem,
889
                                    spice_cacert_pem=spice_cacert_pem,
890
                                    cds=cds)
891

    
892
    files_to_copy = []
893

    
894
    if new_cluster_cert:
895
      files_to_copy.append(pathutils.NODED_CERT_FILE)
896

    
897
    if new_rapi_cert or rapi_cert_pem:
898
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
899

    
900
    if new_spice_cert or spice_cert_pem:
901
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
902
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
903

    
904
    if new_confd_hmac_key:
905
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
906

    
907
    if new_cds or cds:
908
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
909

    
910
    if files_to_copy:
911
      for node_name in ctx.nonmaster_nodes:
912
        ctx.feedback_fn("Copying %s to %s" %
913
                        (", ".join(files_to_copy), node_name))
914
        for file_name in files_to_copy:
915
          ctx.ssh.CopyFileToNode(node_name, file_name)
916

    
917
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
918

    
919
  ToStdout("All requested certificates and keys have been replaced."
920
           " Running \"gnt-cluster verify\" now is recommended.")
921

    
922
  return 0
923

    
924

    
925
def RenewCrypto(opts, args):
926
  """Renews cluster certificates, keys and secrets.
927

928
  """
929
  return _RenewCrypto(opts.new_cluster_cert,
930
                      opts.new_rapi_cert,
931
                      opts.rapi_cert,
932
                      opts.new_spice_cert,
933
                      opts.spice_cert,
934
                      opts.spice_cacert,
935
                      opts.new_confd_hmac_key,
936
                      opts.new_cluster_domain_secret,
937
                      opts.cluster_domain_secret,
938
                      opts.force)
939

    
940

    
941
def SetClusterParams(opts, args):
942
  """Modify the cluster.
943

944
  @param opts: the command line options selected by the user
945
  @type args: list
946
  @param args: should be an empty list
947
  @rtype: int
948
  @return: the desired exit code
949

950
  """
951
  if not (not opts.lvm_storage or opts.vg_name or
952
          not opts.drbd_storage or opts.drbd_helper or
953
          opts.enabled_hypervisors or opts.hvparams or
954
          opts.beparams or opts.nicparams or
955
          opts.ndparams or opts.diskparams or
956
          opts.candidate_pool_size is not None or
957
          opts.uid_pool is not None or
958
          opts.maintain_node_health is not None or
959
          opts.add_uids is not None or
960
          opts.remove_uids is not None or
961
          opts.default_iallocator is not None or
962
          opts.reserved_lvs is not None or
963
          opts.master_netdev is not None or
964
          opts.master_netmask is not None or
965
          opts.use_external_mip_script is not None or
966
          opts.prealloc_wipe_disks is not None or
967
          opts.hv_state or
968
          opts.enabled_disk_templates or
969
          opts.disk_state or
970
          opts.ispecs_mem_size or
971
          opts.ispecs_cpu_count or
972
          opts.ispecs_disk_count or
973
          opts.ispecs_disk_size or
974
          opts.ispecs_nic_count or
975
          opts.ipolicy_bounds_specs is not None or
976
          opts.ipolicy_std_specs is not None or
977
          opts.ipolicy_disk_templates is not None or
978
          opts.ipolicy_vcpu_ratio is not None or
979
          opts.ipolicy_spindle_ratio is not None):
980
    ToStderr("Please give at least one of the parameters.")
981
    return 1
982

    
983
  vg_name = opts.vg_name
984
  if not opts.lvm_storage and opts.vg_name:
985
    ToStderr("Options --no-lvm-storage and --vg-name conflict.")
986
    return 1
987

    
988
  if not opts.lvm_storage:
989
    vg_name = ""
990

    
991
  drbd_helper = opts.drbd_helper
992
  if not opts.drbd_storage and opts.drbd_helper:
993
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
994
    return 1
995

    
996
  if not opts.drbd_storage:
997
    drbd_helper = ""
998

    
999
  hvlist = opts.enabled_hypervisors
1000
  if hvlist is not None:
1001
    hvlist = hvlist.split(",")
1002

    
1003
  enabled_disk_templates = opts.enabled_disk_templates
1004
  if enabled_disk_templates:
1005
    enabled_disk_templates = enabled_disk_templates.split(",")
1006

    
1007
  # a list of (name, dict) we can pass directly to dict() (or [])
1008
  hvparams = dict(opts.hvparams)
1009
  for hv_params in hvparams.values():
1010
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1011

    
1012
  diskparams = dict(opts.diskparams)
1013

    
1014
  for dt_params in diskparams.values():
1015
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
1016

    
1017
  beparams = opts.beparams
1018
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
1019

    
1020
  nicparams = opts.nicparams
1021
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
1022

    
1023
  ndparams = opts.ndparams
1024
  if ndparams is not None:
1025
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
1026

    
1027
  ipolicy = CreateIPolicyFromOpts(
1028
    ispecs_mem_size=opts.ispecs_mem_size,
1029
    ispecs_cpu_count=opts.ispecs_cpu_count,
1030
    ispecs_disk_count=opts.ispecs_disk_count,
1031
    ispecs_disk_size=opts.ispecs_disk_size,
1032
    ispecs_nic_count=opts.ispecs_nic_count,
1033
    minmax_ispecs=opts.ipolicy_bounds_specs,
1034
    std_ispecs=opts.ipolicy_std_specs,
1035
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
1036
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
1037
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
1038
    )
1039

    
1040
  mnh = opts.maintain_node_health
1041

    
1042
  uid_pool = opts.uid_pool
1043
  if uid_pool is not None:
1044
    uid_pool = uidpool.ParseUidPool(uid_pool)
1045

    
1046
  add_uids = opts.add_uids
1047
  if add_uids is not None:
1048
    add_uids = uidpool.ParseUidPool(add_uids)
1049

    
1050
  remove_uids = opts.remove_uids
1051
  if remove_uids is not None:
1052
    remove_uids = uidpool.ParseUidPool(remove_uids)
1053

    
1054
  if opts.reserved_lvs is not None:
1055
    if opts.reserved_lvs == "":
1056
      opts.reserved_lvs = []
1057
    else:
1058
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")
1059

    
1060
  if opts.master_netmask is not None:
1061
    try:
1062
      opts.master_netmask = int(opts.master_netmask)
1063
    except ValueError:
1064
      ToStderr("The --master-netmask option expects an int parameter.")
1065
      return 1
1066

    
1067
  ext_ip_script = opts.use_external_mip_script
1068

    
1069
  if opts.disk_state:
1070
    disk_state = utils.FlatToDict(opts.disk_state)
1071
  else:
1072
    disk_state = {}
1073

    
1074
  hv_state = dict(opts.hv_state)
1075

    
1076
  op = opcodes.OpClusterSetParams(
1077
    vg_name=vg_name,
1078
    drbd_helper=drbd_helper,
1079
    enabled_hypervisors=hvlist,
1080
    hvparams=hvparams,
1081
    os_hvp=None,
1082
    beparams=beparams,
1083
    nicparams=nicparams,
1084
    ndparams=ndparams,
1085
    diskparams=diskparams,
1086
    ipolicy=ipolicy,
1087
    candidate_pool_size=opts.candidate_pool_size,
1088
    maintain_node_health=mnh,
1089
    uid_pool=uid_pool,
1090
    add_uids=add_uids,
1091
    remove_uids=remove_uids,
1092
    default_iallocator=opts.default_iallocator,
1093
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
1094
    master_netdev=opts.master_netdev,
1095
    master_netmask=opts.master_netmask,
1096
    reserved_lvs=opts.reserved_lvs,
1097
    use_external_mip_script=ext_ip_script,
1098
    hv_state=hv_state,
1099
    disk_state=disk_state,
1100
    enabled_disk_templates=enabled_disk_templates,
1101
    )
1102
  SubmitOrSend(op, opts)
1103
  return 0
1104

    
1105

    
1106
def QueueOps(opts, args):
1107
  """Queue operations.
1108

1109
  @param opts: the command line options selected by the user
1110
  @type args: list
1111
  @param args: should contain only one element, the subcommand
1112
  @rtype: int
1113
  @return: the desired exit code
1114

1115
  """
1116
  command = args[0]
1117
  client = GetClient()
1118
  if command in ("drain", "undrain"):
1119
    drain_flag = command == "drain"
1120
    client.SetQueueDrainFlag(drain_flag)
1121
  elif command == "info":
1122
    result = client.QueryConfigValues(["drain_flag"])
1123
    if result[0]:
1124
      val = "set"
1125
    else:
1126
      val = "unset"
1127
    ToStdout("The drain flag is %s" % val)
1128
  else:
1129
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1130
                               errors.ECODE_INVAL)
1131

    
1132
  return 0
1133

    
1134

    
1135
def _ShowWatcherPause(until):
1136
  if until is None or until < time.time():
1137
    ToStdout("The watcher is not paused.")
1138
  else:
1139
    ToStdout("The watcher is paused until %s.", time.ctime(until))
1140

    
1141

    
1142
def WatcherOps(opts, args):
1143
  """Watcher operations.
1144

1145
  @param opts: the command line options selected by the user
1146
  @type args: list
1147
  @param args: should contain only one element, the subcommand
1148
  @rtype: int
1149
  @return: the desired exit code
1150

1151
  """
1152
  command = args[0]
1153
  client = GetClient()
1154

    
1155
  if command == "continue":
1156
    client.SetWatcherPause(None)
1157
    ToStdout("The watcher is no longer paused.")
1158

    
1159
  elif command == "pause":
1160
    if len(args) < 2:
1161
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
1162

    
1163
    result = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
1164
    _ShowWatcherPause(result)
1165

    
1166
  elif command == "info":
1167
    result = client.QueryConfigValues(["watcher_pause"])
1168
    _ShowWatcherPause(result[0])
1169

    
1170
  else:
1171
    raise errors.OpPrereqError("Command '%s' is not valid." % command,
1172
                               errors.ECODE_INVAL)
1173

    
1174
  return 0
1175

    
1176

    
1177
def _OobPower(opts, node_list, power):
1178
  """Puts the node in the list to desired power state.
1179

1180
  @param opts: The command line options selected by the user
1181
  @param node_list: The list of nodes to operate on
1182
  @param power: True if they should be powered on, False otherwise
1183
  @return: The success of the operation (none failed)
1184

1185
  """
1186
  if power:
1187
    command = constants.OOB_POWER_ON
1188
  else:
1189
    command = constants.OOB_POWER_OFF
1190

    
1191
  op = opcodes.OpOobCommand(node_names=node_list,
1192
                            command=command,
1193
                            ignore_status=True,
1194
                            timeout=opts.oob_timeout,
1195
                            power_delay=opts.power_delay)
1196
  result = SubmitOpCode(op, opts=opts)
1197
  errs = 0
1198
  for node_result in result:
1199
    (node_tuple, data_tuple) = node_result
1200
    (_, node_name) = node_tuple
1201
    (data_status, _) = data_tuple
1202
    if data_status != constants.RS_NORMAL:
1203
      assert data_status != constants.RS_UNAVAIL
1204
      errs += 1
1205
      ToStderr("There was a problem changing power for %s, please investigate",
1206
               node_name)
1207

    
1208
  if errs > 0:
1209
    return False
1210

    
1211
  return True
1212

    
1213

    
1214
def _InstanceStart(opts, inst_list, start, no_remember=False):
1215
  """Puts the instances in the list to desired state.
1216

1217
  @param opts: The command line options selected by the user
1218
  @param inst_list: The list of instances to operate on
1219
  @param start: True if they should be started, False for shutdown
1220
  @param no_remember: If the instance state should be remembered
1221
  @return: The success of the operation (none failed)
1222

1223
  """
1224
  if start:
1225
    opcls = opcodes.OpInstanceStartup
1226
    text_submit, text_success, text_failed = ("startup", "started", "starting")
1227
  else:
1228
    opcls = compat.partial(opcodes.OpInstanceShutdown,
1229
                           timeout=opts.shutdown_timeout,
1230
                           no_remember=no_remember)
1231
    text_submit, text_success, text_failed = ("shutdown", "stopped", "stopping")
1232

    
1233
  jex = JobExecutor(opts=opts)
1234

    
1235
  for inst in inst_list:
1236
    ToStdout("Submit %s of instance %s", text_submit, inst)
1237
    op = opcls(instance_name=inst)
1238
    jex.QueueJob(inst, op)
1239

    
1240
  results = jex.GetResults()
1241
  bad_cnt = len([1 for (success, _) in results if not success])
1242

    
1243
  if bad_cnt == 0:
1244
    ToStdout("All instances have been %s successfully", text_success)
1245
  else:
1246
    ToStderr("There were errors while %s instances:\n"
1247
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
1248
             len(results))
1249
    return False
1250

    
1251
  return True
1252

    
1253

    
1254
class _RunWhenNodesReachableHelper:
1255
  """Helper class to make shared internal state sharing easier.
1256

1257
  @ivar success: Indicates if all action_cb calls were successful
1258

1259
  """
1260
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
1261
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
1262
    """Init the object.
1263

1264
    @param node_list: The list of nodes to be reachable
1265
    @param action_cb: Callback called when a new host is reachable
1266
    @type node2ip: dict
1267
    @param node2ip: Node to ip mapping
1268
    @param port: The port to use for the TCP ping
1269
    @param feedback_fn: The function used for feedback
1270
    @param _ping_fn: Function to check reachability (for unittest use only)
1271
    @param _sleep_fn: Function to sleep (for unittest use only)
1272

1273
    """
1274
    self.down = set(node_list)
1275
    self.up = set()
1276
    self.node2ip = node2ip
1277
    self.success = True
1278
    self.action_cb = action_cb
1279
    self.port = port
1280
    self.feedback_fn = feedback_fn
1281
    self._ping_fn = _ping_fn
1282
    self._sleep_fn = _sleep_fn
1283

    
1284
  def __call__(self):
1285
    """When called we run action_cb.
1286

1287
    @raises utils.RetryAgain: When there are still down nodes
1288

1289
    """
1290
    if not self.action_cb(self.up):
1291
      self.success = False
1292

    
1293
    if self.down:
1294
      raise utils.RetryAgain()
1295
    else:
1296
      return self.success
1297

    
1298
  def Wait(self, secs):
1299
    """Checks if a host is up or waits remaining seconds.
1300

1301
    @param secs: The secs remaining
1302

1303
    """
1304
    start = time.time()
1305
    for node in self.down:
1306
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
1307
                       live_port_needed=True):
1308
        self.feedback_fn("Node %s became available" % node)
1309
        self.up.add(node)
1310
        self.down -= self.up
1311
        # If we have a node available there is the possibility to run the
1312
        # action callback successfully, therefore we don't wait and return
1313
        return
1314

    
1315
    self._sleep_fn(max(0.0, start + secs - time.time()))
1316

    
1317

    
1318
def _RunWhenNodesReachable(node_list, action_cb, interval):
1319
  """Run action_cb when nodes become reachable.
1320

1321
  @param node_list: The list of nodes to be reachable
1322
  @param action_cb: Callback called when a new host is reachable
1323
  @param interval: The earliest time to retry
1324

1325
  """
1326
  client = GetClient()
1327
  cluster_info = client.QueryClusterInfo()
1328
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
1329
    family = netutils.IPAddress.family
1330
  else:
1331
    family = netutils.IP6Address.family
1332

    
1333
  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
1334
                 for node in node_list)
1335

    
1336
  port = netutils.GetDaemonPort(constants.NODED)
1337
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
1338
                                        ToStdout)
1339

    
1340
  try:
1341
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
1342
                       wait_fn=helper.Wait)
1343
  except utils.RetryTimeout:
1344
    ToStderr("Time exceeded while waiting for nodes to become reachable"
1345
             " again:\n  - %s", "  - ".join(helper.down))
1346
    return False
1347

    
1348

    
1349
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
1350
                          _instance_start_fn=_InstanceStart):
1351
  """Start the instances conditional based on node_states.
1352

1353
  @param opts: The command line options selected by the user
1354
  @param inst_map: A dict of inst -> nodes mapping
1355
  @param nodes_online: A list of nodes online
1356
  @param _instance_start_fn: Callback to start instances (unittest use only)
1357
  @return: Success of the operation on all instances
1358

1359
  """
1360
  start_inst_list = []
1361
  for (inst, nodes) in inst_map.items():
1362
    if not (nodes - nodes_online):
1363
      # All nodes the instance lives on are back online
1364
      start_inst_list.append(inst)
1365

    
1366
  for inst in start_inst_list:
1367
    del inst_map[inst]
1368

    
1369
  if start_inst_list:
1370
    return _instance_start_fn(opts, start_inst_list, True)
1371

    
1372
  return True
1373

    
1374

    
1375
def _EpoOn(opts, full_node_list, node_list, inst_map):
1376
  """Does the actual power on.
1377

1378
  @param opts: The command line options selected by the user
1379
  @param full_node_list: All nodes to operate on (includes nodes not supporting
1380
                         OOB)
1381
  @param node_list: The list of nodes to operate on (all need to support OOB)
1382
  @param inst_map: A dict of inst -> nodes mapping
1383
  @return: The desired exit status
1384

1385
  """
1386
  if node_list and not _OobPower(opts, node_list, False):
1387
    ToStderr("Not all nodes seem to get back up, investigate and start"
1388
             " manually if needed")
1389

    
1390
  # Wait for the nodes to be back up
1391
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))
1392

    
1393
  ToStdout("Waiting until all nodes are available again")
1394
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
1395
    ToStderr("Please investigate and start stopped instances manually")
1396
    return constants.EXIT_FAILURE
1397

    
1398
  return constants.EXIT_SUCCESS
1399

    
1400

    
1401
def _EpoOff(opts, node_list, inst_map):
1402
  """Does the actual power off.
1403

1404
  @param opts: The command line options selected by the user
1405
  @param node_list: The list of nodes to operate on (all need to support OOB)
1406
  @param inst_map: A dict of inst -> nodes mapping
1407
  @return: The desired exit status
1408

1409
  """
1410
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
1411
    ToStderr("Please investigate and stop instances manually before continuing")
1412
    return constants.EXIT_FAILURE
1413

    
1414
  if not node_list:
1415
    return constants.EXIT_SUCCESS
1416

    
1417
  if _OobPower(opts, node_list, False):
1418
    return constants.EXIT_SUCCESS
1419
  else:
1420
    return constants.EXIT_FAILURE
1421

    
1422

    
1423
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
1424
        _confirm_fn=ConfirmOperation,
1425
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
1426
  """EPO operations.
1427

1428
  @param opts: the command line options selected by the user
1429
  @type args: list
1430
  @param args: should contain only one element, the subcommand
1431
  @rtype: int
1432
  @return: the desired exit code
1433

1434
  """
1435
  if opts.groups and opts.show_all:
1436
    _stderr_fn("Only one of --groups or --all are allowed")
1437
    return constants.EXIT_FAILURE
1438
  elif args and opts.show_all:
1439
    _stderr_fn("Arguments in combination with --all are not allowed")
1440
    return constants.EXIT_FAILURE
1441

    
1442
  if cl is None:
1443
    cl = GetClient()
1444

    
1445
  if opts.groups:
1446
    node_query_list = \
1447
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
1448
  else:
1449
    node_query_list = args
1450

    
1451
  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
1452
                                           "sinst_list", "powered", "offline"],
1453
                         False)
1454

    
1455
  all_nodes = map(compat.fst, result)
1456
  node_list = []
1457
  inst_map = {}
1458
  for (node, master, pinsts, sinsts, powered, offline) in result:
1459
    if not offline:
1460
      for inst in (pinsts + sinsts):
1461
        if inst in inst_map:
1462
          if not master:
1463
            inst_map[inst].add(node)
1464
        elif master:
1465
          inst_map[inst] = set()
1466
        else:
1467
          inst_map[inst] = set([node])
1468

    
1469
    if master and opts.on:
1470
      # We ignore the master for turning on the machines, in fact we are
1471
      # already operating on the master at this point :)
1472
      continue
1473
    elif master and not opts.show_all:
1474
      _stderr_fn("%s is the master node, please do a master-failover to another"
1475
                 " node not affected by the EPO or use --all if you intend to"
1476
                 " shutdown the whole cluster", node)
1477
      return constants.EXIT_FAILURE
1478
    elif powered is None:
1479
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
1480
                 " handled in a fully automated manner", node)
1481
    elif powered == opts.on:
1482
      _stdout_fn("Node %s is already in desired power state, skipping", node)
1483
    elif not offline or (offline and powered):
1484
      node_list.append(node)
1485

    
1486
  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
1487
    return constants.EXIT_FAILURE
1488

    
1489
  if opts.on:
1490
    return _on_fn(opts, all_nodes, node_list, inst_map)
1491
  else:
1492
    return _off_fn(opts, node_list, inst_map)
1493

    
1494

    
1495
def _GetCreateCommand(info):
1496
  buf = StringIO()
1497
  buf.write("gnt-cluster init")
1498
  PrintIPolicyCommand(buf, info["ipolicy"], False)
1499
  buf.write(" ")
1500
  buf.write(info["name"])
1501
  return buf.getvalue()
1502

    
1503

    
1504
def ShowCreateCommand(opts, args):
1505
  """Shows the command that can be used to re-create the cluster.
1506

1507
  Currently it works only for ipolicy specs.
1508

1509
  """
1510
  cl = GetClient(query=True)
1511
  result = cl.QueryClusterInfo()
1512
  ToStdout(_GetCreateCommand(result))
1513

    
1514

    
1515
commands = {
1516
  "init": (
1517
    InitCluster, [ArgHost(min=1, max=1)],
1518
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
1519
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
1520
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
1521
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
1522
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
1523
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
1524
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
1525
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
1526
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS,
1527
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
1528
  "destroy": (
1529
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
1530
    "", "Destroy cluster"),
1531
  "rename": (
1532
    RenameCluster, [ArgHost(min=1, max=1)],
1533
    [FORCE_OPT, DRY_RUN_OPT],
1534
    "<new_name>",
1535
    "Renames the cluster"),
1536
  "redist-conf": (
1537
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
1538
    "", "Forces a push of the configuration file and ssconf files"
1539
    " to the nodes in the cluster"),
1540
  "verify": (
1541
    VerifyCluster, ARGS_NONE,
1542
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
1543
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
1544
    "", "Does a check on the cluster configuration"),
1545
  "verify-disks": (
1546
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
1547
    "", "Does a check on the cluster disk status"),
1548
  "repair-disk-sizes": (
1549
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
1550
    "[instance...]", "Updates mismatches in recorded disk sizes"),
1551
  "master-failover": (
1552
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
1553
    "", "Makes the current node the master"),
1554
  "master-ping": (
1555
    MasterPing, ARGS_NONE, [],
1556
    "", "Checks if the master is alive"),
1557
  "version": (
1558
    ShowClusterVersion, ARGS_NONE, [],
1559
    "", "Shows the cluster version"),
1560
  "getmaster": (
1561
    ShowClusterMaster, ARGS_NONE, [],
1562
    "", "Shows the cluster master"),
1563
  "copyfile": (
1564
    ClusterCopyFile, [ArgFile(min=1, max=1)],
1565
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
1566
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
1567
  "command": (
1568
    RunClusterCommand, [ArgCommand(min=1)],
1569
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
1570
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
1571
  "info": (
1572
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
1573
    "[--roman]", "Show cluster configuration"),
1574
  "list-tags": (
1575
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
1576
  "add-tags": (
1577
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1578
    "tag...", "Add tags to the cluster"),
1579
  "remove-tags": (
1580
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
1581
    "tag...", "Remove tags from the cluster"),
1582
  "search-tags": (
1583
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
1584
    "Searches the tags on all objects on"
1585
    " the cluster for a given pattern (regex)"),
1586
  "queue": (
1587
    QueueOps,
1588
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
1589
    [], "drain|undrain|info", "Change queue properties"),
1590
  "watcher": (
1591
    WatcherOps,
1592
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
1593
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
1594
    [],
1595
    "{pause <timespec>|continue|info}", "Change watcher properties"),
1596
  "modify": (
1597
    SetClusterParams, ARGS_NONE,
1598
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
1599
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
1600
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
1601
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
1602
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
1603
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
1604
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_DISK_TEMPLATES_OPT,
1605
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS,
1606
    "[opts...]",
1607
    "Alters the parameters of the cluster"),
1608
  "renew-crypto": (
1609
    RenewCrypto, ARGS_NONE,
1610
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
1611
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
1612
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
1613
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
1614
    "[opts...]",
1615
    "Renews cluster certificates, keys and secrets"),
1616
  "epo": (
1617
    Epo, [ArgUnknown()],
1618
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
1619
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
1620
    "[opts...] [args]",
1621
    "Performs an emergency power-off on given args"),
1622
  "activate-master-ip": (
1623
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
1624
  "deactivate-master-ip": (
1625
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
1626
    "Deactivates the master IP"),
1627
  "show-ispecs-cmd": (
1628
    ShowCreateCommand, ARGS_NONE, [], "",
1629
    "Show the command line to re-create the cluster"),
1630
  }
1631

    
1632

    
1633
#: dictionary with aliases for commands
1634
aliases = {
1635
  "masterfailover": "master-failover",
1636
  "show": "info",
1637
}
1638

    
1639

    
1640
def Main():
1641
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
1642
                     aliases=aliases)