Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ b54ecf12

History | View | Annotate | Download (52.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os.path
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import pathutils
47

    
48

    
49
# Command line option: recover from an emergency power-off
ON_OPT = cli_option("--on",
                    dest="on",
                    action="store_true",
                    default=False,
                    help="Recover from an EPO")

# Command line option: interpret the positional arguments as node groups
GROUPS_OPT = cli_option("--groups",
                        dest="groups",
                        action="store_true",
                        default=False,
                        help="Arguments are node groups instead of nodes")

# Command line option: skip the interactive confirmation for --no-voting
FORCE_FAILOVER = cli_option("--yes-do-it",
                            dest="yes_do_it",
                            action="store_true",
                            default=False,
                            help="Override interactive check for --no-voting")

# 30 seconds between pings
_EPO_PING_INTERVAL = 30
# 1 second
_EPO_PING_TIMEOUT = 1
# 15 minutes
_EPO_REACHABLE_TIMEOUT = 15 * 60
64

    
65

    
66
def _CheckNoLvmStorageOptDeprecated(opts):
67
  """Checks if the legacy option '--no-lvm-storage' is used.
68

69
  """
70
  if not opts.lvm_storage:
71
    ToStderr("The option --no-lvm-storage is no longer supported. If you want"
72
             " to disable lvm-based storage cluster-wide, use the option"
73
             " --enabled-disk-templates to disable all of these lvm-base disk "
74
             "  templates: %s" %
75
             utils.CommaJoin(utils.GetLvmDiskTemplates()))
76
    return 1
77

    
78

    
79
@UsesRPC
80
def InitCluster(opts, args):
81
  """Initialize the cluster.
82

83
  @param opts: the command line options selected by the user
84
  @type args: list
85
  @param args: should contain only one element, the desired
86
      cluster name
87
  @rtype: int
88
  @return: the desired exit code
89

90
  """
91
  if _CheckNoLvmStorageOptDeprecated(opts):
92
    return 1
93
  enabled_disk_templates = opts.enabled_disk_templates
94
  if enabled_disk_templates:
95
    enabled_disk_templates = enabled_disk_templates.split(",")
96
  else:
97
    enabled_disk_templates = constants.DEFAULT_ENABLED_DISK_TEMPLATES
98

    
99
  vg_name = None
100
  if opts.vg_name is not None:
101
    vg_name = opts.vg_name
102
    if vg_name:
103
      if not utils.IsLvmEnabled(enabled_disk_templates):
104
        ToStdout("You specified a volume group with --vg-name, but you did not"
105
                 " enable any disk template that uses lvm.")
106
    else:
107
      if utils.IsLvmEnabled(enabled_disk_templates):
108
        ToStderr("LVM disk templates are enabled, but vg name not set.")
109
        return 1
110
  else:
111
    if utils.IsLvmEnabled(enabled_disk_templates):
112
      vg_name = constants.DEFAULT_VG
113

    
114
  if not opts.drbd_storage and opts.drbd_helper:
115
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
116
    return 1
117

    
118
  drbd_helper = opts.drbd_helper
119
  if opts.drbd_storage and not opts.drbd_helper:
120
    drbd_helper = constants.DEFAULT_DRBD_HELPER
121

    
122
  master_netdev = opts.master_netdev
123
  if master_netdev is None:
124
    master_netdev = constants.DEFAULT_BRIDGE
125

    
126
  hvlist = opts.enabled_hypervisors
127
  if hvlist is None:
128
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
129
  hvlist = hvlist.split(",")
130

    
131
  hvparams = dict(opts.hvparams)
132
  beparams = opts.beparams
133
  nicparams = opts.nicparams
134

    
135
  diskparams = dict(opts.diskparams)
136

    
137
  # check the disk template types here, as we cannot rely on the type check done
138
  # by the opcode parameter types
139
  diskparams_keys = set(diskparams.keys())
140
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
141
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
142
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
143
    return 1
144

    
145
  # prepare beparams dict
146
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
147
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)
148

    
149
  # prepare nicparams dict
150
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
151
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)
152

    
153
  # prepare ndparams dict
154
  if opts.ndparams is None:
155
    ndparams = dict(constants.NDC_DEFAULTS)
156
  else:
157
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
158
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)
159

    
160
  # prepare hvparams dict
161
  for hv in constants.HYPER_TYPES:
162
    if hv not in hvparams:
163
      hvparams[hv] = {}
164
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
165
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)
166

    
167
  # prepare diskparams dict
168
  for templ in constants.DISK_TEMPLATES:
169
    if templ not in diskparams:
170
      diskparams[templ] = {}
171
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
172
                                         diskparams[templ])
173
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)
174

    
175
  # prepare ipolicy dict
176
  ipolicy = CreateIPolicyFromOpts(
177
    ispecs_mem_size=opts.ispecs_mem_size,
178
    ispecs_cpu_count=opts.ispecs_cpu_count,
179
    ispecs_disk_count=opts.ispecs_disk_count,
180
    ispecs_disk_size=opts.ispecs_disk_size,
181
    ispecs_nic_count=opts.ispecs_nic_count,
182
    minmax_ispecs=opts.ipolicy_bounds_specs,
183
    std_ispecs=opts.ipolicy_std_specs,
184
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
185
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
186
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
187
    fill_all=True)
188

    
189
  if opts.candidate_pool_size is None:
190
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT
191

    
192
  if opts.mac_prefix is None:
193
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX
194

    
195
  uid_pool = opts.uid_pool
196
  if uid_pool is not None:
197
    uid_pool = uidpool.ParseUidPool(uid_pool)
198

    
199
  if opts.prealloc_wipe_disks is None:
200
    opts.prealloc_wipe_disks = False
201

    
202
  external_ip_setup_script = opts.use_external_mip_script
203
  if external_ip_setup_script is None:
204
    external_ip_setup_script = False
205

    
206
  try:
207
    primary_ip_version = int(opts.primary_ip_version)
208
  except (ValueError, TypeError), err:
209
    ToStderr("Invalid primary ip version value: %s" % str(err))
210
    return 1
211

    
212
  master_netmask = opts.master_netmask
213
  try:
214
    if master_netmask is not None:
215
      master_netmask = int(master_netmask)
216
  except (ValueError, TypeError), err:
217
    ToStderr("Invalid master netmask value: %s" % str(err))
218
    return 1
219

    
220
  if opts.disk_state:
221
    disk_state = utils.FlatToDict(opts.disk_state)
222
  else:
223
    disk_state = {}
224

    
225
  hv_state = dict(opts.hv_state)
226

    
227
  bootstrap.InitCluster(cluster_name=args[0],
228
                        secondary_ip=opts.secondary_ip,
229
                        vg_name=vg_name,
230
                        mac_prefix=opts.mac_prefix,
231
                        master_netmask=master_netmask,
232
                        master_netdev=master_netdev,
233
                        file_storage_dir=opts.file_storage_dir,
234
                        shared_file_storage_dir=opts.shared_file_storage_dir,
235
                        enabled_hypervisors=hvlist,
236
                        hvparams=hvparams,
237
                        beparams=beparams,
238
                        nicparams=nicparams,
239
                        ndparams=ndparams,
240
                        diskparams=diskparams,
241
                        ipolicy=ipolicy,
242
                        candidate_pool_size=opts.candidate_pool_size,
243
                        modify_etc_hosts=opts.modify_etc_hosts,
244
                        modify_ssh_setup=opts.modify_ssh_setup,
245
                        maintain_node_health=opts.maintain_node_health,
246
                        drbd_helper=drbd_helper,
247
                        uid_pool=uid_pool,
248
                        default_iallocator=opts.default_iallocator,
249
                        primary_ip_version=primary_ip_version,
250
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
251
                        use_external_mip_script=external_ip_setup_script,
252
                        hv_state=hv_state,
253
                        disk_state=disk_state,
254
                        enabled_disk_templates=enabled_disk_templates,
255
                        )
256
  op = opcodes.OpClusterPostInit()
257
  SubmitOpCode(op, opts=opts)
258
  return 0
259

    
260

    
261
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  Nothing is done unless the user passed the --yes-do-it option.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  if opts.yes_do_it:
    master = SubmitOpCode(opcodes.OpClusterDestroy(), opts=opts)
    # getting here means the opcode didn't fail, so the daemons can be
    # shut down
    bootstrap.FinalizeClusterDestroy(master)
    return 0

  ToStderr("Destroying a cluster is irreversible. If you really want"
           " destroy this cluster, supply the --yes-do-it option.")
  return 1
283

    
284

    
285
def RenameCluster(opts, args):
  """Rename the cluster.

  Unless C{opts.force} is set, the user is asked for confirmation first.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  [cluster_name] = cl.QueryConfigValues(["cluster_name"])
  new_name = args[0]

  question = ("This will rename the cluster from '%s' to '%s'. If you are"
              " connected over the network to the cluster name, the"
              " operation is very dangerous as the IP address will be"
              " removed from the node and the change may not go through."
              " Continue?") % (cluster_name, new_name)
  if not (opts.force or AskUser(question)):
    return 1

  result = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                        opts=opts, cl=cl)
  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", cluster_name, result)

  return 0
316

    
317

    
318
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  @param opts: the command line options selected by the user (unused)
  @param args: the command line arguments (unused)
  @rtype: int
  @return: the desired exit code

  """
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
325

    
326

    
327
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  Unless C{opts.confirm} is set, the user is asked for confirmation first.

  @param opts: the command line options selected by the user
  @param args: the command line arguments (unused)
  @rtype: int
  @return: the desired exit code

  """
  question = ("This will disable the master IP. All the open connections to"
              " the master IP will be closed. To reach the master you will"
              " need to use its node IP."
              " Continue?")
  if not (opts.confirm or AskUser(question)):
    return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
342

    
343

    
344
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  SubmitOrSend(opcodes.OpClusterRedistConf(), opts)
  return 0
357

    
358

    
359
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  info = GetClient(query=True).QueryClusterInfo()

  # (output format, key in the cluster info), printed in this order
  version_lines = [
    ("Software version: %s", "software_version"),
    ("Internode protocol: %s", "protocol_version"),
    ("Configuration format: %s", "config_version"),
    ("OS api version: %s", "os_api_version"),
    ("Export interface: %s", "export_version"),
    ]
  for (fmt, key) in version_lines:
    ToStdout(fmt, info[key])

  return 0
377

    
378

    
379
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  ToStdout(bootstrap.GetMaster())
  return 0
392

    
393

    
394
def _FormatGroupedParams(paramsdict, roman=False):
395
  """Format Grouped parameters (be, nic, disk) by group.
396

397
  @type paramsdict: dict of dicts
398
  @param paramsdict: {group: {param: value, ...}, ...}
399
  @rtype: dict of dicts
400
  @return: copy of the input dictionaries with strings as values
401

402
  """
403
  ret = {}
404
  for (item, val) in paramsdict.items():
405
    if isinstance(val, dict):
406
      ret[item] = _FormatGroupedParams(val, roman=roman)
407
    elif roman and isinstance(val, int):
408
      ret[item] = compat.TryToRoman(val)
409
    else:
410
      ret[item] = str(val)
411
  return ret
412

    
413

    
414
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  Queries the cluster information and prints it through
  L{PrintGenericInfo}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  result = GetClient(query=True).QueryClusterInfo()
  roman = opts.roman_integers

  tags = "(none)"
  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))

  reserved_lvs = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])

  architecture = "%s (%s)" % (result["architecture"][0],
                              result["architecture"][1])

  cluster_params = [
    ("candidate pool size",
     compat.TryToRoman(result["candidate_pool_size"], convert=roman)),
    ("master netdev", result["master_netdev"]),
    ("master netmask", result["master_netmask"]),
    ("use external master IP address setup script",
     result["use_external_mip_script"]),
    ("lvm volume group", result["volume_group_name"]),
    ("lvm reserved volumes", reserved_lvs),
    ("drbd usermode helper", result["drbd_usermode_helper"]),
    ("file storage path", result["file_storage_dir"]),
    ("shared file storage path", result["shared_file_storage_dir"]),
    ("maintenance of node health", result["maintain_node_health"]),
    ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
    ("default instance allocator", result["default_iallocator"]),
    ("primary ip version", result["primary_ip_version"]),
    ("preallocation wipe disks", result["prealloc_wipe_disks"]),
    ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
    ("ExtStorage Providers search path",
     utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
    ("enabled disk templates",
     utils.CommaJoin(result["enabled_disk_templates"])),
    ]

  info = []
  info.append(("Cluster name", result["name"]))
  info.append(("Cluster UUID", result["uuid"]))
  info.append(("Creation time", utils.FormatTime(result["ctime"])))
  info.append(("Modification time", utils.FormatTime(result["mtime"])))
  info.append(("Master node", result["master"]))
  info.append(("Architecture (this node)", architecture))
  info.append(("Tags", tags))
  info.append(("Default hypervisor", result["default_hypervisor"]))
  info.append(("Enabled hypervisors",
               utils.CommaJoin(result["enabled_hypervisors"])))
  # the hypervisor and OS parameters are never shown as roman numerals
  info.append(("Hypervisor parameters",
               _FormatGroupedParams(result["hvparams"])))
  info.append(("OS-specific hypervisor parameters",
               _FormatGroupedParams(result["os_hvp"])))
  info.append(("OS parameters", _FormatGroupedParams(result["osparams"])))
  info.append(("Hidden OSes", utils.CommaJoin(result["hidden_os"])))
  info.append(("Blacklisted OSes",
               utils.CommaJoin(result["blacklisted_os"])))
  info.append(("Cluster parameters", cluster_params))
  info.append(("Default node parameters",
               _FormatGroupedParams(result["ndparams"], roman=roman)))
  info.append(("Default instance parameters",
               _FormatGroupedParams(result["beparams"], roman=roman)))
  info.append(("Default nic parameters",
               _FormatGroupedParams(result["nicparams"], roman=roman)))
  info.append(("Default disk parameters",
               _FormatGroupedParams(result["diskparams"], roman=roman)))
  info.append(("Instance policy - limits for instances",
               FormatPolicyInfo(result["ipolicy"], None, True)))

  PrintGenericInfo(info)
  return 0
507

    
508

    
509
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  A failed copy to a node is reported on stderr and does not stop the
  copy to the remaining nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the file does not exist

  """
  (filename, ) = args[:1]
  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()

  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

  target_nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
                                secondary_ips=opts.use_replication_network,
                                nodegroup=opts.nodegroup)

  runner = ssh.SshRunner(cluster_name)
  for node_name in target_nodes:
    copied = runner.CopyFileToNode(node_name, filename)
    if not copied:
      ToStderr("Copy of file %s to node %s failed", filename, node_name)

  return 0
539

    
540

    
541
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  The command is run on one node after the other, with the master node
  (if it is among the selected nodes) coming last.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  command = " ".join(args)

  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)

  (cluster_name, master_node) = \
    cl.QueryConfigValues(["cluster_name", "master_node"])

  runner = ssh.SshRunner(cluster_name=cluster_name)

  # Make sure master node is at list end
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for node_name in nodes:
    result = runner.Run(node_name, constants.SSH_LOGIN_USER, command)

    succeeded = (result.exit_code == constants.EXIT_SUCCESS)
    if not (opts.failure_only and succeeded):
      # With --failure-only, successful commands produce no output at all
      ToStdout("------------------------------------------------")
      if not opts.show_machine_names:
        ToStdout("node: %s", node_name)
        ToStdout("%s", result.output)
      else:
        for line in result.output.splitlines():
          ToStdout("%s: %s", node_name, line)
      ToStdout("return code = %s", result.exit_code)

  return 0
584

    
585

    
586
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  Submits the cluster verification opcode, waits for all the jobs it
  spawned and derives the exit code from their results.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  # Count the failed jobs and the unsuccessful verification results. An
  # explicit loop (rather than unpacking the result of a zip) also copes
  # with an empty list of job results.
  bad_jobs = 0
  bad_results = 0
  for (job_success, op_results) in jex.GetResults():
    if not job_success:
      bad_jobs += 1
    # a result is only good if the job returned exactly one, true, value
    if not (len(op_results) == 1 and op_results[0]):
      bad_results += 1

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
637

    
638

    
639
def VerifyDisks(opts, args):
640
  """Verify integrity of cluster disks.
641

642
  @param opts: the command line options selected by the user
643
  @type args: list
644
  @param args: should be an empty list
645
  @rtype: int
646
  @return: the desired exit code
647

648
  """
649
  cl = GetClient()
650

    
651
  op = opcodes.OpClusterVerifyDisks()
652

    
653
  result = SubmitOpCode(op, cl=cl, opts=opts)
654

    
655
  # Keep track of submitted jobs
656
  jex = JobExecutor(cl=cl, opts=opts)
657

    
658
  for (status, job_id) in result[constants.JOB_IDS_KEY]:
659
    jex.AddJobId(None, status, job_id)
660

    
661
  retcode = constants.EXIT_SUCCESS
662

    
663
  for (status, result) in jex.GetResults():
664
    if not status:
665
      ToStdout("Job failed: %s", result)
666
      continue
667

    
668
    ((bad_nodes, instances, missing), ) = result
669

    
670
    for node, text in bad_nodes.items():
671
      ToStdout("Error gathering data on node %s: %s",
672
               node, utils.SafeEncode(text[-400:]))
673
      retcode = constants.EXIT_FAILURE
674
      ToStdout("You need to fix these nodes first before fixing instances")
675

    
676
    for iname in instances:
677
      if iname in missing:
678
        continue
679
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
680
      try:
681
        ToStdout("Activating disks for instance '%s'", iname)
682
        SubmitOpCode(op, opts=opts, cl=cl)
683
      except errors.GenericError, err:
684
        nret, msg = FormatError(err)
685
        retcode |= nret
686
        ToStderr("Error activating disks for instance %s: %s", iname, msg)
687

    
688
    if missing:
689
      for iname, ival in missing.iteritems():
690
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
691
        if all_missing:
692
          ToStdout("Instance %s cannot be verified as it lives on"
693
                   " broken nodes", iname)
694
        else:
695
          ToStdout("Instance %s has missing logical volumes:", iname)
696
          ival.sort()
697
          for node, vol in ival:
698
            if node in bad_nodes:
699
              ToStdout("\tbroken node %s /dev/%s", node, vol)
700
            else:
701
              ToStdout("\t%s /dev/%s", node, vol)
702

    
703
      ToStdout("You need to replace or recreate disks for all the above"
704
               " instances if this message persists after fixing broken nodes.")
705
      retcode = constants.EXIT_FAILURE
706
    elif not instances:
707
      ToStdout("No disks need to be activated.")
708

    
709
  return retcode
710

    
711

    
712
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # return the exit code explicitly, as documented and as done by the
  # other commands
  return 0
724

    
725

    
726
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  If --no-voting was given without --yes-do-it, the user has to confirm
  the operation interactively.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  needs_confirmation = opts.no_voting and not opts.yes_do_it
  if needs_confirmation:
    question = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    confirmed = AskUser(question)
    if not confirmed:
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)
750

    
751

    
752
def MasterPing(opts, args):
  """Checks if the master is alive.

  Any exception raised while getting a client or querying the cluster
  information is treated as "master not alive".

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    return 1

  return 0
768

    
769

    
770
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  The matches are printed sorted, one "path tag" pair per line.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code; 1 if the search gave no result

  """
  op = opcodes.OpTagsSearch(pattern=args[0])
  result = SubmitOpCode(op, opts=opts)
  if not result:
    return 1

  for path, tag in sorted(result):
    ToStdout("%s %s", path, tag)

  # return the exit code explicitly, as documented
  return 0
788

    
789

    
790
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
791
  """Reads and verifies an X509 certificate.
792

793
  @type cert_filename: string
794
  @param cert_filename: the path of the file containing the certificate to
795
                        verify encoded in PEM format
796
  @type verify_private_key: bool
797
  @param verify_private_key: whether to verify the private key in addition to
798
                             the public certificate
799
  @rtype: string
800
  @return: a string containing the PEM-encoded certificate.
801

802
  """
803
  try:
804
    pem = utils.ReadFile(cert_filename)
805
  except IOError, err:
806
    raise errors.X509CertError(cert_filename,
807
                               "Unable to read certificate: %s" % str(err))
808

    
809
  try:
810
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
811
  except Exception, err:
812
    raise errors.X509CertError(cert_filename,
813
                               "Unable to load certificate: %s" % str(err))
814

    
815
  if verify_private_key:
816
    try:
817
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
818
    except Exception, err:
819
      raise errors.X509CertError(cert_filename,
820
                                 "Unable to load private key: %s" % str(err))
821

    
822
  return pem
823

    
824

    
825
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
826
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
827
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
828
                 cds_filename, force):
829
  """Renews cluster certificates, keys and secrets.
830

831
  @type new_cluster_cert: bool
832
  @param new_cluster_cert: Whether to generate a new cluster certificate
833
  @type new_rapi_cert: bool
834
  @param new_rapi_cert: Whether to generate a new RAPI certificate
835
  @type rapi_cert_filename: string
836
  @param rapi_cert_filename: Path to file containing new RAPI certificate
837
  @type new_spice_cert: bool
838
  @param new_spice_cert: Whether to generate a new SPICE certificate
839
  @type spice_cert_filename: string
840
  @param spice_cert_filename: Path to file containing new SPICE certificate
841
  @type spice_cacert_filename: string
842
  @param spice_cacert_filename: Path to file containing the certificate of the
843
                                CA that signed the SPICE certificate
844
  @type new_confd_hmac_key: bool
845
  @param new_confd_hmac_key: Whether to generate a new HMAC key
846
  @type new_cds: bool
847
  @param new_cds: Whether to generate a new cluster domain secret
848
  @type cds_filename: string
849
  @param cds_filename: Path to file containing new cluster domain secret
850
  @type force: bool
851
  @param force: Whether to ask user for confirmation
852

853
  """
854
  if new_rapi_cert and rapi_cert_filename:
855
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
856
             " options can be specified at the same time.")
857
    return 1
858

    
859
  if new_cds and cds_filename:
860
    ToStderr("Only one of the --new-cluster-domain-secret and"
861
             " --cluster-domain-secret options can be specified at"
862
             " the same time.")
863
    return 1
864

    
865
  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
866
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
867
             " and --spice-ca-certificate must not be used.")
868
    return 1
869

    
870
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
871
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
872
             " specified.")
873
    return 1
874

    
875
  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
876
  try:
877
    if rapi_cert_filename:
878
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
879
    if spice_cert_filename:
880
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
881
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
882
  except errors.X509CertError, err:
883
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
884
    return 1
885

    
886
  if cds_filename:
887
    try:
888
      cds = utils.ReadFile(cds_filename)
889
    except Exception, err: # pylint: disable=W0703
890
      ToStderr("Can't load new cluster domain secret from %s: %s" %
891
               (cds_filename, str(err)))
892
      return 1
893
  else:
894
    cds = None
895

    
896
  if not force:
897
    usertext = ("This requires all daemons on all nodes to be restarted and"
898
                " may take some time. Continue?")
899
    if not AskUser(usertext):
900
      return 1
901

    
902
  def _RenewCryptoInner(ctx):
903
    ctx.feedback_fn("Updating certificates and keys")
904
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
905
                                    new_rapi_cert,
906
                                    new_spice_cert,
907
                                    new_confd_hmac_key,
908
                                    new_cds,
909
                                    rapi_cert_pem=rapi_cert_pem,
910
                                    spice_cert_pem=spice_cert_pem,
911
                                    spice_cacert_pem=spice_cacert_pem,
912
                                    cds=cds)
913

    
914
    files_to_copy = []
915

    
916
    if new_cluster_cert:
917
      files_to_copy.append(pathutils.NODED_CERT_FILE)
918

    
919
    if new_rapi_cert or rapi_cert_pem:
920
      files_to_copy.append(pathutils.RAPI_CERT_FILE)
921

    
922
    if new_spice_cert or spice_cert_pem:
923
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
924
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)
925

    
926
    if new_confd_hmac_key:
927
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)
928

    
929
    if new_cds or cds:
930
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)
931

    
932
    if files_to_copy:
933
      for node_name in ctx.nonmaster_nodes:
934
        ctx.feedback_fn("Copying %s to %s" %
935
                        (", ".join(files_to_copy), node_name))
936
        for file_name in files_to_copy:
937
          ctx.ssh.CopyFileToNode(node_name, file_name)
938

    
939
  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)
940

    
941
  ToStdout("All requested certificates and keys have been replaced."
942
           " Running \"gnt-cluster verify\" now is recommended.")
943

    
944
  return 0
945

    
946

    
947
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  Command line entry point; unpacks the relevant options and hands them to
  L{_RenewCrypto}.

  @param opts: the command line options selected by the user
  @param args: not used
  @return: whatever L{_RenewCrypto} returns

  """
  return _RenewCrypto(new_cluster_cert=opts.new_cluster_cert,
                      new_rapi_cert=opts.new_rapi_cert,
                      rapi_cert_filename=opts.rapi_cert,
                      new_spice_cert=opts.new_spice_cert,
                      spice_cert_filename=opts.spice_cert,
                      spice_cacert_filename=opts.spice_cacert,
                      new_confd_hmac_key=opts.new_confd_hmac_key,
                      new_cds=opts.new_cluster_domain_secret,
                      cds_filename=opts.cluster_domain_secret,
                      force=opts.force)
961

    
962

    
963
def SetClusterParams(opts, args):
  """Modify the cluster.

  Checks that at least one modification was requested, converts the option
  values into the form expected by L{opcodes.OpClusterSetParams} and
  submits (or sends) the resulting opcode.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Options which count as given only if their value is true
  truthy_opts = [
    opts.drbd_helper,
    opts.enabled_hypervisors,
    opts.hvparams,
    opts.beparams,
    opts.nicparams,
    opts.ndparams,
    opts.diskparams,
    opts.hv_state,
    opts.enabled_disk_templates,
    opts.disk_state,
    ]
  # Options which count as given as soon as their value isn't None
  explicit_opts = [
    opts.vg_name,
    opts.candidate_pool_size,
    opts.uid_pool,
    opts.maintain_node_health,
    opts.add_uids,
    opts.remove_uids,
    opts.default_iallocator,
    opts.reserved_lvs,
    opts.master_netdev,
    opts.master_netmask,
    opts.use_external_mip_script,
    opts.prealloc_wipe_disks,
    opts.ipolicy_bounds_specs,
    opts.ipolicy_std_specs,
    opts.ipolicy_disk_templates,
    opts.ipolicy_vcpu_ratio,
    opts.ipolicy_spindle_ratio,
    ]
  anything_given = ([val for val in truthy_opts if val] or
                    [val for val in explicit_opts if val is not None])
  if not anything_given:
    ToStderr("Please give at least one of the parameters.")
    return 1

  if _CheckNoLvmStorageOptDeprecated(opts):
    return 1

  enabled_disk_templates = None
  if opts.enabled_disk_templates:
    enabled_disk_templates = opts.enabled_disk_templates.split(",")

  # consistency between vg name and enabled disk templates; a mismatch only
  # produces a message, the operation continues
  vg_name = opts.vg_name
  if (enabled_disk_templates and vg_name and
      not utils.IsLvmEnabled(enabled_disk_templates)):
    ToStdout("You specified a volume group with --vg-name, but you did not"
             " enable any of the following lvm-based disk templates: %s" %
             utils.CommaJoin(utils.GetLvmDiskTemplates()))

  if opts.drbd_storage:
    drbd_helper = opts.drbd_helper
  elif opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1
  else:
    # DRBD storage disabled: pass an empty helper
    drbd_helper = ""

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  # a list of (name, dict) we can pass directly to dict() (or [])
  hvparams = dict(opts.hvparams)
  for one_hv_params in hvparams.values():
    utils.ForceDictType(one_hv_params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)
  for one_dt_params in diskparams.values():
    utils.ForceDictType(one_dt_params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = CreateIPolicyFromOpts(
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    )

  # The three UID pool options are parsed the same way, in this order
  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  # The converted values are stored back on the options object
  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  disk_state = {}
  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)

  hv_state = dict(opts.hv_state)

  op = opcodes.OpClusterSetParams(
    vg_name=vg_name,
    drbd_helper=drbd_helper,
    enabled_hypervisors=hvlist,
    hvparams=hvparams,
    os_hvp=None,
    beparams=beparams,
    nicparams=nicparams,
    ndparams=ndparams,
    diskparams=diskparams,
    ipolicy=ipolicy,
    candidate_pool_size=opts.candidate_pool_size,
    maintain_node_health=opts.maintain_node_health,
    uid_pool=uid_pool,
    add_uids=add_uids,
    remove_uids=remove_uids,
    default_iallocator=opts.default_iallocator,
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
    master_netdev=opts.master_netdev,
    master_netmask=opts.master_netmask,
    reserved_lvs=opts.reserved_lvs,
    use_external_mip_script=opts.use_external_mip_script,
    hv_state=hv_state,
    disk_state=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    )
  SubmitOrSend(op, opts)
  return 0
1120

    
1121

    
1122
def QueueOps(opts, args):
  """Queue operations.

  Handles the "drain", "undrain" and "info" subcommands.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the subcommand is not known

  """
  command = args[0]
  client = GetClient()

  if command == "info":
    result = client.QueryConfigValues(["drain_flag"])
    flag_text = "unset"
    if result[0]:
      flag_text = "set"
    ToStdout("The drain flag is %s" % flag_text)
    return 0

  if command in ("drain", "undrain"):
    # The flag is set for "drain" and cleared for "undrain"
    client.SetQueueDrainFlag(command == "drain")
    return 0

  raise errors.OpPrereqError("Command '%s' is not valid." % command,
                             errors.ECODE_INVAL)
1149

    
1150

    
1151
def _ShowWatcherPause(until):
  """Prints whether, and until when, the watcher is paused.

  @param until: end of the pause as a timestamp comparable with
      C{time.time()}, or None

  """
  still_paused = until is not None and not (until < time.time())
  if still_paused:
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1156

    
1157

    
1158
def WatcherOps(opts, args):
  """Watcher operations.

  Handles the "continue", "pause" and "info" subcommands.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the subcommand, followed by the pause duration in case of
      "pause"
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the subcommand is not known or the pause
      duration is missing

  """
  command = args[0]
  client = GetClient()

  if command == "info":
    pause_info = client.QueryConfigValues(["watcher_pause"])
    _ShowWatcherPause(pause_info[0])
    return 0

  if command == "continue":
    client.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")
    return 0

  if command == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)

    # The duration is relative, the pause end is sent as absolute timestamp
    pause_end = client.SetWatcherPause(time.time() + ParseTimespec(args[1]))
    _ShowWatcherPause(pause_end)
    return 0

  raise errors.OpPrereqError("Command '%s' is not valid." % command,
                             errors.ECODE_INVAL)
1191

    
1192

    
1193
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  Submits a single L{opcodes.OpOobCommand} for all nodes and reports every
  node whose result status is not C{constants.RS_NORMAL}.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  command = constants.OOB_POWER_OFF
  if power:
    command = constants.OOB_POWER_ON

  oob_op = opcodes.OpOobCommand(node_names=node_list,
                                command=command,
                                ignore_status=True,
                                timeout=opts.oob_timeout,
                                power_delay=opts.power_delay)

  all_ok = True
  # Each entry consists of a (status, name) and a (status, data) pair
  for ((_, node_name), (data_status, _)) in SubmitOpCode(oob_op, opts=opts):
    if data_status == constants.RS_NORMAL:
      continue
    assert data_status != constants.RS_UNAVAIL
    all_ok = False
    ToStderr("There was a problem changing power for %s, please investigate",
             node_name)

  return all_ok
1228

    
1229

    
1230
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  One job per instance is queued on a L{JobExecutor}; the function then
  waits for all results.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: Passed as C{no_remember} to the shutdown opcode; not
      used when starting instances
  @return: The success of the operation (none failed)

  """
  if start:
    make_op = opcodes.OpInstanceStartup
    (verb_submit, verb_done, verb_failing) = ("startup", "started", "starting")
  else:
    make_op = compat.partial(opcodes.OpInstanceShutdown,
                             timeout=opts.shutdown_timeout,
                             no_remember=no_remember)
    (verb_submit, verb_done, verb_failing) = ("shutdown", "stopped", "stopping")

  executor = JobExecutor(opts=opts)
  for inst in inst_list:
    ToStdout("Submit %s of instance %s", verb_submit, inst)
    executor.QueueJob(inst, make_op(instance_name=inst))

  results = executor.GetResults()
  failures = [status for (status, _) in results if not status]

  if failures:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", verb_failing, len(failures),
             len(results))
    return False

  ToStdout("All instances have been %s successfully", verb_done)
  return True
1268

    
1269

    
1270
class _RunWhenNodesReachableHelper:
  """Keeps the state shared between the retry function and its wait function.

  @ivar success: Indicates if all action_cb calls were successful
  @ivar down: Set of nodes which were not yet seen reachable
  @ivar up: Set of nodes which were seen reachable

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    # Callbacks and connection parameters
    self.action_cb = action_cb
    self.feedback_fn = feedback_fn
    self.node2ip = node2ip
    self.port = port
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

    # Progress tracking; all nodes start out as down
    self.down = set(node_list)
    self.up = set()
    self.success = True

  def __call__(self):
    """Runs action_cb with the nodes currently known to be up.

    @raises utils.RetryAgain: When there are still down nodes
    @return: C{self.success} once no node is down anymore

    """
    # A single failed callback makes the overall result a failure
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()

    return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    began = time.time()

    for node in self.down:
      reachable = self._ping_fn(self.node2ip[node], self.port,
                                timeout=_EPO_PING_TIMEOUT,
                                live_port_needed=True)
      if not reachable:
        continue

      self.feedback_fn("Node %s became available" % node)
      self.up.add(node)
      self.down -= self.up
      # If we have a node available there is the possibility to run the
      # action callback successfully, therefore we don't wait and return
      return

    # No node became available, sleep for whatever is left of the interval
    self._sleep_fn(max(0.0, began + secs - time.time()))
1332

    
1333

    
1334
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  The node names are resolved using the address family matching the
  cluster's primary IP version, then a L{_RunWhenNodesReachableHelper} is
  retried via L{utils.Retry} until C{_EPO_REACHABLE_TIMEOUT} is exceeded.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry
  @return: The result of L{utils.Retry}, or False if the timeout was
      exceeded while nodes were still down

  """
  client = GetClient()
  cluster_info = client.QueryClusterInfo()
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = dict((node, netutils.GetHostname(node, family=family).ip)
                 for node in node_list)

  port = netutils.GetDaemonPort(constants.NODED)
  helper = _RunWhenNodesReachableHelper(node_list, action_cb, node2ip, port,
                                        ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    # One node per line; the separator needs the newline, otherwise all
    # names end up on the same line as the first bullet. Sorted to get a
    # stable output for the (unordered) set of down nodes.
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n  - %s", "\n  - ".join(sorted(helper.down)))
    return False
1363

    
1364

    
1365
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Starts the instances whose nodes are all online again.

  Instances which are handed to the start function are removed from
  C{inst_map}, so they are not considered again by a later call.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: The nodes which are online; subtracted from the node
      sets found in C{inst_map}
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  # An instance is startable once none of its nodes is missing
  startable = [inst for (inst, nodes) in inst_map.items()
               if not (nodes - nodes_online)]

  if not startable:
    return True

  for inst in startable:
    del inst_map[inst]

  return _instance_start_fn(opts, startable, True)
1389

    
1390

    
1391
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  Powers on the nodes supporting OOB and then waits for all nodes to become
  reachable, starting instances via L{_MaybeInstanceStartup} as nodes come
  back.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # The nodes have to be powered *on* here; passing False would send the
  # power-off command to nodes we then wait for. A failure is only reported,
  # the nodes may still be brought up manually.
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up; the callback works on a copy of the
  # instance map as it removes the instances it has started
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1415

    
1416

    
1417
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  All instances are shut down first; the nodes are only powered off if that
  succeeded.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  instances_stopped = _InstanceStart(opts, inst_map.keys(), False,
                                     no_remember=True)
  if not instances_stopped:
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  # Without nodes to power off there is nothing left which could fail
  if not node_list or _OobPower(opts, node_list, False):
    return constants.EXIT_SUCCESS

  return constants.EXIT_FAILURE
1437

    
1438

    
1439
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  Queries the affected nodes, works out which of them have to change their
  power state and which instances live on them, and then hands over to the
  power-on or power-off function depending on C{opts.on}.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: the node names or, with C{opts.groups}, the group names to
      operate on
  @param cl: the client to use; a new one is created if None
  @param _on_fn: Function doing the power on (unittest use only)
  @param _off_fn: Function doing the power off (unittest use only)
  @param _confirm_fn: Function asking for confirmation (unittest use only)
  @param _stdout_fn: Function used for normal output (unittest use only)
  @param _stderr_fn: Function used for error output (unittest use only)
  @rtype: int
  @return: the desired exit code

  """
  if opts.show_all:
    # --all excludes both --groups and explicit arguments
    if opts.groups:
      _stderr_fn("Only one of --groups or --all are allowed")
      return constants.EXIT_FAILURE
    if args:
      _stderr_fn("Arguments in combination with --all are not allowed")
      return constants.EXIT_FAILURE

  if cl is None:
    cl = GetClient()

  node_query_list = args
  if opts.groups:
    node_query_list = \
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))

  node_fields = ["name", "master", "pinst_list", "sinst_list", "powered",
                 "offline"]
  result = cl.QueryNodes(node_query_list, node_fields, False)

  all_nodes = [compat.fst(row) for row in result]
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    if not offline:
      for inst in (pinsts + sinsts):
        # Every instance gets an entry, but the master node is never
        # recorded as one of its nodes
        inst_nodes = inst_map.setdefault(inst, set())
        if not master:
          inst_nodes.add(node)

    if master:
      if opts.on:
        # We ignore the master for turning on the machines, in fact we are
        # already operating on the master at this point :)
        continue
      if not opts.show_all:
        _stderr_fn("%s is the master node, please do a master-failover to"
                   " another node not affected by the EPO or use --all if you"
                   " intend to shutdown the whole cluster", node)
        return constants.EXIT_FAILURE

    if powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or powered:
      node_list.append(node)

  # The confirmation is only requested when the operation isn't forced
  if not opts.force and not _confirm_fn(all_nodes, "nodes", "epo"):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)

  return _off_fn(opts, node_list, inst_map)
1509

    
1510

    
1511
def _GetCreateCommand(info):
  """Builds the "gnt-cluster init" command line for the given cluster info.

  @param info: cluster information; the "ipolicy" and "name" entries are used
  @rtype: string
  @return: the command line assembled from the fixed prefix, the output of
      L{PrintIPolicyCommand} and the cluster name

  """
  out = StringIO()
  out.write("gnt-cluster init")
  # The instance policy options are written to the buffer by the helper
  PrintIPolicyCommand(out, info["ipolicy"], False)
  out.write(" ")
  out.write(info["name"])
  command_line = out.getvalue()
  return command_line
1518

    
1519

    
1520
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  @param opts: the command line options selected by the user (not used)
  @param args: not used

  """
  cluster_info = GetClient(query=True).QueryClusterInfo()
  ToStdout(_GetCreateCommand(cluster_info))
1529

    
1530

    
1531
# Table of the gnt-cluster subcommands, handed to GenericMain by Main().
# Every value is a 5-tuple; as the entries show, it consists of the function
# implementing the command, the definition of its positional arguments, the
# list of accepted options, a usage string and a description.
# NOTE(review): how each element is interpreted is decided by GenericMain in
# ganeti.cli - confirm there before relying on the above.
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, [SUBMIT_OPT, DRY_RUN_OPT, PRIORITY_OPT],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT, SUBMIT_OPT],
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  # The second argument of "watcher" is optional here; WatcherOps itself
  # rejects "pause" without a duration
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT, SUBMIT_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS,
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  }


#: dictionary with aliases for commands; maps the alias to the name of an
#: entry in the commands table above
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}
1654

    
1655

    
1656
def Main():
  """Entry point of gnt-cluster; runs GenericMain with the command tables.

  @return: whatever L{GenericMain} returns

  """
  # Tag operations of this tool work on the cluster object
  overrides = {"tag_type": constants.TAG_CLUSTER}
  return GenericMain(commands, override=overrides, aliases=aliases)