Statistics
| Branch: | Tag: | Revision:

root / lib / client / gnt_cluster.py @ 74a4fc94

History | View | Annotate | Download (53.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21
"""Cluster related commands"""
22

    
23
# pylint: disable=W0401,W0613,W0614,C0103
24
# W0401: Wildcard import ganeti.cli
25
# W0613: Unused argument, since all functions follow the same API
26
# W0614: Unused import %s from wildcard import (since we need cli)
27
# C0103: Invalid name gnt-cluster
28

    
29
from cStringIO import StringIO
30
import os.path
31
import time
32
import OpenSSL
33
import itertools
34

    
35
from ganeti.cli import *
36
from ganeti import opcodes
37
from ganeti import constants
38
from ganeti import errors
39
from ganeti import utils
40
from ganeti import bootstrap
41
from ganeti import ssh
42
from ganeti import objects
43
from ganeti import uidpool
44
from ganeti import compat
45
from ganeti import netutils
46
from ganeti import pathutils
47

    
48

    
49
# Flag for the EPO (emergency power-off) command: power the cluster back
# on (recover) rather than powering it off.
ON_OPT = cli_option("--on", default=False,
                    action="store_true", dest="on",
                    help="Recover from an EPO")

# Flag telling commands to interpret positional arguments as node group
# names instead of individual node names.
GROUPS_OPT = cli_option("--groups", default=False,
                        action="store_true", dest="groups",
                        help="Arguments are node groups instead of nodes")

# Flag for master-failover: skip the interactive safety confirmation that
# normally accompanies --no-voting.
FORCE_FAILOVER = cli_option("--yes-do-it", dest="yes_do_it",
                            help="Override interactive check for --no-voting",
                            default=False, action="store_true")

# Timing constants (in seconds) used while waiting for nodes during EPO
# handling.
_EPO_PING_INTERVAL = 30 # 30 seconds between pings
_EPO_PING_TIMEOUT = 1 # 1 second
_EPO_REACHABLE_TIMEOUT = 15 * 60 # 15 minutes
64

    
65

    
66
def _CheckNoLvmStorageOptDeprecated(opts):
67
  """Checks if the legacy option '--no-lvm-storage' is used.
68

69
  """
70
  if not opts.lvm_storage:
71
    ToStderr("The option --no-lvm-storage is no longer supported. If you want"
72
             " to disable lvm-based storage cluster-wide, use the option"
73
             " --enabled-disk-templates to disable all of these lvm-base disk "
74
             "  templates: %s" %
75
             utils.CommaJoin(utils.GetLvmDiskTemplates()))
76
    return 1
77

    
78

    
79
@UsesRPC
def InitCluster(opts, args):
  """Initialize the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the desired
      cluster name
  @rtype: int
  @return: the desired exit code

  """
  # Reject the removed --no-lvm-storage option up front.
  if _CheckNoLvmStorageOptDeprecated(opts):
    return 1
  enabled_disk_templates = opts.enabled_disk_templates
  if enabled_disk_templates:
    enabled_disk_templates = enabled_disk_templates.split(",")
  else:
    enabled_disk_templates = constants.DEFAULT_ENABLED_DISK_TEMPLATES

  # Determine the volume group name: it is only meaningful (and defaulted)
  # when at least one lvm-based disk template is enabled.
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
    if vg_name:
      if not utils.IsLvmEnabled(enabled_disk_templates):
        # Not fatal: warn that the given VG will not be used by any template.
        ToStdout("You specified a volume group with --vg-name, but you did not"
                 " enable any disk template that uses lvm.")
    else:
      if utils.IsLvmEnabled(enabled_disk_templates):
        ToStderr("LVM disk templates are enabled, but vg name not set.")
        return 1
  else:
    if utils.IsLvmEnabled(enabled_disk_templates):
      vg_name = constants.DEFAULT_VG

  # A DRBD helper without DRBD storage makes no sense.
  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  drbd_helper = opts.drbd_helper
  if opts.drbd_storage and not opts.drbd_helper:
    drbd_helper = constants.DEFAULT_DRBD_HELPER

  master_netdev = opts.master_netdev
  if master_netdev is None:
    master_netdev = constants.DEFAULT_BRIDGE

  hvlist = opts.enabled_hypervisors
  if hvlist is None:
    hvlist = constants.DEFAULT_ENABLED_HYPERVISOR
  hvlist = hvlist.split(",")

  hvparams = dict(opts.hvparams)
  beparams = opts.beparams
  nicparams = opts.nicparams

  diskparams = dict(opts.diskparams)

  # check the disk template types here, as we cannot rely on the type check done
  # by the opcode parameter types
  diskparams_keys = set(diskparams.keys())
  if not (diskparams_keys <= constants.DISK_TEMPLATES):
    unknown = utils.NiceSort(diskparams_keys - constants.DISK_TEMPLATES)
    ToStderr("Disk templates unknown: %s" % utils.CommaJoin(unknown))
    return 1

  # prepare beparams dict: fill user-provided values over built-in defaults
  # and type-check the result
  beparams = objects.FillDict(constants.BEC_DEFAULTS, beparams)
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  # prepare nicparams dict
  nicparams = objects.FillDict(constants.NICC_DEFAULTS, nicparams)
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  # prepare ndparams dict
  if opts.ndparams is None:
    ndparams = dict(constants.NDC_DEFAULTS)
  else:
    ndparams = objects.FillDict(constants.NDC_DEFAULTS, opts.ndparams)
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  # prepare hvparams dict: one (possibly empty) sub-dict per known
  # hypervisor, filled with its defaults
  for hv in constants.HYPER_TYPES:
    if hv not in hvparams:
      hvparams[hv] = {}
    hvparams[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], hvparams[hv])
    utils.ForceDictType(hvparams[hv], constants.HVS_PARAMETER_TYPES)

  # prepare diskparams dict: same filling scheme, per disk template
  for templ in constants.DISK_TEMPLATES:
    if templ not in diskparams:
      diskparams[templ] = {}
    diskparams[templ] = objects.FillDict(constants.DISK_DT_DEFAULTS[templ],
                                         diskparams[templ])
    utils.ForceDictType(diskparams[templ], constants.DISK_DT_TYPES)

  # prepare ipolicy dict
  ipolicy = CreateIPolicyFromOpts(
    ispecs_mem_size=opts.ispecs_mem_size,
    ispecs_cpu_count=opts.ispecs_cpu_count,
    ispecs_disk_count=opts.ispecs_disk_count,
    ispecs_disk_size=opts.ispecs_disk_size,
    ispecs_nic_count=opts.ispecs_nic_count,
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    fill_all=True)

  # NOTE: the following assignments mutate opts in place to apply defaults.
  if opts.candidate_pool_size is None:
    opts.candidate_pool_size = constants.MASTER_POOL_SIZE_DEFAULT

  if opts.mac_prefix is None:
    opts.mac_prefix = constants.DEFAULT_MAC_PREFIX

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  if opts.prealloc_wipe_disks is None:
    opts.prealloc_wipe_disks = False

  external_ip_setup_script = opts.use_external_mip_script
  if external_ip_setup_script is None:
    external_ip_setup_script = False

  try:
    primary_ip_version = int(opts.primary_ip_version)
  except (ValueError, TypeError), err:
    ToStderr("Invalid primary ip version value: %s" % str(err))
    return 1

  master_netmask = opts.master_netmask
  try:
    if master_netmask is not None:
      master_netmask = int(master_netmask)
  except (ValueError, TypeError), err:
    ToStderr("Invalid master netmask value: %s" % str(err))
    return 1

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  # All inputs validated and normalized; create the cluster configuration.
  bootstrap.InitCluster(cluster_name=args[0],
                        secondary_ip=opts.secondary_ip,
                        vg_name=vg_name,
                        mac_prefix=opts.mac_prefix,
                        master_netmask=master_netmask,
                        master_netdev=master_netdev,
                        file_storage_dir=opts.file_storage_dir,
                        shared_file_storage_dir=opts.shared_file_storage_dir,
                        enabled_hypervisors=hvlist,
                        hvparams=hvparams,
                        beparams=beparams,
                        nicparams=nicparams,
                        ndparams=ndparams,
                        diskparams=diskparams,
                        ipolicy=ipolicy,
                        candidate_pool_size=opts.candidate_pool_size,
                        modify_etc_hosts=opts.modify_etc_hosts,
                        modify_ssh_setup=opts.modify_ssh_setup,
                        maintain_node_health=opts.maintain_node_health,
                        drbd_helper=drbd_helper,
                        uid_pool=uid_pool,
                        default_iallocator=opts.default_iallocator,
                        primary_ip_version=primary_ip_version,
                        prealloc_wipe_disks=opts.prealloc_wipe_disks,
                        use_external_mip_script=external_ip_setup_script,
                        hv_state=hv_state,
                        disk_state=disk_state,
                        enabled_disk_templates=enabled_disk_templates,
                        )
  # Run the post-init opcode through the (now running) master daemon.
  op = opcodes.OpClusterPostInit()
  SubmitOpCode(op, opts=opts)
  return 0
259

    
260

    
261
@UsesRPC
def DestroyCluster(opts, args):
  """Destroy the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Require the explicit --yes-do-it flag; this operation cannot be undone.
  if not opts.yes_do_it:
    # Fixed message grammar: "really want destroy" -> "really want to destroy".
    ToStderr("Destroying a cluster is irreversible. If you really want"
             " to destroy this cluster, supply the --yes-do-it option.")
    return 1

  op = opcodes.OpClusterDestroy()
  master_uuid = SubmitOpCode(op, opts=opts)
  # if we reached this, the opcode didn't fail; we can proceed to
  # shutdown all the daemons
  bootstrap.FinalizeClusterDestroy(master_uuid)
  return 0
283

    
284

    
285
def RenameCluster(opts, args):
  """Rename the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the new cluster name
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  (old_name, ) = cl.QueryConfigValues(["cluster_name"])
  new_name = args[0]

  # Confirm interactively unless --force was given: the rename removes the
  # cluster IP address from the master node.
  if not opts.force:
    usertext = ("This will rename the cluster from '%s' to '%s'. If you are"
                " connected over the network to the cluster name, the"
                " operation is very dangerous as the IP address will be"
                " removed from the node and the change may not go through."
                " Continue?") % (old_name, new_name)
    if not AskUser(usertext):
      return 1

  result = SubmitOpCode(opcodes.OpClusterRename(name=new_name),
                        opts=opts, cl=cl)
  if result:
    ToStdout("Cluster renamed from '%s' to '%s'", old_name, result)

  return 0
316

    
317

    
318
def ActivateMasterIp(opts, args):
  """Activates the master IP.

  """
  # Submit the activation opcode; any failure surfaces as an exception.
  SubmitOpCode(opcodes.OpClusterActivateMasterIp())
  return 0
325

    
326

    
327
def DeactivateMasterIp(opts, args):
  """Deactivates the master IP.

  """
  # Warn interactively unless confirmation was given on the command line:
  # dropping the master IP closes all open connections to it.
  if not opts.confirm:
    question = ("This will disable the master IP. All the open connections to"
                " the master IP will be closed. To reach the master you will"
                " need to use its node IP."
                " Continue?")
    if not AskUser(question):
      return 1

  SubmitOpCode(opcodes.OpClusterDeactivateMasterIp())
  return 0
342

    
343

    
344
def RedistributeConfig(opts, args):
  """Forces push of the cluster configuration.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: empty list
  @rtype: int
  @return: the desired exit code

  """
  # SubmitOrSend honours --submit, so this may only queue the job.
  SubmitOrSend(opcodes.OpClusterRedistConf(), opts)
  return 0
357

    
358

    
359
def ShowClusterVersion(opts, args):
  """Write version of ganeti software to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cluster_info = GetClient(query=True).QueryClusterInfo()
  # Table-driven output: (format string, result key) pairs, printed in order.
  for (fmt, key) in [
      ("Software version: %s", "software_version"),
      ("Internode protocol: %s", "protocol_version"),
      ("Configuration format: %s", "config_version"),
      ("OS api version: %s", "os_api_version"),
      ("Export interface: %s", "export_version"),
      ("VCS version: %s", "vcs_version"),
      ]:
    ToStdout(fmt, cluster_info[key])
  return 0
378

    
379

    
380
def ShowClusterMaster(opts, args):
  """Write name of master node to the standard output.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  ToStdout(bootstrap.GetMaster())
  return 0
393

    
394

    
395
def _FormatGroupedParams(paramsdict, roman=False):
396
  """Format Grouped parameters (be, nic, disk) by group.
397

398
  @type paramsdict: dict of dicts
399
  @param paramsdict: {group: {param: value, ...}, ...}
400
  @rtype: dict of dicts
401
  @return: copy of the input dictionaries with strings as values
402

403
  """
404
  ret = {}
405
  for (item, val) in paramsdict.items():
406
    if isinstance(val, dict):
407
      ret[item] = _FormatGroupedParams(val, roman=roman)
408
    elif roman and isinstance(val, int):
409
      ret[item] = compat.TryToRoman(val)
410
    else:
411
      ret[item] = str(val)
412
  return ret
413

    
414

    
415
def ShowClusterConfig(opts, args):
  """Shows cluster information.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient(query=True)
  result = cl.QueryClusterInfo()

  # Render possibly-empty lists in a human-friendly way.
  if result["tags"]:
    tags = utils.CommaJoin(utils.NiceSort(result["tags"]))
  else:
    tags = "(none)"
  if result["reserved_lvs"]:
    reserved_lvs = utils.CommaJoin(result["reserved_lvs"])
  else:
    reserved_lvs = "(none)"

  # Only show parameters for hypervisors that are actually enabled.
  enabled_hv = result["enabled_hypervisors"]
  hvparams = dict((k, v) for k, v in result["hvparams"].iteritems()
                  if k in enabled_hv)

  # (label, value) pairs rendered by PrintGenericInfo at the end.
  info = [
    ("Cluster name", result["name"]),
    ("Cluster UUID", result["uuid"]),

    ("Creation time", utils.FormatTime(result["ctime"])),
    ("Modification time", utils.FormatTime(result["mtime"])),

    ("Master node", result["master"]),

    ("Architecture (this node)",
     "%s (%s)" % (result["architecture"][0], result["architecture"][1])),

    ("Tags", tags),

    ("Default hypervisor", result["default_hypervisor"]),
    ("Enabled hypervisors", utils.CommaJoin(enabled_hv)),

    ("Hypervisor parameters", _FormatGroupedParams(hvparams)),

    ("OS-specific hypervisor parameters",
     _FormatGroupedParams(result["os_hvp"])),

    ("OS parameters", _FormatGroupedParams(result["osparams"])),

    ("Hidden OSes", utils.CommaJoin(result["hidden_os"])),
    ("Blacklisted OSes", utils.CommaJoin(result["blacklisted_os"])),

    ("Cluster parameters", [
      ("candidate pool size",
       compat.TryToRoman(result["candidate_pool_size"],
                         convert=opts.roman_integers)),
      ("master netdev", result["master_netdev"]),
      ("master netmask", result["master_netmask"]),
      ("use external master IP address setup script",
       result["use_external_mip_script"]),
      ("lvm volume group", result["volume_group_name"]),
      ("lvm reserved volumes", reserved_lvs),
      ("drbd usermode helper", result["drbd_usermode_helper"]),
      ("file storage path", result["file_storage_dir"]),
      ("shared file storage path", result["shared_file_storage_dir"]),
      ("maintenance of node health", result["maintain_node_health"]),
      ("uid pool", uidpool.FormatUidPool(result["uid_pool"])),
      ("default instance allocator", result["default_iallocator"]),
      ("primary ip version", result["primary_ip_version"]),
      ("preallocation wipe disks", result["prealloc_wipe_disks"]),
      ("OS search path", utils.CommaJoin(pathutils.OS_SEARCH_PATH)),
      ("ExtStorage Providers search path",
       utils.CommaJoin(pathutils.ES_SEARCH_PATH)),
      ("enabled disk templates",
       utils.CommaJoin(result["enabled_disk_templates"])),
      ]),

    ("Default node parameters",
     _FormatGroupedParams(result["ndparams"], roman=opts.roman_integers)),

    ("Default instance parameters",
     _FormatGroupedParams(result["beparams"], roman=opts.roman_integers)),

    ("Default nic parameters",
     _FormatGroupedParams(result["nicparams"], roman=opts.roman_integers)),

    ("Default disk parameters",
     _FormatGroupedParams(result["diskparams"], roman=opts.roman_integers)),

    ("Instance policy - limits for instances",
     FormatPolicyInfo(result["ipolicy"], None, True)),
    ]

  PrintGenericInfo(info)
  return 0
511

    
512

    
513
def ClusterCopyFile(opts, args):
  """Copy a file from master to some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the path of
      the file to be copied
  @rtype: int
  @return: the desired exit code

  """
  filename = os.path.abspath(args[0])
  if not os.path.exists(filename):
    raise errors.OpPrereqError("No such filename '%s'" % filename,
                               errors.ECODE_INVAL)

  cl = GetClient()
  cluster_name = cl.QueryConfigValues(["cluster_name"])[0]

  # The master is filtered out: the file already lives there.
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, filter_master=True,
                         secondary_ips=opts.use_replication_network,
                         nodegroup=opts.nodegroup)

  runner = ssh.SshRunner(cluster_name)
  for node in nodes:
    # Report failures but keep copying to the remaining nodes.
    if not runner.CopyFileToNode(node, filename):
      ToStderr("Copy of file %s to node %s failed", filename, node)

  return 0
545

    
546

    
547
def RunClusterCommand(opts, args):
  """Run a command on some nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain the command to be run and its arguments
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()
  command = " ".join(args)
  nodes = GetOnlineNodes(nodes=opts.nodes, cl=cl, nodegroup=opts.nodegroup)
  (cluster_name, master_node) = cl.QueryConfigValues(["cluster_name",
                                                      "master_node"])
  runner = ssh.SshRunner(cluster_name=cluster_name)

  # Process the master node last.
  if master_node in nodes:
    nodes.remove(master_node)
    nodes.append(master_node)

  for node_name in nodes:
    result = runner.Run(node_name, constants.SSH_LOGIN_USER, command)

    if opts.failure_only and result.exit_code == constants.EXIT_SUCCESS:
      # Do not output anything for successful commands
      continue

    ToStdout("------------------------------------------------")
    if opts.show_machine_names:
      # Prefix every output line with the node name.
      for line in result.output.splitlines():
        ToStdout("%s: %s", node_name, line)
    else:
      ToStdout("node: %s", node_name)
      ToStdout("%s", result.output)
    ToStdout("return code = %s", result.exit_code)

  return 0
590

    
591

    
592
def VerifyCluster(opts, args):
  """Verify integrity of cluster, performing various test on nodes.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  skip_checks = []

  if opts.skip_nplusone_mem:
    skip_checks.append(constants.VERIFY_NPLUSONE_MEM)

  cl = GetClient()

  op = opcodes.OpClusterVerify(verbose=opts.verbose,
                               error_codes=opts.error_codes,
                               debug_simulate_errors=opts.simulate_errors,
                               skip_checks=skip_checks,
                               ignore_errors=opts.ignore_errors,
                               group_name=opts.nodegroup)
  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  results = jex.GetResults()

  # Count failures in two categories: jobs that failed outright
  # ("bad_jobs") and jobs that ran but whose single opcode result is falsy
  # ("bad_results").  zip(*...) transposes the per-job
  # (job_success, op_result) pairs into two sequences, ifilterfalse keeps
  # only the falsy entries of each, and len counts them.
  (bad_jobs, bad_results) = \
    map(len,
        # Convert iterators to lists
        map(list,
            # Count errors
            map(compat.partial(itertools.ifilterfalse, bool),
                # Convert result to booleans in a tuple
                zip(*((job_success, len(op_results) == 1 and op_results[0])
                      for (job_success, op_results) in results)))))

  if bad_jobs == 0 and bad_results == 0:
    rcode = constants.EXIT_SUCCESS
  else:
    rcode = constants.EXIT_FAILURE
    if bad_jobs > 0:
      ToStdout("%s job(s) failed while verifying the cluster.", bad_jobs)

  return rcode
643

    
644

    
645
def VerifyDisks(opts, args):
  """Verify integrity of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  cl = GetClient()

  op = opcodes.OpClusterVerifyDisks()

  result = SubmitOpCode(op, cl=cl, opts=opts)

  # Keep track of submitted jobs
  jex = JobExecutor(cl=cl, opts=opts)

  for (status, job_id) in result[constants.JOB_IDS_KEY]:
    jex.AddJobId(None, status, job_id)

  retcode = constants.EXIT_SUCCESS

  for (status, result) in jex.GetResults():
    if not status:
      ToStdout("Job failed: %s", result)
      continue

    # Each job returns a single opcode result, unpacked here as
    # (bad_nodes, instances, missing) — judging by how they are used below:
    # nodes that could not be queried, instances needing disk activation,
    # and instances with missing logical volumes.
    ((bad_nodes, instances, missing), ) = result

    for node, text in bad_nodes.items():
      # Only the tail of the error text is shown to keep output readable.
      ToStdout("Error gathering data on node %s: %s",
               node, utils.SafeEncode(text[-400:]))
      retcode = constants.EXIT_FAILURE
      ToStdout("You need to fix these nodes first before fixing instances")

    for iname in instances:
      # Instances with missing LVs are reported below, not activated here.
      if iname in missing:
        continue
      op = opcodes.OpInstanceActivateDisks(instance_name=iname)
      try:
        ToStdout("Activating disks for instance '%s'", iname)
        SubmitOpCode(op, opts=opts, cl=cl)
      except errors.GenericError, err:
        nret, msg = FormatError(err)
        # Accumulate error codes, but keep processing other instances.
        retcode |= nret
        ToStderr("Error activating disks for instance %s: %s", iname, msg)

    if missing:
      # missing maps instance name -> iterable of (node, volume) pairs.
      for iname, ival in missing.iteritems():
        all_missing = compat.all(x[0] in bad_nodes for x in ival)
        if all_missing:
          ToStdout("Instance %s cannot be verified as it lives on"
                   " broken nodes", iname)
        else:
          ToStdout("Instance %s has missing logical volumes:", iname)
          ival.sort()
          for node, vol in ival:
            if node in bad_nodes:
              ToStdout("\tbroken node %s /dev/%s", node, vol)
            else:
              ToStdout("\t%s /dev/%s", node, vol)

      ToStdout("You need to replace or recreate disks for all the above"
               " instances if this message persists after fixing broken nodes.")
      retcode = constants.EXIT_FAILURE
    elif not instances:
      ToStdout("No disks need to be activated.")

  return retcode
716

    
717

    
718
def RepairDiskSizes(opts, args):
  """Verify sizes of cluster disks.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: optional list of instances to restrict check to
  @rtype: int
  @return: the desired exit code

  """
  op = opcodes.OpClusterRepairDiskSizes(instances=args)
  SubmitOpCode(op, opts=opts)
  # Return an explicit exit code, as the docstring promises and as all
  # sibling command functions in this module do; previously this fell off
  # the end and returned None.
  return 0
730

    
731

    
732
@UsesRPC
def MasterFailover(opts, args):
  """Failover the master node.

  This command, when run on a non-master node, will cause the current
  master to cease being master, and the non-master to become new
  master.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # --no-voting is dangerous; require either --yes-do-it or an interactive
  # confirmation before proceeding.
  if opts.no_voting and not opts.yes_do_it:
    question = ("This will perform the failover even if most other nodes"
                " are down, or if this node is outdated. This is dangerous"
                " as it can lead to a non-consistent cluster. Check the"
                " gnt-cluster(8) man page before proceeding. Continue?")
    if not AskUser(question):
      return 1

  return bootstrap.MasterFailover(no_voting=opts.no_voting)
756

    
757

    
758
def MasterPing(opts, args):
  """Checks if the master is alive.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  try:
    GetClient().QueryClusterInfo()
  except Exception: # pylint: disable=W0703
    # Any failure to reach or query the master counts as "not alive".
    return 1
  return 0
774

    
775

    
776
def SearchTags(opts, args):
  """Searches the tags on all the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the tag pattern
  @rtype: int
  @return: the desired exit code

  """
  result = SubmitOpCode(opcodes.OpTagsSearch(pattern=args[0]), opts=opts)
  if not result:
    return 1
  # Print matches sorted by (path, tag).
  for (path, tag) in sorted(result):
    ToStdout("%s %s", path, tag)
794

    
795

    
796
def _ReadAndVerifyCert(cert_filename, verify_private_key=False):
  """Reads and verifies an X509 certificate.

  @type cert_filename: string
  @param cert_filename: the path of the file containing the certificate to
                        verify encoded in PEM format
  @type verify_private_key: bool
  @param verify_private_key: whether to verify the private key in addition to
                             the public certificate
  @rtype: string
  @return: a string containing the PEM-encoded certificate.

  """
  # Read the raw PEM data; any I/O problem is reported as a certificate error.
  try:
    pem = utils.ReadFile(cert_filename)
  except IOError, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to read certificate: %s" % str(err))

  # Parse the certificate to validate it; the parsed object is discarded,
  # only the original PEM text is returned.
  try:
    OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem)
  except Exception, err:
    raise errors.X509CertError(cert_filename,
                               "Unable to load certificate: %s" % str(err))

  # Optionally check that the same PEM blob also contains a loadable
  # private key.
  if verify_private_key:
    try:
      OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, pem)
    except Exception, err:
      raise errors.X509CertError(cert_filename,
                                 "Unable to load private key: %s" % str(err))

  return pem
829

    
830

    
831
def _RenewCrypto(new_cluster_cert, new_rapi_cert, # pylint: disable=R0911
                 rapi_cert_filename, new_spice_cert, spice_cert_filename,
                 spice_cacert_filename, new_confd_hmac_key, new_cds,
                 cds_filename, force):
  """Renews cluster certificates, keys and secrets.

  @type new_cluster_cert: bool
  @param new_cluster_cert: Whether to generate a new cluster certificate
  @type new_rapi_cert: bool
  @param new_rapi_cert: Whether to generate a new RAPI certificate
  @type rapi_cert_filename: string
  @param rapi_cert_filename: Path to file containing new RAPI certificate
  @type new_spice_cert: bool
  @param new_spice_cert: Whether to generate a new SPICE certificate
  @type spice_cert_filename: string
  @param spice_cert_filename: Path to file containing new SPICE certificate
  @type spice_cacert_filename: string
  @param spice_cacert_filename: Path to file containing the certificate of the
                                CA that signed the SPICE certificate
  @type new_confd_hmac_key: bool
  @param new_confd_hmac_key: Whether to generate a new HMAC key
  @type new_cds: bool
  @param new_cds: Whether to generate a new cluster domain secret
  @type cds_filename: string
  @param cds_filename: Path to file containing new cluster domain secret
  @type force: bool
  @param force: Whether to ask user for confirmation
  @rtype: int
  @return: 0 on success, 1 on any validation or user-abort error

  """
  # "Generate new X" and "use X from this file" are mutually exclusive
  if new_rapi_cert and rapi_cert_filename:
    ToStderr("Only one of the --new-rapi-certificate and --rapi-certificate"
             " options can be specified at the same time.")
    return 1

  if new_cds and cds_filename:
    ToStderr("Only one of the --new-cluster-domain-secret and"
             " --cluster-domain-secret options can be specified at"
             " the same time.")
    return 1

  if new_spice_cert and (spice_cert_filename or spice_cacert_filename):
    ToStderr("When using --new-spice-certificate, the --spice-certificate"
             " and --spice-ca-certificate must not be used.")
    return 1

  # SPICE cert and its CA cert must be supplied together (XOR check)
  if bool(spice_cacert_filename) ^ bool(spice_cert_filename):
    ToStderr("Both --spice-certificate and --spice-ca-certificate must be"
             " specified.")
    return 1

  rapi_cert_pem, spice_cert_pem, spice_cacert_pem = (None, None, None)
  try:
    if rapi_cert_filename:
      rapi_cert_pem = _ReadAndVerifyCert(rapi_cert_filename, True)
    if spice_cert_filename:
      spice_cert_pem = _ReadAndVerifyCert(spice_cert_filename, True)
      spice_cacert_pem = _ReadAndVerifyCert(spice_cacert_filename)
  except errors.X509CertError, err:
    ToStderr("Unable to load X509 certificate from %s: %s", err[0], err[1])
    return 1

  if cds_filename:
    try:
      cds = utils.ReadFile(cds_filename)
    except Exception, err: # pylint: disable=W0703
      ToStderr("Can't load new cluster domain secret from %s: %s" %
               (cds_filename, str(err)))
      return 1
  else:
    cds = None

  if not force:
    usertext = ("This requires all daemons on all nodes to be restarted and"
                " may take some time. Continue?")
    if not AskUser(usertext):
      return 1

  def _RenewCryptoInner(ctx):
    # Runs on the master while the whole cluster is stopped: regenerate the
    # requested crypto material, then push the changed files to every
    # non-master node over SSH
    ctx.feedback_fn("Updating certificates and keys")
    bootstrap.GenerateClusterCrypto(new_cluster_cert,
                                    new_rapi_cert,
                                    new_spice_cert,
                                    new_confd_hmac_key,
                                    new_cds,
                                    rapi_cert_pem=rapi_cert_pem,
                                    spice_cert_pem=spice_cert_pem,
                                    spice_cacert_pem=spice_cacert_pem,
                                    cds=cds)

    # Collect only the files that were actually (re)generated or replaced
    files_to_copy = []

    if new_cluster_cert:
      files_to_copy.append(pathutils.NODED_CERT_FILE)

    if new_rapi_cert or rapi_cert_pem:
      files_to_copy.append(pathutils.RAPI_CERT_FILE)

    if new_spice_cert or spice_cert_pem:
      files_to_copy.append(pathutils.SPICE_CERT_FILE)
      files_to_copy.append(pathutils.SPICE_CACERT_FILE)

    if new_confd_hmac_key:
      files_to_copy.append(pathutils.CONFD_HMAC_KEY)

    if new_cds or cds:
      files_to_copy.append(pathutils.CLUSTER_DOMAIN_SECRET_FILE)

    if files_to_copy:
      for node_name in ctx.nonmaster_nodes:
        ctx.feedback_fn("Copying %s to %s" %
                        (", ".join(files_to_copy), node_name))
        for file_name in files_to_copy:
          ctx.ssh.CopyFileToNode(node_name, file_name)

  RunWhileClusterStopped(ToStdout, _RenewCryptoInner)

  ToStdout("All requested certificates and keys have been replaced."
           " Running \"gnt-cluster verify\" now is recommended.")

  return 0
951

    
952

    
953
def RenewCrypto(opts, args):
  """Renews cluster certificates, keys and secrets.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Thin CLI wrapper: unpack the relevant options and delegate all the
  # validation and work to _RenewCrypto
  return _RenewCrypto(opts.new_cluster_cert, opts.new_rapi_cert,
                      opts.rapi_cert, opts.new_spice_cert,
                      opts.spice_cert, opts.spice_cacert,
                      opts.new_confd_hmac_key,
                      opts.new_cluster_domain_secret,
                      opts.cluster_domain_secret, opts.force)
967

    
968

    
969
def SetClusterParams(opts, args):
  """Modify the cluster.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should be an empty list
  @rtype: int
  @return: the desired exit code

  """
  # Refuse to run when no modification at all was requested
  if not (opts.vg_name is not None or opts.drbd_helper or
          opts.enabled_hypervisors or opts.hvparams or
          opts.beparams or opts.nicparams or
          opts.ndparams or opts.diskparams or
          opts.candidate_pool_size is not None or
          opts.uid_pool is not None or
          opts.maintain_node_health is not None or
          opts.add_uids is not None or
          opts.remove_uids is not None or
          opts.default_iallocator is not None or
          opts.reserved_lvs is not None or
          opts.master_netdev is not None or
          opts.master_netmask is not None or
          opts.use_external_mip_script is not None or
          opts.prealloc_wipe_disks is not None or
          opts.hv_state or
          opts.enabled_disk_templates or
          opts.disk_state or
          opts.ipolicy_bounds_specs is not None or
          opts.ipolicy_std_specs is not None or
          opts.ipolicy_disk_templates is not None or
          opts.ipolicy_vcpu_ratio is not None or
          opts.ipolicy_spindle_ratio is not None or
          opts.modify_etc_hosts is not None or
          opts.file_storage_dir is not None):
    ToStderr("Please give at least one of the parameters.")
    return 1

  if _CheckNoLvmStorageOptDeprecated(opts):
    return 1

  enabled_disk_templates = None
  if opts.enabled_disk_templates:
    enabled_disk_templates = opts.enabled_disk_templates.split(",")

  # consistency between vg name and enabled disk templates: warn (but do not
  # abort) when a VG is given without any lvm-based template enabled
  vg_name = None
  if opts.vg_name is not None:
    vg_name = opts.vg_name
  if enabled_disk_templates:
    if vg_name and not utils.IsLvmEnabled(enabled_disk_templates):
      ToStdout("You specified a volume group with --vg-name, but you did not"
               " enable any of the following lvm-based disk templates: %s" %
               utils.CommaJoin(utils.GetLvmDiskTemplates()))

  # an empty-string helper signals "DRBD storage disabled" to the opcode
  drbd_helper = opts.drbd_helper
  if not opts.drbd_storage and opts.drbd_helper:
    ToStderr("Options --no-drbd-storage and --drbd-usermode-helper conflict.")
    return 1

  if not opts.drbd_storage:
    drbd_helper = ""

  hvlist = opts.enabled_hypervisors
  if hvlist is not None:
    hvlist = hvlist.split(",")

  # a list of (name, dict) we can pass directly to dict() (or [])
  hvparams = dict(opts.hvparams)
  for hv_params in hvparams.values():
    utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)

  diskparams = dict(opts.diskparams)

  for dt_params in diskparams.values():
    utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)

  beparams = opts.beparams
  utils.ForceDictType(beparams, constants.BES_PARAMETER_COMPAT)

  nicparams = opts.nicparams
  utils.ForceDictType(nicparams, constants.NICS_PARAMETER_TYPES)

  ndparams = opts.ndparams
  if ndparams is not None:
    utils.ForceDictType(ndparams, constants.NDS_PARAMETER_TYPES)

  ipolicy = CreateIPolicyFromOpts(
    minmax_ispecs=opts.ipolicy_bounds_specs,
    std_ispecs=opts.ipolicy_std_specs,
    ipolicy_disk_templates=opts.ipolicy_disk_templates,
    ipolicy_vcpu_ratio=opts.ipolicy_vcpu_ratio,
    ipolicy_spindle_ratio=opts.ipolicy_spindle_ratio,
    )

  mnh = opts.maintain_node_health

  uid_pool = opts.uid_pool
  if uid_pool is not None:
    uid_pool = uidpool.ParseUidPool(uid_pool)

  add_uids = opts.add_uids
  if add_uids is not None:
    add_uids = uidpool.ParseUidPool(add_uids)

  remove_uids = opts.remove_uids
  if remove_uids is not None:
    remove_uids = uidpool.ParseUidPool(remove_uids)

  # empty string means "clear the reserved LVs list", otherwise it is a
  # comma-separated (escapable) list
  if opts.reserved_lvs is not None:
    if opts.reserved_lvs == "":
      opts.reserved_lvs = []
    else:
      opts.reserved_lvs = utils.UnescapeAndSplit(opts.reserved_lvs, sep=",")

  if opts.master_netmask is not None:
    try:
      opts.master_netmask = int(opts.master_netmask)
    except ValueError:
      ToStderr("The --master-netmask option expects an int parameter.")
      return 1

  ext_ip_script = opts.use_external_mip_script

  if opts.disk_state:
    disk_state = utils.FlatToDict(opts.disk_state)
  else:
    disk_state = {}

  hv_state = dict(opts.hv_state)

  op = opcodes.OpClusterSetParams(
    vg_name=vg_name,
    drbd_helper=drbd_helper,
    enabled_hypervisors=hvlist,
    hvparams=hvparams,
    os_hvp=None,
    beparams=beparams,
    nicparams=nicparams,
    ndparams=ndparams,
    diskparams=diskparams,
    ipolicy=ipolicy,
    candidate_pool_size=opts.candidate_pool_size,
    maintain_node_health=mnh,
    modify_etc_hosts=opts.modify_etc_hosts,
    uid_pool=uid_pool,
    add_uids=add_uids,
    remove_uids=remove_uids,
    default_iallocator=opts.default_iallocator,
    prealloc_wipe_disks=opts.prealloc_wipe_disks,
    master_netdev=opts.master_netdev,
    master_netmask=opts.master_netmask,
    reserved_lvs=opts.reserved_lvs,
    use_external_mip_script=ext_ip_script,
    hv_state=hv_state,
    disk_state=disk_state,
    enabled_disk_templates=enabled_disk_templates,
    force=opts.force,
    file_storage_dir=opts.file_storage_dir,
    )
  SubmitOrSend(op, opts)
  return 0
1131

    
1132

    
1133
def QueueOps(opts, args):
  """Queue operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the subcommand is unknown

  """
  subcommand = args[0]
  cl = GetClient()
  if subcommand in ("drain", "undrain"):
    # "drain" sets the flag, "undrain" clears it
    cl.SetQueueDrainFlag(subcommand == "drain")
  elif subcommand == "info":
    (drained, ) = cl.QueryConfigValues(["drain_flag"])
    ToStdout("The drain flag is %s" % ("set" if drained else "unset"))
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % subcommand,
                               errors.ECODE_INVAL)

  return 0
1160

    
1161

    
1162
def _ShowWatcherPause(until):
  """Prints whether (and until when) the watcher is paused.

  @param until: Unix timestamp until which the watcher is paused, or None

  """
  if until is not None and until >= time.time():
    ToStdout("The watcher is paused until %s.", time.ctime(until))
  else:
    ToStdout("The watcher is not paused.")
1167

    
1168

    
1169
def WatcherOps(opts, args):
  """Watcher operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code
  @raise errors.OpPrereqError: if the subcommand is unknown or a pause
      duration is missing

  """
  subcommand = args[0]
  cl = GetClient()

  if subcommand == "continue":
    cl.SetWatcherPause(None)
    ToStdout("The watcher is no longer paused.")
  elif subcommand == "pause":
    if len(args) < 2:
      raise errors.OpPrereqError("Missing pause duration", errors.ECODE_INVAL)
    # Pause end is an absolute timestamp: now + parsed duration
    _ShowWatcherPause(cl.SetWatcherPause(time.time() + ParseTimespec(args[1])))
  elif subcommand == "info":
    _ShowWatcherPause(cl.QueryConfigValues(["watcher_pause"])[0])
  else:
    raise errors.OpPrereqError("Command '%s' is not valid." % subcommand,
                               errors.ECODE_INVAL)

  return 0
1202

    
1203

    
1204
def _OobPower(opts, node_list, power):
  """Puts the node in the list to desired power state.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on
  @param power: True if they should be powered on, False otherwise
  @return: The success of the operation (none failed)

  """
  op = opcodes.OpOobCommand(node_names=node_list,
                            command=(constants.OOB_POWER_ON if power
                                     else constants.OOB_POWER_OFF),
                            ignore_status=True,
                            timeout=opts.oob_timeout,
                            power_delay=opts.power_delay)

  failures = 0
  # Each result is ((status, node_name), (data_status, data))
  for ((_, node_name), (data_status, _)) in SubmitOpCode(op, opts=opts):
    if data_status != constants.RS_NORMAL:
      assert data_status != constants.RS_UNAVAIL
      failures += 1
      ToStderr("There was a problem changing power for %s, please investigate",
               node_name)

  return failures == 0
1239

    
1240

    
1241
def _InstanceStart(opts, inst_list, start, no_remember=False):
  """Puts the instances in the list to desired state.

  @param opts: The command line options selected by the user
  @param inst_list: The list of instances to operate on
  @param start: True if they should be started, False for shutdown
  @param no_remember: If the instance state should be remembered
  @return: The success of the operation (none failed)

  """
  if start:
    opcls = opcodes.OpInstanceStartup
    (text_submit, text_success, text_failed) = \
      ("startup", "started", "starting")
  else:
    opcls = compat.partial(opcodes.OpInstanceShutdown,
                           timeout=opts.shutdown_timeout,
                           no_remember=no_remember)
    (text_submit, text_success, text_failed) = \
      ("shutdown", "stopped", "stopping")

  # Queue one job per instance and wait for all of them
  jex = JobExecutor(opts=opts)
  for inst in inst_list:
    ToStdout("Submit %s of instance %s", text_submit, inst)
    jex.QueueJob(inst, opcls(instance_name=inst))

  results = jex.GetResults()
  bad_cnt = sum(1 for (success, _) in results if not success)

  if bad_cnt:
    ToStderr("There were errors while %s instances:\n"
             "%d error(s) out of %d instance(s)", text_failed, bad_cnt,
             len(results))
    return False

  ToStdout("All instances have been %s successfully", text_success)
  return True
1279

    
1280

    
1281
class _RunWhenNodesReachableHelper:
  """Helper class to make shared internal state sharing easier.

  Used as the (callable, wait_fn) pair for L{utils.Retry}: __call__ runs the
  action callback and raises RetryAgain while nodes are still down, Wait
  probes the down nodes with a TCP ping between attempts.

  @ivar success: Indicates if all action_cb calls were successful

  """
  def __init__(self, node_list, action_cb, node2ip, port, feedback_fn,
               _ping_fn=netutils.TcpPing, _sleep_fn=time.sleep):
    """Init the object.

    @param node_list: The list of nodes to be reachable
    @param action_cb: Callback called when a new host is reachable
    @type node2ip: dict
    @param node2ip: Node to ip mapping
    @param port: The port to use for the TCP ping
    @param feedback_fn: The function used for feedback
    @param _ping_fn: Function to check reachabilty (for unittest use only)
    @param _sleep_fn: Function to sleep (for unittest use only)

    """
    # down/up partition the node set; nodes migrate from down to up as the
    # ping in Wait() succeeds
    self.down = set(node_list)
    self.up = set()
    self.node2ip = node2ip
    self.success = True
    self.action_cb = action_cb
    self.port = port
    self.feedback_fn = feedback_fn
    self._ping_fn = _ping_fn
    self._sleep_fn = _sleep_fn

  def __call__(self):
    """When called we run action_cb.

    @raises utils.RetryAgain: When there are still down nodes

    """
    # action_cb gets the currently reachable nodes; remember any failure but
    # keep retrying until all nodes are up
    if not self.action_cb(self.up):
      self.success = False

    if self.down:
      raise utils.RetryAgain()
    else:
      return self.success

  def Wait(self, secs):
    """Checks if a host is up or waits remaining seconds.

    @param secs: The secs remaining

    """
    start = time.time()
    for node in self.down:
      if self._ping_fn(self.node2ip[node], self.port, timeout=_EPO_PING_TIMEOUT,
                       live_port_needed=True):
        self.feedback_fn("Node %s became available" % node)
        self.up.add(node)
        self.down -= self.up
        # If we have a node available there is the possibility to run the
        # action callback successfully, therefore we don't wait and return
        return

    # No node came up: sleep whatever is left of the requested interval
    self._sleep_fn(max(0.0, start + secs - time.time()))
1343

    
1344

    
1345
def _RunWhenNodesReachable(node_list, action_cb, interval):
  """Run action_cb when nodes become reachable.

  @param node_list: The list of nodes to be reachable
  @param action_cb: Callback called when a new host is reachable
  @param interval: The earliest time to retry
  @return: the final result of action_cb, or False on timeout

  """
  cluster_info = GetClient().QueryClusterInfo()
  # Resolve node names with the cluster's primary IP family
  if cluster_info["primary_ip_version"] == constants.IP4_VERSION:
    family = netutils.IPAddress.family
  else:
    family = netutils.IP6Address.family

  node2ip = {}
  for node in node_list:
    node2ip[node] = netutils.GetHostname(node, family=family).ip

  helper = \
    _RunWhenNodesReachableHelper(node_list, action_cb, node2ip,
                                 netutils.GetDaemonPort(constants.NODED),
                                 ToStdout)

  try:
    return utils.Retry(helper, interval, _EPO_REACHABLE_TIMEOUT,
                       wait_fn=helper.Wait)
  except utils.RetryTimeout:
    ToStderr("Time exceeded while waiting for nodes to become reachable"
             " again:\n  - %s", "  - ".join(helper.down))
    return False
1374

    
1375

    
1376
def _MaybeInstanceStartup(opts, inst_map, nodes_online,
                          _instance_start_fn=_InstanceStart):
  """Start the instances conditional based on node_states.

  @param opts: The command line options selected by the user
  @param inst_map: A dict of inst -> nodes mapping
  @param nodes_online: A list of nodes online
  @param _instance_start_fn: Callback to start instances (unittest use only)
  @return: Success of the operation on all instances

  """
  # An instance is startable once every node it lives on is back online
  startable = [inst for (inst, nodes) in inst_map.items()
               if not (nodes - nodes_online)]

  # Drop started instances from the map so they are not retried
  for inst in startable:
    del inst_map[inst]

  if not startable:
    return True

  return _instance_start_fn(opts, startable, True)
1400

    
1401

    
1402
def _EpoOn(opts, full_node_list, node_list, inst_map):
  """Does the actual power on.

  @param opts: The command line options selected by the user
  @param full_node_list: All nodes to operate on (includes nodes not supporting
                         OOB)
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # BUGFIX: power the nodes back ON (power=True). The previous code passed
  # False, which makes _OobPower issue OOB_POWER_OFF and leaves the nodes
  # down, contradicting both this function's purpose and the error message.
  if node_list and not _OobPower(opts, node_list, True):
    ToStderr("Not all nodes seem to get back up, investigate and start"
             " manually if needed")

  # Wait for the nodes to be back up
  action_cb = compat.partial(_MaybeInstanceStartup, opts, dict(inst_map))

  ToStdout("Waiting until all nodes are available again")
  if not _RunWhenNodesReachable(full_node_list, action_cb, _EPO_PING_INTERVAL):
    ToStderr("Please investigate and start stopped instances manually")
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1426

    
1427

    
1428
def _EpoOff(opts, node_list, inst_map):
  """Does the actual power off.

  @param opts: The command line options selected by the user
  @param node_list: The list of nodes to operate on (all need to support OOB)
  @param inst_map: A dict of inst -> nodes mapping
  @return: The desired exit status

  """
  # First shut down all instances (without remembering the state, so they
  # are started again on power-on), then power off the nodes themselves
  if not _InstanceStart(opts, inst_map.keys(), False, no_remember=True):
    ToStderr("Please investigate and stop instances manually before continuing")
    return constants.EXIT_FAILURE

  if node_list and not _OobPower(opts, node_list, False):
    return constants.EXIT_FAILURE

  return constants.EXIT_SUCCESS
1448

    
1449

    
1450
def Epo(opts, args, cl=None, _on_fn=_EpoOn, _off_fn=_EpoOff,
        _confirm_fn=ConfirmOperation,
        _stdout_fn=ToStdout, _stderr_fn=ToStderr):
  """EPO operations.

  @param opts: the command line options selected by the user
  @type args: list
  @param args: should contain only one element, the subcommand
  @rtype: int
  @return: the desired exit code

  """
  if opts.groups and opts.show_all:
    _stderr_fn("Only one of --groups or --all are allowed")
    return constants.EXIT_FAILURE
  elif args and opts.show_all:
    _stderr_fn("Arguments in combination with --all are not allowed")
    return constants.EXIT_FAILURE

  if cl is None:
    cl = GetClient()

  # With --groups the arguments are group names; expand them to their nodes
  if opts.groups:
    node_query_list = \
      itertools.chain(*cl.QueryGroups(args, ["node_list"], False))
  else:
    node_query_list = args

  result = cl.QueryNodes(node_query_list, ["name", "master", "pinst_list",
                                           "sinst_list", "powered", "offline"],
                         False)

  all_nodes = map(compat.fst, result)
  node_list = []
  inst_map = {}
  for (node, master, pinsts, sinsts, powered, offline) in result:
    # Build inst -> set(nodes) map; instances on the master get an empty set
    # (the master is handled specially and never powered off here)
    if not offline:
      for inst in (pinsts + sinsts):
        if inst in inst_map:
          if not master:
            inst_map[inst].add(node)
        elif master:
          inst_map[inst] = set()
        else:
          inst_map[inst] = set([node])

    if master and opts.on:
      # We ignore the master for turning on the machines, in fact we are
      # already operating on the master at this point :)
      continue
    elif master and not opts.show_all:
      _stderr_fn("%s is the master node, please do a master-failover to another"
                 " node not affected by the EPO or use --all if you intend to"
                 " shutdown the whole cluster", node)
      return constants.EXIT_FAILURE
    elif powered is None:
      _stdout_fn("Node %s does not support out-of-band handling, it can not be"
                 " handled in a fully automated manner", node)
    elif powered == opts.on:
      _stdout_fn("Node %s is already in desired power state, skipping", node)
    elif not offline or (offline and powered):
      node_list.append(node)

  if not (opts.force or _confirm_fn(all_nodes, "nodes", "epo")):
    return constants.EXIT_FAILURE

  if opts.on:
    return _on_fn(opts, all_nodes, node_list, inst_map)
  else:
    return _off_fn(opts, node_list, inst_map)
1520

    
1521

    
1522
def _GetCreateCommand(info):
  """Builds a "gnt-cluster init" command line from cluster info.

  @param info: result of the cluster info query
  @rtype: string
  @return: the re-creation command line (ipolicy options only)

  """
  out = StringIO()
  out.write("gnt-cluster init")
  PrintIPolicyCommand(out, info["ipolicy"], False)
  out.write(" ")
  out.write(info["name"])
  return out.getvalue()
1529

    
1530

    
1531
def ShowCreateCommand(opts, args):
  """Shows the command that can be used to re-create the cluster.

  Currently it works only for ipolicy specs.

  """
  ToStdout(_GetCreateCommand(GetClient(query=True).QueryClusterInfo()))
1540

    
1541

    
1542
#: dispatch table mapping each gnt-cluster subcommand to its
#: (handler, arguments, options, usage, description) tuple for GenericMain
commands = {
  "init": (
    InitCluster, [ArgHost(min=1, max=1)],
    [BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, GLOBAL_FILEDIR_OPT,
     HVLIST_OPT, MAC_PREFIX_OPT, MASTER_NETDEV_OPT, MASTER_NETMASK_OPT,
     NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, NOMODIFY_ETCHOSTS_OPT,
     NOMODIFY_SSH_SETUP_OPT, SECONDARY_IP_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, DRBD_HELPER_OPT, NODRBD_STORAGE_OPT,
     DEFAULT_IALLOCATOR_OPT, PRIMARY_IP_VERSION_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, GLOBAL_SHARED_FILEDIR_OPT, USE_EXTERNAL_MIP_SCRIPT,
     DISK_PARAMS_OPT, HV_STATE_OPT, DISK_STATE_OPT, ENABLED_DISK_TEMPLATES_OPT,
     IPOLICY_STD_SPECS_OPT] + INSTANCE_POLICY_OPTS + SPLIT_ISPECS_OPTS,
    "[opts...] <cluster_name>", "Initialises a new cluster configuration"),
  "destroy": (
    DestroyCluster, ARGS_NONE, [YES_DOIT_OPT],
    "", "Destroy cluster"),
  "rename": (
    RenameCluster, [ArgHost(min=1, max=1)],
    [FORCE_OPT, DRY_RUN_OPT],
    "<new_name>",
    "Renames the cluster"),
  "redist-conf": (
    RedistributeConfig, ARGS_NONE, SUBMIT_OPTS + [DRY_RUN_OPT, PRIORITY_OPT],
    "", "Forces a push of the configuration file and ssconf files"
    " to the nodes in the cluster"),
  "verify": (
    VerifyCluster, ARGS_NONE,
    [VERBOSE_OPT, DEBUG_SIMERR_OPT, ERROR_CODES_OPT, NONPLUS1_OPT,
     DRY_RUN_OPT, PRIORITY_OPT, NODEGROUP_OPT, IGNORE_ERRORS_OPT],
    "", "Does a check on the cluster configuration"),
  "verify-disks": (
    VerifyDisks, ARGS_NONE, [PRIORITY_OPT],
    "", "Does a check on the cluster disk status"),
  "repair-disk-sizes": (
    RepairDiskSizes, ARGS_MANY_INSTANCES, [DRY_RUN_OPT, PRIORITY_OPT],
    "[instance...]", "Updates mismatches in recorded disk sizes"),
  "master-failover": (
    MasterFailover, ARGS_NONE, [NOVOTING_OPT, FORCE_FAILOVER],
    "", "Makes the current node the master"),
  "master-ping": (
    MasterPing, ARGS_NONE, [],
    "", "Checks if the master is alive"),
  "version": (
    ShowClusterVersion, ARGS_NONE, [],
    "", "Shows the cluster version"),
  "getmaster": (
    ShowClusterMaster, ARGS_NONE, [],
    "", "Shows the cluster master"),
  "copyfile": (
    ClusterCopyFile, [ArgFile(min=1, max=1)],
    [NODE_LIST_OPT, USE_REPL_NET_OPT, NODEGROUP_OPT],
    "[-n node...] <filename>", "Copies a file to all (or only some) nodes"),
  "command": (
    RunClusterCommand, [ArgCommand(min=1)],
    [NODE_LIST_OPT, NODEGROUP_OPT, SHOW_MACHINE_OPT, FAILURE_ONLY_OPT],
    "[-n node...] <command>", "Runs a command on all (or only some) nodes"),
  "info": (
    ShowClusterConfig, ARGS_NONE, [ROMAN_OPT],
    "[--roman]", "Show cluster configuration"),
  "list-tags": (
    ListTags, ARGS_NONE, [], "", "List the tags of the cluster"),
  "add-tags": (
    AddTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Add tags to the cluster"),
  "remove-tags": (
    RemoveTags, [ArgUnknown()], [TAG_SRC_OPT, PRIORITY_OPT] + SUBMIT_OPTS,
    "tag...", "Remove tags from the cluster"),
  "search-tags": (
    SearchTags, [ArgUnknown(min=1, max=1)], [PRIORITY_OPT], "",
    "Searches the tags on all objects on"
    " the cluster for a given pattern (regex)"),
  "queue": (
    QueueOps,
    [ArgChoice(min=1, max=1, choices=["drain", "undrain", "info"])],
    [], "drain|undrain|info", "Change queue properties"),
  "watcher": (
    WatcherOps,
    [ArgChoice(min=1, max=1, choices=["pause", "continue", "info"]),
     ArgSuggest(min=0, max=1, choices=["30m", "1h", "4h"])],
    [],
    "{pause <timespec>|continue|info}", "Change watcher properties"),
  "modify": (
    SetClusterParams, ARGS_NONE,
    [FORCE_OPT,
     BACKEND_OPT, CP_SIZE_OPT, ENABLED_HV_OPT, HVLIST_OPT, MASTER_NETDEV_OPT,
     MASTER_NETMASK_OPT, NIC_PARAMS_OPT, NOLVM_STORAGE_OPT, VG_NAME_OPT,
     MAINTAIN_NODE_HEALTH_OPT, UIDPOOL_OPT, ADD_UIDS_OPT, REMOVE_UIDS_OPT,
     DRBD_HELPER_OPT, NODRBD_STORAGE_OPT, DEFAULT_IALLOCATOR_OPT,
     RESERVED_LVS_OPT, DRY_RUN_OPT, PRIORITY_OPT, PREALLOC_WIPE_DISKS_OPT,
     NODE_PARAMS_OPT, USE_EXTERNAL_MIP_SCRIPT, DISK_PARAMS_OPT, HV_STATE_OPT,
     DISK_STATE_OPT] + SUBMIT_OPTS +
     [ENABLED_DISK_TEMPLATES_OPT, IPOLICY_STD_SPECS_OPT, MODIFY_ETCHOSTS_OPT] +
     INSTANCE_POLICY_OPTS + [GLOBAL_FILEDIR_OPT],
    "[opts...]",
    "Alters the parameters of the cluster"),
  "renew-crypto": (
    RenewCrypto, ARGS_NONE,
    [NEW_CLUSTER_CERT_OPT, NEW_RAPI_CERT_OPT, RAPI_CERT_OPT,
     NEW_CONFD_HMAC_KEY_OPT, FORCE_OPT,
     NEW_CLUSTER_DOMAIN_SECRET_OPT, CLUSTER_DOMAIN_SECRET_OPT,
     NEW_SPICE_CERT_OPT, SPICE_CERT_OPT, SPICE_CACERT_OPT],
    "[opts...]",
    "Renews cluster certificates, keys and secrets"),
  "epo": (
    Epo, [ArgUnknown()],
    [FORCE_OPT, ON_OPT, GROUPS_OPT, ALL_OPT, OOB_TIMEOUT_OPT,
     SHUTDOWN_TIMEOUT_OPT, POWER_DELAY_OPT],
    "[opts...] [args]",
    "Performs an emergency power-off on given args"),
  "activate-master-ip": (
    ActivateMasterIp, ARGS_NONE, [], "", "Activates the master IP"),
  "deactivate-master-ip": (
    DeactivateMasterIp, ARGS_NONE, [CONFIRM_OPT], "",
    "Deactivates the master IP"),
  "show-ispecs-cmd": (
    ShowCreateCommand, ARGS_NONE, [], "",
    "Show the command line to re-create the cluster"),
  }
1660

    
1661

    
1662
#: dictionary with aliases for commands
aliases = {
  "masterfailover": "master-failover",
  "show": "info",
}
1667

    
1668

    
1669
def Main():
  """Entry point: dispatches to the handlers in L{commands} via GenericMain.

  @rtype: int
  @return: the exit code of the selected subcommand

  """
  return GenericMain(commands, override={"tag_type": constants.TAG_CLUSTER},
                     aliases=aliases)