#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consumed too
much memory.

"""

# pylint: disable=R0904
# R0904: Too many public methods

import os
import random
import logging
import time
import itertools

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"

def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigVersionMismatch(constants.CONFIG_VERSION, data.version)


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource '%s'"
                                    % str(resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
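
# A minimal usage sketch (illustrative only; the execution context id
# "ec-job-42" and the resource values below are invented):
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("ec-job-42", "resource-a")
#   assert trm.Reserved("resource-a")
#   # Generate() retries generate_one_fn until it returns a value not in
#   # the existing set and not already reserved, then reserves it:
#   new = trm.Generate(set(["resource-a"]), lambda: "resource-b", "ec-job-42")
#   trm.DropECReservations("ec-job-42")  # release everything for this job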


def _MatchNameComponentIgnoreCase(short_name, names):
  """Wrapper around L{utils.text.MatchNameComponent}.

  """
  return utils.MatchNameComponent(short_name, names, case_sensitive=False)


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts,
               accept_foreign=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._cfg_id = None
    self._context = None
    self._OpenConfig(accept_foreign)

  def _GetRpc(self, address_list):
    """Returns RPC runner for configuration.

    """
    return rpc.ConfigRunner(self._context, address_list)

  def SetContext(self, context):
    """Sets Ganeti context.

    """
    self._context = context

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac
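
  # Illustrative example (the mac_prefix value is invented): with a cluster
  # mac_prefix of "aa:00:00", _GenerateOneMAC returns addresses of the form
  # "aa:00:00:xx:xx:xx", e.g. "aa:00:00:3f:07:c2", with the last three
  # octets chosen at random.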

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNdParams(self, node):
    """Get the node params populated with cluster defaults.

    @type node: L{objects.Node}
    @param node: The node we want to know the params for
    @return: A dict with the filled in node params

    """
    nodegroup = self._UnlockedGetNodeGroup(node.group)
    return self._config_data.cluster.FillND(node, nodegroup)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_ids.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result
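
  # For reference: a DRBD8 disk's logical_id is the 6-tuple
  # (node_a, node_b, port, minor_a, minor_b, secret), as unpacked in
  # _UnlockedSetDiskID below, so logical_id[5] above is the shared secret.
  # A sketch with invented values:
  #
  #   ("node1.example.com", "node2.example.com", 11000, 0, 0, "52c3a1b7")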

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    # pylint: disable=R0914
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    cluster = data.cluster
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(cluster.enabled_hypervisors) -
                   set(cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    cluster.master_node)

    def _helper(owner, attr, value, template):
      try:
        utils.ForceDictType(value, template)
      except errors.GenericError, err:
        result.append("%s has invalid %s: %s" % (owner, attr, err))

    def _helper_nic(owner, params):
      try:
        objects.NIC.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid nicparams: %s" % (owner, err))

    def _helper_ipolicy(owner, params):
      try:
        objects.InstancePolicy.CheckParameterSyntax(params)
      except errors.ConfigurationError, err:
        result.append("%s has invalid instance policy: %s" % (owner, err))

    def _helper_ispecs(owner, params):
      for key, value in params.items():
        if key in constants.IPOLICY_ISPECS:
          fullkey = "ipolicy/" + key
          _helper(owner, fullkey, value, constants.ISPECS_PARAMETER_TYPES)
        else:
          # FIXME: assuming list type
          if key in constants.IPOLICY_PARAMETERS:
            exp_type = float
          else:
            exp_type = list
          if not isinstance(value, exp_type):
            result.append("%s has invalid instance policy: for %s,"
                          " expecting %s, got %s" %
                          (owner, key, exp_type.__name__, type(value)))

    # check cluster parameters
    _helper("cluster", "beparams", cluster.SimpleFillBE({}),
            constants.BES_PARAMETER_TYPES)
    _helper("cluster", "nicparams", cluster.SimpleFillNIC({}),
            constants.NICS_PARAMETER_TYPES)
    _helper_nic("cluster", cluster.SimpleFillNIC({}))
    _helper("cluster", "ndparams", cluster.SimpleFillND({}),
            constants.NDS_PARAMETER_TYPES)
    _helper_ipolicy("cluster", cluster.SimpleFillIPolicy({}))
    _helper_ispecs("cluster", cluster.SimpleFillIPolicy({}))

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)
        if nic.nicparams:
          filled = cluster.SimpleFillNIC(nic.nicparams)
          owner = "instance %s nic %d" % (instance.name, idx)
          _helper(owner, "nicparams",
                  filled, constants.NICS_PARAMETER_TYPES)
          _helper_nic(owner, filled)

      # parameter checks
      if instance.beparams:
        _helper("instance %s" % instance.name, "beparams",
                cluster.FillBE(instance), constants.BES_PARAMETER_TYPES)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (cluster.highest_used_port, keys[-1]))

    if not data.nodes[cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))
      if node.group not in data.nodegroups:
        result.append("Node '%s' has invalid group '%s'" %
                      (node.name, node.group))
      else:
        _helper("node %s" % node.name, "ndparams",
                cluster.FillND(node, data.nodegroups[node.group]),
                constants.NDS_PARAMETER_TYPES)

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("node group '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if utils.UUID_RE.match(nodegroup.name.lower()):
        result.append("node group '%s' (uuid: '%s') has uuid-like name" %
                      (nodegroup.name, nodegroup.uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate node group name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)
      group_name = "group %s" % nodegroup.name
      _helper_ipolicy(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      _helper_ispecs(group_name, cluster.SimpleFillIPolicy(nodegroup.ipolicy))
      if nodegroup.ndparams:
        _helper(group_name, "ndparams",
                cluster.SimpleFillND(nodegroup.ndparams),
                constants.NDS_PARAMETER_TYPES)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result
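
  # The returned messages are plain strings; a sketch of possible output
  # (all names and numbers invented):
  #
  #   ["instance 'inst1.example.com' has invalid primary node 'node9'",
  #    "tcp/udp port 11006 has duplicates: inst1.example.com/network port,"
  #    " cluster/port marked as free"]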

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return
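
  # Resulting physical_id layout for DRBD8 devices, as built above
  # (addresses invented): on the primary node it is
  #   (primary_ip, port, secondary_ip, port, pminor, secret)
  # and on the secondary node the two (ip, port) pairs are swapped and
  # sminor is used instead of pminor.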

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/IP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
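
  # Allocation sketch (numbers invented): with an empty pool and
  # highest_used_port at 11005, AllocatePort() returns 11006 and persists
  # the new highest_used_port; if 11002 had earlier been returned to the
  # pool via AddTcpUdpPort(), it would be handed out first instead.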

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates
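
  # Shape of the return value, sketched with invented names:
  #
  #   my_dict = {
  #     "node1.example.com": {0: "inst1.example.com", 1: "inst2.example.com"},
  #     "node2.example.com": {0: "inst1.example.com"},
  #   }
  #   duplicates = []  # or e.g. [("node1.example.com", 0, "instA", "instB")]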

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
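
  # Allocation sketch (names invented): on a cluster with no DRBD minors in
  # use, requesting two minors on node1 and one on node2 returns the first
  # free minor per node, in input order:
  #
  #   cfg.AllocateDRBDMinor(["node1", "node1", "node2"], "inst1")
  #   # -> [0, 1, 0]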

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetmask(self):
    """Get the netmask of the master node for this cluster.

    """
    return self._config_data.cluster.master_netmask

  @locking.ssynchronized(_config_lock, shared=1)
  def GetUseExternalMipScript(self):
    """Get flag representing whether to use the external master IP setup script.

    """
    return self._config_data.cluster.use_external_mip_script

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSharedFileStorageDir(self):
    """Get the shared file storage dir for this cluster.

    """
    return self._config_data.cluster.shared_file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetworkParameters(self):
    """Get network parameters of the master node.

    @rtype: L{objects.MasterNetworkParameters}
    @return: network parameters of the master node

    """
    cluster = self._config_data.cluster
    result = objects.MasterNetworkParameters(name=cluster.master_node,
      ip=cluster.master_ip,
      netmask=cluster.master_netmask,
      netdev=cluster.master_netdev,
      ip_family=cluster.primary_ip_family)

    return result

  @locking.ssynchronized(_config_lock)
  def AddNodeGroup(self, group, ec_id, check_uuid=True):
    """Add a node group to the configuration.

    This method calls group.UpgradeConfig() to fill any missing attributes
    according to their default values.

    @type group: L{objects.NodeGroup}
    @param group: the NodeGroup object to add
    @type ec_id: string
    @param ec_id: unique id for the job to use when creating a missing UUID
    @type check_uuid: bool
    @param check_uuid: add a UUID to the group if it doesn't have one or, if
                       it does, ensure that it does not exist in the
                       configuration already

    """
    self._UnlockedAddNodeGroup(group, ec_id, check_uuid)
    self._WriteConfig()

  def _UnlockedAddNodeGroup(self, group, ec_id, check_uuid):
    """Add a node group to the configuration.

    """
    logging.info("Adding node group %s to configuration", group.name)

    # Some code might need to add a node group with a pre-populated UUID
    # generated with ConfigWriter.GenerateUniqueID(). We allow them to bypass
    # the "does this UUID exist already" check.
    if check_uuid:
      self._EnsureUUID(group, ec_id)

    try:
      existing_uuid = self._UnlockedLookupNodeGroup(group.name)
    except errors.OpPrereqError:
      pass
    else:
      raise errors.OpPrereqError("Desired group name '%s' already exists as a"
                                 " node group (UUID: %s)" %
                                 (group.name, existing_uuid),
                                 errors.ECODE_EXISTS)

    group.serial_no = 1
    group.ctime = group.mtime = time.time()
    group.UpgradeConfig()

    self._config_data.nodegroups[group.uuid] = group
    self._config_data.cluster.serial_no += 1

  @locking.ssynchronized(_config_lock)
  def RemoveNodeGroup(self, group_uuid):
    """Remove a node group from the configuration.

    @type group_uuid: string
    @param group_uuid: the UUID of the node group to remove

    """
    logging.info("Removing node group %s from configuration", group_uuid)

    if group_uuid not in self._config_data.nodegroups:
      raise errors.ConfigurationError("Unknown node group '%s'" % group_uuid)

    assert len(self._config_data.nodegroups) != 1, \
            "Group '%s' is the only group, cannot be removed" % group_uuid

    del self._config_data.nodegroups[group_uuid]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def _UnlockedLookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID
    @raises errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one node group exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Node group '%s' not found" % target,
                               errors.ECODE_NOENT)
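
  # Lookup semantics, sketched with invented values: a known UUID is
  # returned unchanged, a group name resolves to that group's UUID, and
  # None resolves to the sole group only when exactly one group exists:
  #
  #   cfg.LookupNodeGroup("0e6e1a2b")   # -> "0e6e1a2b" (already a UUID)
  #   cfg.LookupNodeGroup("default")    # -> UUID of the group named "default"
  #   cfg.LookupNodeGroup(None)         # -> UUID of the only group, or error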

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group's UUID.

    This function is just a wrapper over L{_UnlockedLookupNodeGroup}.

    @type target: string or None
    @param target: group name or UUID or None to look for the default
    @rtype: string
    @return: nodegroup UUID

    """
    return self._UnlockedLookupNodeGroup(target)

  def _UnlockedGetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    if uuid not in self._config_data.nodegroups:
      return None

    return self._config_data.nodegroups[uuid]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroup(self, uuid):
    """Lookup a node group.

    @type uuid: string
    @param uuid: group UUID
    @rtype: L{objects.NodeGroup} or None
    @return: nodegroup object, or None if not found

    """
    return self._UnlockedGetNodeGroup(uuid)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodeGroupsInfo(self):
    """Get the configuration of all node groups.

    """
    return dict(self._config_data.nodegroups)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupList(self):
    """Get a list of node groups.

    """
    return self._config_data.nodegroups.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupMembersByNodes(self, nodes):
    """Get nodes which are members of the same nodegroups as the given nodes.

    """
    ngfn = lambda node_name: self._UnlockedGetNodeInfo(node_name).group
    return frozenset(member_name
                     for node_name in nodes
                     for member_name in
                       self._UnlockedGetNodeGroup(ngfn(node_name)).members)
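
  # Sketch (names invented): if node1 and node2 share a group that also
  # contains node3, then
  #
  #   cfg.GetNodeGroupMembersByNodes(["node1"])
  #   # -> frozenset(["node1", "node2", "node3"])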

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert status in constants.ADMINST_ALL, \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_state != status:
      instance.admin_state = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status as up in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_UP)

  @locking.ssynchronized(_config_lock)
  def MarkInstanceOffline(self, instance_name):
    """Mark the instance status as offline in the config.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_OFFLINE)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    # If a network port has been allocated to the instance,
    # return it to the pool of free ports.
    inst = self._config_data.instances[instance_name]
    network_port = getattr(inst, "network_port", None)
    if network_port is not None:
      self._config_data.cluster.tcpudp_port_pool.add(network_port)

    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk_fname = "disk%s" % disk.iv_name.split("/")[1]
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk_fname))

    # Force update of ssconf files
    self._config_data.cluster.serial_no += 1

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance as down in the configuration.

    """
    self._SetInstanceStatus(instance_name, constants.ADMINST_DOWN)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    # Locking is done in L{ConfigWriter.GetInstanceList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetInstanceList())

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceNodeGroups(self, instance_name, primary_only=False):
    """Returns set of node group UUIDs for instance's nodes.

    @rtype: frozenset

    """
    instance = self._UnlockedGetInstanceInfo(instance_name)
    if not instance:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)

    if primary_only:
      nodes = [instance.primary_node]
    else:
      nodes = instance.all_nodes

    return frozenset(self._UnlockedGetNodeInfo(node_name).group
                     for node_name in nodes)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiInstanceInfo(self, instances):
    """Get the configuration of multiple instances.

    @param instances: list of instance names
    @rtype: list
    @return: list of tuples (instance, instance_info), where
        instance_info is what GetInstanceInfo would return for the
        instance, while keeping the original order

    """
    return [(name, self._UnlockedGetInstanceInfo(name)) for name in instances]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              GetInstanceInfo would return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstancesInfoByFilter(self, filter_fn):
    """Get instance configuration with a filter.

    @type filter_fn: callable
    @param filter_fn: Filter function receiving instance object as parameter,
      returning boolean. Important: this function is called while the
      configuration lock is held. It must not do any complex work or call
      functions potentially leading to a deadlock. Ideally it doesn't call any
      other functions and just compares instance attributes.

    """
    return dict((name, inst)
                for (name, inst) in self._config_data.instances.items()
                if filter_fn(inst))
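
  # Example filter (sketch): select instances marked administratively up,
  # using only attribute comparisons as required above:
  #
  #   up = cfg.GetInstancesInfoByFilter(
  #     lambda inst: inst.admin_state == constants.ADMINST_UP)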

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._UnlockedAddNodeToGroup(node.name, node.group)
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    # Locking is done in L{ConfigWriter.GetNodeList}
    return _MatchNameComponentIgnoreCase(short_name, self.GetNodeList())

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInstances(self, node_name):
    """Get the instances of a node, as stored in the config.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: (list, list)
    @return: a tuple with two lists: the primary and the secondary instances

    """
    pri = []
    sec = []
    for inst in self._config_data.instances.values():
      if inst.primary_node == node_name:
        pri.append(inst.name)
      if node_name in inst.secondary_nodes:
        sec.append(inst.name)
    return (pri, sec)
1493

    
1494
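  # Usage sketch (illustrative comment only):
  #
  #   (pri, sec) = cfg.GetNodeInstances("node1.example.com")
  #
  # ``pri`` names the instances that have this node as their primary,
  # ``sec`` those that only keep a secondary (e.g. DRBD) replica on it.
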
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupInstances(self, uuid, primary_only=False):
    """Get the instances of a node group.

    @param uuid: Node group UUID
    @param primary_only: Whether to only consider primary nodes
    @rtype: frozenset
    @return: Set of instance names in the node group

    """
    if primary_only:
      nodes_fn = lambda inst: [inst.primary_node]
    else:
      nodes_fn = lambda inst: inst.all_nodes

    return frozenset(inst.name
                     for inst in self._config_data.instances.values()
                     for node_name in nodes_fn(inst)
                     if self._UnlockedGetNodeInfo(node_name).group == uuid)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVmCapableNodeList(self):
    """Return the list of nodes which are vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNonVmCapableNodeList(self):
    """Return the list of nodes which are not vm capable.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.vm_capable]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMultiNodeInfo(self, nodes):
    """Get the configuration of multiple nodes.

    @param nodes: list of node names
    @rtype: list
    @return: list of tuples of (node, node_info), where node_info is
        what would GetNodeInfo return for the node, in the original
        order

    """
    return [(name, self._UnlockedGetNodeInfo(name)) for name in nodes]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    return self._UnlockedGetAllNodesInfo()

  def _UnlockedGetAllNodesInfo(self):
    """Gets configuration of all nodes.

    @note: See L{GetAllNodesInfo}

    """
    return dict([(node, self._UnlockedGetNodeInfo(node))
                 for node in self._UnlockedGetNodeList()])

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeGroupsFromNodes(self, nodes):
    """Returns groups for a list of nodes.

    @type nodes: list of string
    @param nodes: List of node names
    @rtype: frozenset

    """
    return frozenset(self._UnlockedGetNodeInfo(name).group for name in nodes)

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired and possible, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained) and node.master_capable:
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

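  # Worked example (illustrative comment only): on a cluster with five
  # nodes of which one is offline and one is drained, only three count
  # towards mc_max. With candidate_pool_size = 10 the desired size is
  # mc_should = min(3, 10) = 3; if two nodes currently carry the
  # master_candidate flag, the result is (2, 3, 3).
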
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired and possible, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    # Note: the second element of the returned tuple is the *desired* pool
    # size; it is bound to the local name mc_max and used as growth target.
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions or not node.master_capable):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
    """Add a given node to the specified group.

    """
    if nodegroup_uuid not in self._config_data.nodegroups:
      # This can happen if a node group gets deleted between its lookup and
      # when we're adding the first node to it, since we don't keep a lock in
      # the meantime. It's ok though, as we'll fail cleanly if the node group
      # is not found anymore.
      raise errors.OpExecError("Unknown node group: %s" % nodegroup_uuid)
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)

  def _UnlockedRemoveNodeFromGroup(self, node):
    """Remove a given node from its group.

    """
    nodegroup = node.group
    if nodegroup not in self._config_data.nodegroups:
      logging.warning("Warning: node '%s' has unknown node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
      # without the group object there is no membership list to update, and
      # falling through would raise a KeyError on the lookup below
      return
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
    if node.name not in nodegroup_obj.members:
      logging.warning("Warning: node '%s' not a member of its node group '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    else:
      nodegroup_obj.members.remove(node.name)

  @locking.ssynchronized(_config_lock)
  def AssignGroupNodes(self, mods):
    """Changes the group of a number of nodes.

    @type mods: list of tuples; (node name, new group UUID)
    @param mods: Node membership modifications

    """
    groups = self._config_data.nodegroups
    nodes = self._config_data.nodes

    resmod = []

    # Try to resolve names/UUIDs first
    for (node_name, new_group_uuid) in mods:
      try:
        node = nodes[node_name]
      except KeyError:
        raise errors.ConfigurationError("Unable to find node '%s'" % node_name)

      if node.group == new_group_uuid:
        # Node is being assigned to its current group
        logging.debug("Node '%s' was assigned to its current group (%s)",
                      node_name, node.group)
        continue

      # Try to find current group of node
      try:
        old_group = groups[node.group]
      except KeyError:
        raise errors.ConfigurationError("Unable to find old group '%s'" %
                                        node.group)

      # Try to find new group for node
      try:
        new_group = groups[new_group_uuid]
      except KeyError:
        raise errors.ConfigurationError("Unable to find new group '%s'" %
                                        new_group_uuid)

      assert node.name in old_group.members, \
        ("Inconsistent configuration: node '%s' not listed in members for its"
         " old group '%s'" % (node.name, old_group.uuid))
      assert node.name not in new_group.members, \
        ("Inconsistent configuration: node '%s' already listed in members for"
         " its new group '%s'" % (node.name, new_group.uuid))

      resmod.append((node, old_group, new_group))

    # Apply changes
    for (node, old_group, new_group) in resmod:
      assert node.group != new_group.uuid and old_group.uuid != new_group.uuid, \
        "Assigning to current group is not possible"

      node.group = new_group.uuid

      # Update members of involved groups
      if node.name in old_group.members:
        old_group.members.remove(node.name)
      if node.name not in new_group.members:
        new_group.members.append(node.name)

    # Update timestamps and serials (only once per node/group object)
    now = time.time()
    for obj in frozenset(itertools.chain(*resmod)): # pylint: disable=W0142
      obj.serial_no += 1
      obj.mtime = now

    # Force ssconf update
    self._config_data.cluster.serial_no += 1

    self._WriteConfig()

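  # Usage sketch (illustrative comment only; ``uuid_a`` and ``uuid_b``
  # stand for existing node group UUIDs, not defined here):
  #
  #   cfg.AssignGroupNodes([("node1.example.com", uuid_b),
  #                         ("node2.example.com", uuid_a)])
  #
  # Each entry moves one node; assignments to a node's current group are
  # skipped, and all membership lists, serials and mtimes are updated in a
  # single configuration write.
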
  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self, accept_foreign):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    if data.cluster.master_node != self._my_hostname and not accept_foreign:
      msg = ("The configuration denotes node %s as master, while my"
             " hostname is %s; opening a foreign configuration is only"
             " possible in accept_foreign mode" %
             (data.cluster.master_node, self._my_hostname))
      raise errors.ConfigurationError(msg)

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

    self._cfg_id = utils.GetFileID(path=self._cfg_file)

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_name = constants.INITIAL_NODE_GROUP_NAME
      default_nodegroup = objects.NodeGroup(name=default_nodegroup_name,
                                            members=[])
      self._UnlockedAddNodeGroup(default_nodegroup, _UPGRADE_CONFIG_JID, True)
      modified = True
    for node in self._config_data.nodes.values():
      if not node.group:
        node.group = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.group)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    # TODO: Use dedicated resolver talking to config writer for name resolution
    result = \
      self._GetRpc(addr_list).call_upload_file(node_list, self._cfg_file)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    try:
      fd = utils.SafeWriteFile(destination, self._cfg_id, data=txt,
                               close=False, gid=getents.confd_gid, mode=0640)
    except errors.LockError:
      raise errors.ConfigurationError("The configuration file has been"
                                      " modified since the last write, cannot"
                                      " update")
    try:
      self._cfg_id = utils.GetFileID(fd=fd)
    finally:
      os.close(fd)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = self._GetRpc(None).call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    ssconf_values = {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_SHARED_FILE_STORAGE_DIR: cluster.shared_file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NETMASK: str(cluster.master_netmask),
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      }
    bad_values = [(k, v) for k, v in ssconf_values.items()
                  if not isinstance(v, (str, basestring))]
    if bad_values:
      err = utils.CommaJoin("%s=%s" % (k, v) for k, v in bad_values)
      raise errors.ConfigurationError("Some ssconf key(s) have non-string"
                                      " values: %s" % err)
    return ssconf_values

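  # Format sketch (illustrative comment only): every ssconf value is a
  # plain string; multi-valued entries are newline-joined, e.g.
  #
  #   values = cfg.GetSsconfValues()
  #   values[constants.SS_NODE_LIST]
  #   # -> "node1.example.com\nnode2.example.com"
  #   values[constants.SS_NODE_PRIMARY_IPS]
  #   # -> "node1.example.com 192.0.2.1\nnode2.example.com 192.0.2.2"
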
  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around L{_UnlockedGetSsconfValues}.

    """
    return self._UnlockedGetSsconfValues()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is at least one disk of the given type in the
    configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and
    the caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure has been saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node}, L{objects.Instance} or L{objects.NodeGroup}
        which exists in the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    elif isinstance(target, objects.NodeGroup):
      test = target in self._config_data.nodegroups.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

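  # Usage sketch (illustrative comment only): the caller mutates an object
  # obtained from this ConfigWriter, then asks for it to be persisted.
  #
  #   node = cfg.GetNodeInfo("node1.example.com")
  #   node.offline = True
  #   cfg.Update(node, None)
  #
  # Updating a node also bumps the cluster serial number, which triggers
  # the ssconf redistribution in _WriteConfig.
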
  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)