#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was too slow and consumed
too much memory.

"""

# pylint: disable-msg=R0904
# R0904: Too many public methods

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool
from ganeti import netutils
from ganeti import runtime


_config_lock = locking.SharedLock("ConfigWriter")

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    (resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
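
# A small usage sketch for TemporaryReservationManager (illustrative only;
# the job id "job-123" and the MAC value are made-up examples):
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("job-123", "aa:00:00:11:22:33")
#   trm.Reserved("aa:00:00:11:22:33")   # True until the job drops it
#   trm.DropECReservations("job-123")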


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False, _getents=runtime.GetEnts):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._getents = _getents
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = netutils.Hostname.GetSysName()
    self._last_cluster_serial = -1
    self._OpenConfig()

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac
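
  # For example, with cluster mac_prefix "aa:00:00" (a made-up value) this
  # could return "aa:00:00:5f:c3:04"; uniqueness against existing MACs is
  # enforced by the GenerateMAC wrapper below, not here.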

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_macs.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current nodes, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job reserving the generated id

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(data.cluster.enabled_hypervisors) -
                   set(data.cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # nodegroups checks
    nodegroups_names = set()
    for nodegroup_uuid in data.nodegroups:
      nodegroup = data.nodegroups[nodegroup_uuid]
      if nodegroup.uuid != nodegroup_uuid:
        result.append("nodegroup '%s' (uuid: '%s') indexed by wrong uuid '%s'"
                      % (nodegroup.name, nodegroup.uuid, nodegroup_uuid))
      if nodegroup.name in nodegroups_names:
        result.append("duplicate nodegroup name '%s'" % nodegroup.name)
      else:
        nodegroups_names.add(nodegroup.name)

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(data.cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device doesn't know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/UDP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
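
  # Allocation sketch (hypothetical values): with an empty port pool and
  # highest_used_port == 11005, AllocatePort() returns 11006; if a released
  # port such as 11002 sits in tcpudp_port_pool, that one is handed out
  # first instead.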

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD minors detected: %s" %
                                      str(duplicates))
    return d_map
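
  # The returned structure looks like this (node and instance names are
  # made-up examples):
  #
  #   {"node1.example.com": {0: "inst1.example.com", 1: "inst2.example.com"},
  #    "node2.example.com": {0: "inst1.example.com"},
  #    "node3.example.com": {}}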

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD minors detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
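
  # E.g. (made-up data): if "node1" already uses minors {0, 1, 3},
  # AllocateDRBDMinor(["node1", "node1"], "inst1") hands out the first free
  # slots, returning [2, 4] and reserving both for "inst1".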

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPrimaryIPFamily(self):
    """Get cluster primary ip family.

    @return: primary ip family

    """
    return self._config_data.cluster.primary_ip_family

  @locking.ssynchronized(_config_lock, shared=1)
  def LookupNodeGroup(self, target):
    """Lookup a node group.

    @type target: string or None
    @param target: group name or uuid or None to look for the default
    @rtype: string
    @return: nodegroup uuid
    @raises errors.OpPrereqError: when the target group cannot be found

    """
    if target is None:
      if len(self._config_data.nodegroups) != 1:
        raise errors.OpPrereqError("More than one nodegroup exists. Target"
                                   " group must be specified explicitly.")
      else:
        return self._config_data.nodegroups.keys()[0]
    if target in self._config_data.nodegroups:
      return target
    for nodegroup in self._config_data.nodegroups.values():
      if nodegroup.name == target:
        return nodegroup.uuid
    raise errors.OpPrereqError("Nodegroup '%s' not found" % target)
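
  # Lookup sketch (names are made-up): on a single-group cluster,
  # LookupNodeGroup(None) returns that group's uuid; LookupNodeGroup("default")
  # resolves the group name to its uuid; an unknown name raises OpPrereqError.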

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance as up in the config.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk.iv_name))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the instance as down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys(),
                                    case_sensitive=False)

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._UnlockedAddNodeToGroup(node.name, node.nodegroup)
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    self._UnlockedRemoveNodeFromGroup(self._config_data.nodes[node_name])
    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired and possible, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained):
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)
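
  # Worked example (made-up numbers): on a cluster with 5 nodes, one of
  # them offline, 3 online nodes currently master candidates, and
  # candidate_pool_size = 10, this returns (3, 4, 4): 3 candidates now,
  # min(4, 10) = 4 desired, and 4 nodes that could become candidates.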

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired and possible, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _UnlockedAddNodeToGroup(self, node_name, nodegroup_uuid):
    """Add a given node to the specified group.

    """
    if nodegroup_uuid not in self._config_data.nodegroups:
      # This can happen if a node group gets deleted between its lookup and
      # when we're adding the first node to it, since we don't keep a lock in
      # the meantime. It's ok though, as we'll fail cleanly if the node group
      # is not found anymore.
      raise errors.OpExecError("Unknown nodegroup: %s" % nodegroup_uuid)
    if node_name not in self._config_data.nodegroups[nodegroup_uuid].members:
      self._config_data.nodegroups[nodegroup_uuid].members.append(node_name)

  def _UnlockedRemoveNodeFromGroup(self, node):
    """Remove a given node from its group.

    """
    nodegroup = node.nodegroup
    if nodegroup not in self._config_data.nodegroups:
      logging.warning("Warning: node '%s' has a non-existing nodegroup '%s'"
                      " (while being removed from it)", node.name, nodegroup)
      # without a valid group object there is nothing to update
      return
    nodegroup_obj = self._config_data.nodegroups[nodegroup]
    if node.name not in nodegroup_obj.members:
      logging.warning("Warning: node '%s' not a member of its nodegroup '%s'"
                      " (while being removed from it)", node.name, nodegroup)
    else:
      nodegroup_obj.members.remove(node.name)

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            self._config_data.nodegroups.values() +
            [self._config_data.cluster])

  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if not self._config_data.nodegroups:
      default_nodegroup_uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
      default_nodegroup = objects.NodeGroup(
          uuid=default_nodegroup_uuid,
          name="default",
          members=[],
          )
      self._config_data.nodegroups[default_nodegroup_uuid] = default_nodegroup
      modified = True
    for node in self._config_data.nodes.values():
      if not node.nodegroup:
        node.nodegroup = self.LookupNodeGroup(None)
        modified = True
      # This is technically *not* an upgrade, but needs to be done both when
      # nodegroups are being added, and upon normally loading the config,
      # because the members list of a node group is discarded upon
      # serializing/deserializing the object.
      self._UnlockedAddNodeToGroup(node.name, node.nodegroup)
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    getents = self._getents()
    utils.WriteFile(destination, data=txt, gid=getents.confd_gid, mode=0640)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    nodegroups = ["%s %s" % (nodegroup.uuid, nodegroup.name) for nodegroup in
                  self._config_data.nodegroups.values()]
    nodegroups_data = fn(utils.NiceSort(nodegroups))

    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_PRIMARY_IP_FAMILY: str(cluster.primary_ip_family),
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      constants.SS_NODEGROUPS: nodegroups_data,
      }

  @locking.ssynchronized(_config_lock, shared=1)
  def GetSsconfValues(self):
    """Wrapper using lock around L{_UnlockedGetSsconfValues}.

    """
    return self._UnlockedGetSsconfValues()
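
  # For illustration (hypothetical cluster), the returned dict maps ssconf
  # keys to newline-joined strings, e.g.:
  #
  #   {constants.SS_CLUSTER_NAME: "cluster.example.com",
  #    constants.SS_NODE_LIST: "node1.example.com\nnode2.example.com",
  #    ...}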

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if any disk of the given type exists in the configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure is saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)