#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was too slow and consumed
too much memory.

"""

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool


_config_lock = locking.SharedLock()

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    # iterate over the reserved sets (the dict values), not over the
    # (ec_id, set) tuples that items() would yield
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    (resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      # count down, otherwise a generator that only produces duplicates
      # would loop forever
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource


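# Illustrative sketch (not part of the module): how the reservation manager
# above is typically driven; "job-1" stands in for a real execution context
# id and the lambda for a resource generator function:
#
#   trm = TemporaryReservationManager()
#   value = trm.Generate(existing=set(), ec_id="job-1",
#                        generate_one_fn=lambda: random.randrange(100))
#   assert trm.Reserved(value)
#   trm.DropECReservations("job-1")   # release everything held by the job
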
class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = utils.HostInfo().name
    self._last_cluster_serial = -1
    self._OpenConfig()

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    # use the MAC reservation manager, so that generated and explicitly
    # reserved addresses (see ReserveMAC) are tracked in the same place
    return self._temporary_macs.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster; it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("MAC already in use")
    else:
      # note the argument order of Reserve: (ec_id, resource)
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

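  # Illustrative sketch (not part of the module): a typical reservation
  # flow from a job, assuming an initialized ConfigWriter `cfg` and an
  # execution context id `ec_id` (hypothetical names):
  #
  #   mac = cfg.GenerateMAC(ec_id)                 # pick an unused address
  #   cfg.ReserveMAC("aa:00:00:11:22:33", ec_id)   # or pin a known one
  #   secret = cfg.GenerateDRBDSecret(ec_id)
  #   # on failure, the job drops all of its reservations at once:
  #   cfg.DropECReservations(ec_id)
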
  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instance and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: the execution context id under which the generated
        id is reserved

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.DT_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drained=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(data.cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

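  # Illustrative sketch (not part of the module): checking a configuration
  # for consistency, assuming a ConfigWriter `cfg` (hypothetical name):
  #
  #   errs = cfg.VerifyConfig()
  #   if errs:
  #     raise errors.ConfigurationError("Invalid config: %s" %
  #                                     utils.CommaJoin(errs))
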
  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/UDP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port

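  # Illustrative sketch (not part of the module): the port pool round-trip,
  # assuming a ConfigWriter `cfg` (hypothetical name); ports freed by
  # removed disks are returned to the pool via AddTcpUdpPort:
  #
  #   port = cfg.AllocatePort()   # from the pool, or highest_used_port + 1
  #   # ... use the port for a new DRBD disk ...
  #   cfg.AddTcpUdpPort(port)     # give it back once the disk is gone
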
  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result

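  # Illustrative sketch (not part of the module): allocating the two minors
  # of a DRBD8 disk, assuming a ConfigWriter `cfg` and hypothetical node and
  # instance names:
  #
  #   minors = cfg.AllocateDRBDMinor(["node1.example.com",
  #                                   "node2.example.com"],
  #                                  "inst1.example.com")
  #   # minors[0] is for node1, minors[1] for node2; on an error path the
  #   # temporary reservations must be given back explicitly:
  #   cfg.ReleaseDRBDMinors("inst1.example.com")
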
  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths; on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the default hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the RSA host key from the config.

    @rtype: string
    @return: the RSA host key

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

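  # Illustrative sketch (not part of the module): the add-instance sequence,
  # assuming a ConfigWriter `cfg`, a job context id `ec_id` and a prepared
  # objects.Instance `inst` (hypothetical names):
  #
  #   for nic in inst.nics:
  #     cfg.ReserveMAC(nic.mac, ec_id)   # fail early on MAC collisions
  #   cfg.AddInstance(inst, ec_id)       # persists and bumps serial numbers
  #   cfg.DropECReservations(ec_id)      # reservations are no longer needed
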
  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance as up in the config.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk.iv_name))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the instance as down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: list of instance names, e.g. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys(),
                                    case_sensitive=False)

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live system.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

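  # Illustrative sketch (not part of the module): resolving a shortened name
  # to a configuration object, assuming a ConfigWriter `cfg` (hypothetical
  # name):
  #
  #   full_name = cfg.ExpandInstanceName("instance1")
  #   if full_name is None:
  #     raise errors.OpPrereqError("No such instance")
  #   inst = cfg.GetInstanceInfo(full_name)   # an objects.Instance
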
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the current, desired and maximum possible number of candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, maximum possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained):
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the current, desired and maximum possible number of candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, maximum possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    # the second element of the stats tuple is the desired pool size
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            [self._config_data.cluster])

  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, so it needs
        to either be called with the lock held or from a safe place
        (the constructor)

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    utils.WriteFile(destination, data=txt)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      }

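  # Illustrative note (not part of the module): every ssconf value above is
  # a newline-joined string which _WriteConfig pushes to all online nodes,
  # typically as /var/lib/ganeti/ssconf_<key> files; e.g. the node list
  # value is simply:
  #
  #   node1.example.com
  #   node2.example.com
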
  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure has been saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

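  # Illustrative sketch (not part of the module): the usual read-modify-save
  # cycle, assuming a ConfigWriter `cfg` (hypothetical name); Update() bumps
  # the serial numbers and persists the change:
  #
  #   node = cfg.GetNodeInfo("node1.example.com")
  #   node.offline = True
  #   cfg.Update(node, feedback_fn=logging.info)
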
  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)