lib/config.py @ 9f3ac970


#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.

"""Configuration management for Ganeti

This module provides the interface to the Ganeti cluster configuration.

The configuration data is stored on every node but is updated on the master
only. After each update, the master distributes the data to the other nodes.

Currently, the data storage format is JSON. YAML was slow and consumed too
much memory.

"""
33

import os
import random
import logging
import time

from ganeti import errors
from ganeti import locking
from ganeti import utils
from ganeti import constants
from ganeti import rpc
from ganeti import objects
from ganeti import serializer
from ganeti import uidpool


_config_lock = locking.SharedLock()

# job id used for resource management at config upgrade time
_UPGRADE_CONFIG_JID = "jid-cfg-upgrade"

def _ValidateConfig(data):
  """Verifies that a configuration object looks valid.

  This only verifies the version of the configuration.

  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  if data.version != constants.CONFIG_VERSION:
    raise errors.ConfigurationError("Cluster configuration version"
                                    " mismatch, got %s instead of %s" %
                                    (data.version,
                                     constants.CONFIG_VERSION))

class TemporaryReservationManager:
  """A temporary resource reservation manager.

  This is used to reserve resources in a job, before using them, making sure
  other jobs cannot get them in the meantime.

  """
  def __init__(self):
    self._ec_reserved = {}

  def Reserved(self, resource):
    # iterate over the per-context reserved sets, not over (key, value) pairs
    for holder_reserved in self._ec_reserved.values():
      if resource in holder_reserved:
        return True
    return False

  def Reserve(self, ec_id, resource):
    if self.Reserved(resource):
      raise errors.ReservationError("Duplicate reservation for resource: %s." %
                                    (resource))
    if ec_id not in self._ec_reserved:
      self._ec_reserved[ec_id] = set([resource])
    else:
      self._ec_reserved[ec_id].add(resource)

  def DropECReservations(self, ec_id):
    if ec_id in self._ec_reserved:
      del self._ec_reserved[ec_id]

  def GetReserved(self):
    all_reserved = set()
    for holder_reserved in self._ec_reserved.values():
      all_reserved.update(holder_reserved)
    return all_reserved

  def Generate(self, existing, generate_one_fn, ec_id):
    """Generate a new resource of this type.

    """
    assert callable(generate_one_fn)

    all_elems = self.GetReserved()
    all_elems.update(existing)
    retries = 64
    while retries > 0:
      new_resource = generate_one_fn()
      if new_resource is not None and new_resource not in all_elems:
        break
      retries -= 1
    else:
      raise errors.ConfigurationError("Not able to generate new resource"
                                      " (last tried: %s)" % new_resource)
    self.Reserve(ec_id, new_resource)
    return new_resource
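
# Illustrative use of TemporaryReservationManager (the ec_id value is made
# up; in practice it is the execution context id of a running job):
#
#   trm = TemporaryReservationManager()
#   trm.Reserve("job-123", "aa:00:00:11:22:33")
#   trm.Reserved("aa:00:00:11:22:33")    # True while any context holds it
#   trm.GetReserved()                    # set(['aa:00:00:11:22:33'])
#   trm.DropECReservations("job-123")    # job done, release its holdings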


class ConfigWriter:
  """The interface to the cluster configuration.

  @ivar _temporary_lvs: reservation manager for temporary LVs
  @ivar _all_rms: a list of all temporary reservation managers

  """
  def __init__(self, cfg_file=None, offline=False):
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      self._cfg_file = constants.CLUSTER_CONF_FILE
    else:
      self._cfg_file = cfg_file
    self._temporary_ids = TemporaryReservationManager()
    self._temporary_drbds = {}
    self._temporary_macs = TemporaryReservationManager()
    self._temporary_secrets = TemporaryReservationManager()
    self._temporary_lvs = TemporaryReservationManager()
    self._all_rms = [self._temporary_ids, self._temporary_macs,
                     self._temporary_secrets, self._temporary_lvs]
    # Note: in order to prevent errors when resolving our name in
    # _DistributeConfig, we compute it here once and reuse it; it's
    # better to raise an error before starting to modify the config
    # file than after it was modified
    self._my_hostname = utils.HostInfo().name
    self._last_cluster_serial = -1
    self._OpenConfig()

  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check if the cluster is configured.

    """
    return os.path.exists(constants.CLUSTER_CONF_FILE)

  def _GenerateOneMAC(self):
    """Generate one MAC address.

    """
    prefix = self._config_data.cluster.mac_prefix
    byte1 = random.randrange(0, 256)
    byte2 = random.randrange(0, 256)
    byte3 = random.randrange(0, 256)
    mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
    return mac

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self, ec_id):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    """
    existing = self._AllMACs()
    return self._temporary_macs.Generate(existing, self._GenerateOneMAC, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveMAC(self, mac, ec_id):
    """Reserve a MAC for an instance.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    all_macs = self._AllMACs()
    if mac in all_macs:
      raise errors.ReservationError("mac already in use")
    else:
      self._temporary_macs.Reserve(ec_id, mac)
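
  # Sketch of how the two entry points above combine (cfg and the ec_id are
  # invented for the example):
  #
  #   mac = cfg.GenerateMAC("job-123")                # fresh MAC, reserved
  #   cfg.ReserveMAC("aa:00:00:de:ad:01", "job-123")  # or claim a given one
  #
  # Either way the address only reaches the on-disk configuration once the
  # instance is added via AddInstance(); until then the reservation lives in
  # memory only.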

  @locking.ssynchronized(_config_lock, shared=1)
  def ReserveLV(self, lv_name, ec_id):
    """Reserve a VG/LV pair for an instance.

    @type lv_name: string
    @param lv_name: the logical volume name to reserve

    """
    all_lvs = self._AllLVs()
    if lv_name in all_lvs:
      raise errors.ReservationError("LV already in use")
    else:
      self._temporary_lvs.Reserve(ec_id, lv_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self, ec_id):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    """
    return self._temporary_secrets.Generate(self._AllDRBDSecrets(),
                                            utils.GenerateSecret,
                                            ec_id)

  def _AllLVs(self):
    """Compute the list of all LVs.

    """
    lvnames = set()
    for instance in self._config_data.instances.values():
      node_data = instance.MapLVsByNode()
      for lv_list in node_data.values():
        lvnames.update(lv_list)
    return lvnames

  def _AllIDs(self, include_temporary):
    """Compute the list of all UUIDs and names we have.

    @type include_temporary: boolean
    @param include_temporary: whether to include the _temporary_ids set
    @rtype: set
    @return: a set of IDs

    """
    existing = set()
    if include_temporary:
      existing.update(self._temporary_ids.GetReserved())
    existing.update(self._AllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
    return existing

  def _GenerateUniqueID(self, ec_id):
    """Generate a unique UUID.

    This checks the current nodes, instances and disk names for
    duplicates.

    @rtype: string
    @return: the unique id

    """
    existing = self._AllIDs(include_temporary=False)
    return self._temporary_ids.Generate(existing, utils.NewUUID, ec_id)

  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, ec_id):
    """Generate a unique ID.

    This is just a wrapper over the unlocked version.

    @type ec_id: string
    @param ec_id: the execution context id of the job reserving the new id

    """
    return self._GenerateUniqueID(ec_id)

  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    result = []
    for instance in self._config_data.instances.values():
      for nic in instance.nics:
        result.append(nic.mac)

    return result

  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def helper(disk, result):
      """Recursively gather secrets from this disk."""
      if disk.dev_type == constants.LD_DRBD8:
        result.append(disk.logical_id[5])
      if disk.children:
        for child in disk.children:
          helper(child, result)

    result = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        helper(disk, result)

    return result

  def _CheckDiskIDs(self, disk, l_ids, p_ids):
    """Compute duplicate disk IDs

    @type disk: L{objects.Disk}
    @param disk: the disk at which to start searching
    @type l_ids: list
    @param l_ids: list of current logical ids
    @type p_ids: list
    @param p_ids: list of current physical ids
    @rtype: list
    @return: a list of error messages

    """
    result = []
    if disk.logical_id is not None:
      if disk.logical_id in l_ids:
        result.append("duplicate logical id %s" % str(disk.logical_id))
      else:
        l_ids.append(disk.logical_id)
    if disk.physical_id is not None:
      if disk.physical_id in p_ids:
        result.append("duplicate physical id %s" % str(disk.physical_id))
      else:
        p_ids.append(disk.physical_id)

    if disk.children:
      for child in disk.children:
        result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
    return result

  def _UnlockedVerifyConfig(self):
    """Verify function.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    result = []
    seen_macs = []
    ports = {}
    data = self._config_data
    seen_lids = []
    seen_pids = []

    # global cluster checks
    if not data.cluster.enabled_hypervisors:
      result.append("enabled hypervisors list doesn't have any entries")
    invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
    if invalid_hvs:
      result.append("enabled hypervisors contains invalid entries: %s" %
                    invalid_hvs)
    missing_hvp = (set(data.cluster.enabled_hypervisors) -
                   set(data.cluster.hvparams.keys()))
    if missing_hvp:
      result.append("hypervisor parameters missing for the enabled"
                    " hypervisor(s) %s" % utils.CommaJoin(missing_hvp))

    if data.cluster.master_node not in data.nodes:
      result.append("cluster has invalid primary node '%s'" %
                    data.cluster.master_node)

    # per-instance checks
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.name != instance_name:
        result.append("instance '%s' is indexed by wrong name '%s'" %
                      (instance.name, instance_name))
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.append(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          if tcp_port not in ports:
            ports[tcp_port] = []
          ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        if net_port not in ports:
          ports[net_port] = []
        ports[net_port].append((instance.name, "network port"))

      # instance disk verify
      for idx, disk in enumerate(instance.disks):
        result.extend(["instance '%s' disk %d error: %s" %
                       (instance.name, idx, msg) for msg in disk.Verify()])
        result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      if free_port not in ports:
        ports[free_port] = []
      ports[free_port].append(("cluster", "port marked as free"))

    # compute tcp/udp duplicate ports
    keys = ports.keys()
    keys.sort()
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = utils.CommaJoin(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate checks
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    # node checks
    for node_name, node in data.nodes.items():
      if node.name != node_name:
        result.append("Node '%s' is indexed by wrong name '%s'" %
                      (node.name, node_name))
      if [node.master_candidate, node.drained, node.offline].count(True) > 1:
        result.append("Node %s state is invalid: master_candidate=%s,"
                      " drained=%s, offline=%s" %
                      (node.name, node.master_candidate, node.drained,
                       node.offline))

    # drbd minors check
    _, duplicates = self._UnlockedComputeDRBDMap()
    for node, minor, instance_a, instance_b in duplicates:
      result.append("DRBD minor %d on node %s is assigned twice to instances"
                    " %s and %s" % (minor, node, instance_a, instance_b))

    # IP checks
    default_nicparams = data.cluster.nicparams[constants.PP_DEFAULT]
    ips = {}

    def _AddIpAddress(ip, name):
      ips.setdefault(ip, []).append(name)

    _AddIpAddress(data.cluster.master_ip, "cluster_ip")

    for node in data.nodes.values():
      _AddIpAddress(node.primary_ip, "node:%s/primary" % node.name)
      if node.secondary_ip != node.primary_ip:
        _AddIpAddress(node.secondary_ip, "node:%s/secondary" % node.name)

    for instance in data.instances.values():
      for idx, nic in enumerate(instance.nics):
        if nic.ip is None:
          continue

        nicparams = objects.FillDict(default_nicparams, nic.nicparams)
        nic_mode = nicparams[constants.NIC_MODE]
        nic_link = nicparams[constants.NIC_LINK]

        if nic_mode == constants.NIC_MODE_BRIDGED:
          link = "bridge:%s" % nic_link
        elif nic_mode == constants.NIC_MODE_ROUTED:
          link = "route:%s" % nic_link
        else:
          raise errors.ProgrammerError("NIC mode '%s' not handled" % nic_mode)

        _AddIpAddress("%s/%s" % (link, nic.ip),
                      "instance:%s/nic:%d" % (instance.name, idx))

    for ip, owners in ips.items():
      if len(owners) > 1:
        result.append("IP address %s is used by multiple owners: %s" %
                      (ip, utils.CommaJoin(owners)))

    return result

  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    This is just a wrapper over L{_UnlockedVerifyConfig}.

    @rtype: list
    @return: a list of error messages; a non-empty list signifies
        configuration errors

    """
    return self._UnlockedVerifyConfig()

  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    """
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device doesn't know node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      disk.physical_id = disk.logical_id
    return

  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when only the top device is passed to the remote
    node.

    """
    return self._UnlockedSetDiskID(disk, node_name)
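
  # For a DRBD8 disk the conversion above turns the logical_id
  #   (pnode, snode, port, pminor, sminor, secret)
  # into a physical_id as seen from node_name, e.g. from the primary:
  #   (pnode_secondary_ip, port, snode_secondary_ip, port, pminor, secret)
  # while all other disk types simply copy logical_id into physical_id.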

  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    self._config_data.cluster.tcpudp_port_pool.add(port)
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    return self._config_data.cluster.tcpudp_port_pool.copy()

  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    """
    # If there are TCP/UDP ports configured in the pool, we use them first.
    if self._config_data.cluster.tcpudp_port_pool:
      port = self._config_data.cluster.tcpudp_port_pool.pop()
    else:
      port = self._config_data.cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      self._config_data.cluster.highest_used_port = port

    self._WriteConfig()
    return port
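
  # Example of the allocation policy above (numbers invented): with
  # tcpudp_port_pool = set([11005]) and highest_used_port = 11017, the
  # first AllocatePort() call returns 11005 (draining the pool) and the
  # second returns 11018, bumping highest_used_port; each call writes the
  # configuration out.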

  def _UnlockedComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    @rtype: (dict, list)
    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty minor dict), and a list of duplicates; if the
        duplicates list is not empty, the configuration is corrupted
        and the caller should raise an exception

    """
    def _AppendUsedPorts(instance_name, disk, used):
      duplicates = []
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
        for node, port in ((node_a, minor_a), (node_b, minor_b)):
          assert node in used, ("Node '%s' of instance '%s' not found"
                                " in node list" % (node, instance_name))
          if port in used[node]:
            duplicates.append((node, port, instance_name, used[node][port]))
          else:
            used[node][port] = instance_name
      if disk.children:
        for child in disk.children:
          duplicates.extend(_AppendUsedPorts(instance_name, child, used))
      return duplicates

    duplicates = []
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
    for (node, minor), instance in self._temporary_drbds.iteritems():
      if minor in my_dict[node] and my_dict[node][minor] != instance:
        duplicates.append((node, minor, instance, my_dict[node][minor]))
      else:
        my_dict[node][minor] = instance
    return my_dict, duplicates

  @locking.ssynchronized(_config_lock)
  def ComputeDRBDMap(self):
    """Compute the used DRBD minor/nodes.

    This is just a wrapper over L{_UnlockedComputeDRBDMap}.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty minor dict).

    """
    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    return d_map
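
  # Shape of the returned map, as an invented example for a two-node
  # cluster with a single DRBD instance using minor 0 on both sides:
  #
  #   {"node1.example.com": {0: "inst1.example.com"},
  #    "node2.example.com": {0: "inst1.example.com"}}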

  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @type instance: string
    @param instance: the instance for which we allocate minors

    """
    assert isinstance(instance, basestring), \
           "Invalid argument '%s' passed to AllocateDRBDMinor" % instance

    d_map, duplicates = self._UnlockedComputeDRBDMap()
    if duplicates:
      raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
                                      str(duplicates))
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      # double-check minor against current instances
      assert minor not in d_map[nname], \
             ("Attempt to reuse allocated DRBD minor %d on node %s,"
              " already allocated to instance %s" %
              (minor, nname, d_map[nname][minor]))
      ndata[minor] = instance
      # double-check minor against reservation
      r_key = (nname, minor)
      assert r_key not in self._temporary_drbds, \
             ("Attempt to reuse reserved DRBD minor %d on node %s,"
              " reserved for instance %s" %
              (minor, nname, self._temporary_drbds[r_key]))
      self._temporary_drbds[r_key] = instance
      result.append(minor)
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
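
  # Sketch of a caller allocating the two minors for a new DRBD disk (names
  # invented); the same node may be passed twice to obtain two minors:
  #
  #   pminor, sminor = cfg.AllocateDRBDMinor(["node1", "node2"], "inst1")
  #
  # On error paths the caller is expected to undo this via
  # ReleaseDRBDMinors("inst1"); on success, AddInstance()/Update() do it.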

  def _UnlockedReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    assert isinstance(instance, basestring), \
           "Invalid argument passed to ReleaseDRBDMinors"
    for key, name in self._temporary_drbds.items():
      if name == instance:
        del self._temporary_drbds[key]

  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on the error paths, on the success paths
    it's automatically called by the ConfigWriter add and update
    functions.

    This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    self._UnlockedReleaseDRBDMinors(instance)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Get the configuration version.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Get cluster name.

    @return: Cluster name

    """
    return self._config_data.cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Get the hostname of the master node for this cluster.

    @return: Master hostname

    """
    return self._config_data.cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Get the IP of the master node for this cluster.

    @return: Master IP

    """
    return self._config_data.cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Get the master network device for this cluster.

    """
    return self._config_data.cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Get the file storage dir for this cluster.

    """
    return self._config_data.cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Get the hypervisor type for this cluster.

    """
    return self._config_data.cluster.enabled_hypervisors[0]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    return self._config_data.cluster.rsahostkeypub

  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance, ec_id):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    if instance.disk_template != constants.DT_DISKLESS:
      all_lvs = instance.MapLVsByNode()
      logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)

    all_macs = self._AllMACs()
    for nic in instance.nics:
      if nic.mac in all_macs:
        raise errors.ConfigurationError("Cannot add instance %s:"
                                        " MAC address '%s' already in use." %
                                        (instance.name, nic.mac))

    self._EnsureUUID(instance, ec_id)

    instance.serial_no = 1
    instance.ctime = instance.mtime = time.time()
    self._config_data.instances[instance.name] = instance
    self._config_data.cluster.serial_no += 1
    self._UnlockedReleaseDRBDMinors(instance.name)
    self._WriteConfig()

  def _EnsureUUID(self, item, ec_id):
    """Ensures a given object has a valid UUID.

    @param item: the instance or node to be checked
    @param ec_id: the execution context id for the uuid reservation

    """
    if not item.uuid:
      item.uuid = self._GenerateUniqueID(ec_id)
    elif item.uuid in self._AllIDs(include_temporary=True):
      raise errors.ConfigurationError("Cannot add '%s': UUID %s already"
                                      " in use" % (item.name, item.uuid))

  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    """
    assert isinstance(status, bool), \
           "Invalid status '%s' passed to SetInstanceStatus" % (status,)

    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    instance = self._config_data.instances[instance_name]
    if instance.admin_up != status:
      instance.admin_up = status
      instance.serial_no += 1
      instance.mtime = time.time()
      self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    """
    self._SetInstanceStatus(instance_name, True)

  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    """
    if instance_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del self._config_data.instances[instance_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              utils.PathJoin(file_storage_dir,
                                                             inst.name,
                                                             disk.iv_name))

    self._config_data.instances[inst.name] = inst
    self._WriteConfig()
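
  # For file-based disks the loop above rewrites the path component of the
  # ids; illustratively (paths invented), a logical_id of
  #   ("loop", ".../file-storage/old.example.com/<iv_name>")
  # becomes, after a rename to new.example.com,
  #   ("loop", ".../file-storage/new.example.com/<iv_name>")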

  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    """
    self._SetInstanceStatus(instance_name, False)

  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    """
    return self._config_data.instances.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.instances.keys(),
                                    case_sensitive=False)

  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    """
    if instance_name not in self._config_data.instances:
      return None

    return self._config_data.instances[instance_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file. Other information
    about an instance is taken from the live systems.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object

    """
    return self._UnlockedGetInstanceInfo(instance_name)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @return: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the instance

    """
    my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
                    for instance in self._UnlockedGetInstanceList()])
    return my_dict

  @locking.ssynchronized(_config_lock)
  def AddNode(self, node, ec_id):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    logging.info("Adding node %s to configuration", node.name)

    self._EnsureUUID(node, ec_id)

    node.serial_no = 1
    node.ctime = node.mtime = time.time()
    self._config_data.nodes[node.name] = node
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    """
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    return utils.MatchNameComponent(short_name,
                                    self._config_data.nodes.keys(),
                                    case_sensitive=False)

  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    if node_name not in self._config_data.nodes:
      return None

    return self._config_data.nodes[node_name]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object

    """
    return self._UnlockedGetNodeInfo(node_name)

  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return self._config_data.nodes.keys()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    """
    return self._UnlockedGetNodeList()

  def _UnlockedGetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    all_nodes = [self._UnlockedGetNodeInfo(node)
                 for node in self._UnlockedGetNodeList()]
    return [node.name for node in all_nodes if not node.offline]

  @locking.ssynchronized(_config_lock, shared=1)
  def GetOnlineNodeList(self):
    """Return the list of nodes which are online.

    """
    return self._UnlockedGetOnlineNodeList()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
                    for node in self._UnlockedGetNodeList()])
    return my_dict

  def _UnlockedGetMasterCandidateStats(self, exceptions=None):
    """Get the number of current, desired and possible master candidates.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired capped by possible, possible)

    """
    mc_now = mc_should = mc_max = 0
    for node in self._config_data.nodes.values():
      if exceptions and node.name in exceptions:
        continue
      if not (node.offline or node.drained):
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    mc_should = min(mc_max, self._config_data.cluster.candidate_pool_size)
    return (mc_now, mc_should, mc_max)

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self, exceptions=None):
    """Get the number of current and maximum possible candidates.

    This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: tuple
    @return: tuple of (current, desired capped by possible, possible)

    """
    return self._UnlockedGetMasterCandidateStats(exceptions)
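
  # Worked example for the tuple above (numbers invented): on a five-node
  # cluster with one node offline, one drained, two current candidates and
  # candidate_pool_size = 10, this returns (2, 3, 3): two candidates now,
  # min(3, 10) = 3 desired, three possible.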

  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self, exceptions):
    """Try to grow the candidate pool to the desired size.

    @type exceptions: list
    @param exceptions: if passed, list of nodes that should be ignored
    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max, _ = self._UnlockedGetMasterCandidateStats(exceptions)
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        if (node.master_candidate or node.offline or node.drained or
            node.name in exceptions):
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list

  def _BumpSerialNo(self):
    """Bump up the serial number of the config.

    """
    self._config_data.serial_no += 1
    self._config_data.mtime = time.time()

  def _AllUUIDObjects(self):
    """Returns all objects with uuid attributes.

    """
    return (self._config_data.instances.values() +
            self._config_data.nodes.values() +
            [self._config_data.cluster])

  def _OpenConfig(self):
    """Read the config data from disk.

    """
    raw_data = utils.ReadFile(self._cfg_file)

    try:
      data = objects.ConfigData.FromDict(serializer.Load(raw_data))
    except Exception, err:
      raise errors.ConfigurationError(err)

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")

    # Upgrade configuration if needed
    data.UpgradeConfig()

    self._config_data = data
    # reset the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1

    # And finally run our (custom) config upgrade sequence
    self._UpgradeConfig()

  def _UpgradeConfig(self):
    """Run upgrade steps that cannot be done purely in the objects.

    This is because some data elements need uniqueness across the
    whole configuration, etc.

    @warning: this function will call L{_WriteConfig()}, so it needs
        to either be called with the lock held or from a safe place
        (the constructor)

    """
    modified = False
    for item in self._AllUUIDObjects():
      if item.uuid is None:
        item.uuid = self._GenerateUniqueID(_UPGRADE_CONFIG_JID)
        modified = True
    if modified:
      self._WriteConfig()
      # This is ok even if it acquires the internal lock, as _UpgradeConfig is
      # only called at config init time, without the lock held
      self.DropECReservations(_UPGRADE_CONFIG_JID)

  def _DistributeConfig(self, feedback_fn):
    """Distribute the configuration to the other nodes.

    Currently, this only copies the configuration file. In the future,
    it could be used to encapsulate the 2/3-phase update mechanism.

    """
    if self._offline:
      return True

    bad = False

    node_list = []
    addr_list = []
    myhostname = self._my_hostname
    # we can skip checking whether _UnlockedGetNodeInfo returns None
    # since the node list comes from _UnlockedGetNodeList, and we are
    # called with the lock held, so no modifications should take place
    # in between
    for node_name in self._UnlockedGetNodeList():
      if node_name == myhostname:
        continue
      node_info = self._UnlockedGetNodeInfo(node_name)
      if not node_info.master_candidate:
        continue
      node_list.append(node_info.name)
      addr_list.append(node_info.primary_ip)

    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
                                            address_list=addr_list)
    for to_node, to_result in result.items():
      msg = to_result.fail_msg
      if msg:
        msg = ("Copy of file %s to node %s failed: %s" %
               (self._cfg_file, to_node, msg))
        logging.error(msg)

        if feedback_fn:
          feedback_fn(msg)

        bad = True

    return not bad

  def _WriteConfig(self, destination=None, feedback_fn=None):
    """Write the configuration data to persistent storage.

    """
    assert feedback_fn is None or callable(feedback_fn)

    # Warn on config errors, but don't abort the save - the
    # configuration has already been modified, and we can't revert;
    # the best we can do is to warn the user and save as is, leaving
    # recovery to the user
    config_errors = self._UnlockedVerifyConfig()
    if config_errors:
      errmsg = ("Configuration data is not consistent: %s" %
                (utils.CommaJoin(config_errors)))
      logging.critical(errmsg)
      if feedback_fn:
        feedback_fn(errmsg)

    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())

    utils.WriteFile(destination, data=txt)

    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig(feedback_fn)

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        result = rpc.RpcRunner.call_write_ssconf_files(
          self._UnlockedGetOnlineNodeList(),
          self._UnlockedGetSsconfValues())

        for nname, nresu in result.items():
          msg = nresu.fail_msg
          if msg:
            errmsg = ("Error while uploading ssconf files to"
                      " node %s: %s" % (nname, msg))
            logging.warning(errmsg)

            if feedback_fn:
              feedback_fn(errmsg)

      self._last_cluster_serial = self._config_data.cluster.serial_no

  def _UnlockedGetSsconfValues(self):
    """Return the values needed by ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    fn = "\n".join
    instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
    node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
                    for ninfo in node_info]
    node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
                    for ninfo in node_info]

    instance_data = fn(instance_names)
    off_data = fn(node.name for node in node_info if node.offline)
    on_data = fn(node.name for node in node_info if not node.offline)
    mc_data = fn(node.name for node in node_info if node.master_candidate)
    mc_ips_data = fn(node.primary_ip for node in node_info
                     if node.master_candidate)
    node_data = fn(node_names)
    node_pri_ips_data = fn(node_pri_ips)
    node_snd_ips_data = fn(node_snd_ips)

    cluster = self._config_data.cluster
    cluster_tags = fn(cluster.GetTags())

    hypervisor_list = fn(cluster.enabled_hypervisors)

    uid_pool = uidpool.FormatUidPool(cluster.uid_pool, separator="\n")

    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_CLUSTER_TAGS: cluster_tags,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: mc_data,
      constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: node_data,
      constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
      constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
      constants.SS_OFFLINE_NODES: off_data,
      constants.SS_ONLINE_NODES: on_data,
      constants.SS_INSTANCE_LIST: instance_data,
      constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
      constants.SS_HYPERVISOR_LIST: hypervisor_list,
      constants.SS_MAINTAIN_NODE_HEALTH: str(cluster.maintain_node_health),
      constants.SS_UID_POOL: uid_pool,
      }

  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the volume group name.

    """
    return self._config_data.cluster.volume_group_name

  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Set the volume group name.

    """
    self._config_data.cluster.volume_group_name = vg_name
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the MAC prefix.

    """
    return self._config_data.cluster.mac_prefix

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Returns information about the cluster.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster

  @locking.ssynchronized(_config_lock)
  def Update(self, target, feedback_fn):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetClusterInfo) has been updated and
    the caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure is saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster
    @param feedback_fn: Callable feedback function

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")
    update_serial = False
    if isinstance(target, objects.Cluster):
      test = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      test = target in self._config_data.nodes.values()
      update_serial = True
    elif isinstance(target, objects.Instance):
      test = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))
    if not test:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1
    target.mtime = now = time.time()

    if update_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1
      self._config_data.cluster.mtime = now

    if isinstance(target, objects.Instance):
      self._UnlockedReleaseDRBDMinors(target.name)

    self._WriteConfig(feedback_fn=feedback_fn)
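
  # Typical modify-and-save cycle as driven by a logical unit (sketch, the
  # names are invented):
  #
  #   node = cfg.GetNodeInfo("node1.example.com")
  #   node.offline = True
  #   cfg.Update(node, feedback_fn)   # bumps serials, writes, distributes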

  @locking.ssynchronized(_config_lock)
  def DropECReservations(self, ec_id):
    """Drop per-execution-context reservations.

    """
    for rm in self._all_rms:
      rm.DropECReservations(ec_id)