root / lib / config.py @ bf4af505

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consumed too
much memory.
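
Example (illustrative sketch; it only uses helpers that this module
itself imports below)::

  from ganeti import constants, serializer, utils
  raw = utils.ReadFile(constants.CLUSTER_CONF_FILE)
  cfg_dict = serializer.Load(raw)
  # cfg_dict is a plain dict, with top-level keys such as "version",
  # "cluster", "nodes", "instances" and "serial_no"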

"""

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool


_config_lock = locking.SharedLock()

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"


def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))


class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.
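
  Example (illustrative)::

    >>> trm = TemporaryReservationManager()
    >>> trm.Reserve("job-1", "198.51.100.10")
    >>> trm.Reserved("198.51.100.10")
    True
    >>> trm.DropECReservations("job-1")
    >>> trm.Reserved("198.51.100.10")
    False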

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    (resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Unable to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = utils.HostInfo().name
    self._last_cluster_serial = -1
    self._OpenConfig()

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_macs.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current node, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: unique id for the job to reserve the id to

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.LD_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs.

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(data.cluster.enabled_hypervisors) -
                   set(data.cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drain=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(data.cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.
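
    For a DRBD8 disk, the physical_id set on the primary node ends up as
    the tuple (primary's secondary_ip, port, secondary's secondary_ip,
    port, primary minor, secret), as assembled below (descriptive note).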

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device does not know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).
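
    Example (illustrative): with an empty port pool and
    C{highest_used_port} at 11000, the next call returns 11001 and
    bumps C{highest_used_port} to match.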

    """
    # If there are TCP/UDP ports configured, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict), and a list of duplicates; if the duplicates
        list is not empty, the configuration is corrupted and its caller
        should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty dict).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.
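
    Example (illustrative): C{AllocateDRBDMinor(["node1", "node1"],
    "inst1")} may return C{[0, 1]} on a node with no minors in use yet.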

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefaultIAllocator(self):
    """Get the default instance allocator for this cluster.

    """
    return self._config_data.cluster.default_iallocator

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance, as only here can we guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk.iv_name))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, e.g. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys(),
                                    case_sensitive=False)

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other
    information about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              GetInstanceInfo would return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              GetNodeInfo would return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum desired and possible candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained):
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            [self._config_data.cluster])

  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, but also
        L{DropECReservations} so it needs to be called only from a
        "safe" place (the constructor). If one wanted to call it with
        the lock held, a DropECReservationUnlocked would need to be
        created first, to avoid causing deadlock.

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    utils.WriteFile(destination, data=txt)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      }

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetDRBDHelper(self):
    """Return DRBD usermode helper.

    """
    return self._config_data.cluster.drbd_usermode_helper

  @locking.ssynchronized(_config_lock)
  def SetDRBDHelper(self, drbd_helper):
    """Set DRBD usermode helper.

    """
    self._config_data.cluster.drbd_usermode_helper = drbd_helper
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the mac prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock, shared=1)
  def HasAnyDiskOfType(self, dev_type):
    """Check if there is at least one disk of the given type in the
    configuration.

    """
    return self._config_data.HasAnyDiskOfType(dev_type)

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and
    the caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure is saved.
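
    Example (illustrative, with C{cfg} a C{ConfigWriter} instance)::

      inst = cfg.GetInstanceInfo("instance1.example.com")
      inst.admin_up = False
      cfg.Update(inst, None)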

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)

  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)