Statistics
| Branch: | Tag: | Revision:

root / lib / config.py @ bf75f132

History | View | Annotate | Download (34.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Configuration management for Ganeti
23

24
This module provides the interface to the Ganeti cluster configuration.
25

26
The configuration data is stored on every node but is updated on the master
27
only. After each update, the master distributes the data to the other nodes.
28

29
Currently, the data storage format is JSON. YAML was slow and consuming too
30
much memory.
31

32
"""
33

    
34
import os
35
import tempfile
36
import random
37
import logging
38

    
39
from ganeti import errors
40
from ganeti import locking
41
from ganeti import utils
42
from ganeti import constants
43
from ganeti import rpc
44
from ganeti import objects
45
from ganeti import serializer
46

    
47

    
48
# Module-level lock serializing all ConfigWriter operations; __init__
# aliases it as self._lock and the ssynchronized decorators below use it.
_config_lock = locking.SharedLock()
49

    
50

    
51
def _ValidateConfig(data):
  """Verify that a configuration object looks valid.

  This only verifies the version of the configuration.

  @param data: the configuration object to check (must have a
      C{version} attribute)
  @raise errors.ConfigurationError: if the version differs from what
      we expect

  """
  # Guard-clause form: accept the expected version, reject anything else.
  if data.version == constants.CONFIG_VERSION:
    return
  raise errors.ConfigurationError("Cluster configuration version"
                                  " mismatch, got %s instead of %s" %
                                  (data.version, constants.CONFIG_VERSION))
65

    
66

    
67
class ConfigWriter:
  """The interface to the cluster configuration.

  Loads the on-disk configuration at construction time and exposes
  locked accessors and mutators; mutations are persisted via
  _WriteConfig and (when not offline) pushed to the other master
  candidates by _DistributeConfig.

  """
71
  def __init__(self, cfg_file=None, offline=False):
    """Load the configuration and set up the writer state.

    @param cfg_file: alternative config file path (default: the
        cluster configuration file from constants)
    @param offline: if True, the config will not be distributed to
        other nodes on writes

    """
    self.write_count = 0
    self._lock = _config_lock
    self._config_data = None
    self._offline = offline
    if cfg_file is None:
      cfg_file = constants.CLUSTER_CONF_FILE
    self._cfg_file = cfg_file
    self._temporary_ids = set()
    self._temporary_drbds = {}
    # Resolve our own name once, up front: failing here is better than
    # failing in _DistributeConfig after the config file was modified.
    self._my_hostname = utils.HostInfo().name
    self._OpenConfig()
88

    
89
  # this method needs to be static, so that we can call it on the class
  @staticmethod
  def IsCluster():
    """Check whether the cluster configuration file exists.

    """
    cfg_path = constants.CLUSTER_CONF_FILE
    return os.path.exists(cfg_path)
96

    
97
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateMAC(self):
    """Generate a MAC for an instance.

    This should check the current instances for duplicates.

    @raise errors.ConfigurationError: if no free MAC was found after
        the maximum number of attempts

    """
    prefix = self._config_data.cluster.mac_prefix
    used_macs = self._AllMACs()
    # Bounded number of attempts; three random bytes per try, drawn in
    # the same order as before.
    for _attempt in range(64):
      octets = [random.randrange(0, 256) for _i in range(3)]
      candidate = "%s:%02x:%02x:%02x" % (prefix, octets[0],
                                         octets[1], octets[2])
      if candidate not in used_macs:
        return candidate
    raise errors.ConfigurationError("Can't generate unique MAC")
118

    
119
  @locking.ssynchronized(_config_lock, shared=1)
  def IsMacInUse(self, mac):
    """Predicate: check if the specified MAC is in use in the Ganeti cluster.

    This only checks instances managed by this cluster, it does not
    check for potential collisions elsewhere.

    """
    return mac in self._AllMACs()
129

    
130
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateDRBDSecret(self):
    """Generate a DRBD secret.

    This checks the current disks for duplicates.

    @raise errors.ConfigurationError: if no unique secret was found
        after the maximum number of attempts

    """
    existing = self._AllDRBDSecrets()
    for _attempt in range(64):
      candidate = utils.GenerateSecret()
      if candidate not in existing:
        return candidate
    raise errors.ConfigurationError("Can't generate unique DRBD secret")
147

    
148
  def _ComputeAllLVs(self):
    """Compute the set of all LV names used by configured instances.

    @rtype: set

    """
    lv_names = set()
    for inst in self._config_data.instances.values():
      # MapLVsByNode gives node -> list of LVs; flatten all the lists.
      for lv_list in inst.MapLVsByNode().values():
        lv_names.update(lv_list)
    return lv_names
158

    
159
  @locking.ssynchronized(_config_lock, shared=1)
  def GenerateUniqueID(self, exceptions=None):
    """Generate an unique disk name.

    This checks the current node, instances and disk names for
    duplicates.

    @param exceptions: a list with some other names which should be checked
        for uniqueness (used for example when you want to get
        more than one id at one time without adding each one in
        turn to the config file)

    @rtype: string
    @return: the unique id
    @raise errors.ConfigurationError: if no unique id could be
        generated within the retry limit

    """
    existing = set()
    existing.update(self._temporary_ids)
    existing.update(self._ComputeAllLVs())
    existing.update(self._config_data.instances.keys())
    existing.update(self._config_data.nodes.keys())
    if exceptions is not None:
      existing.update(exceptions)
    retries = 64
    while retries > 0:
      unique_id = utils.NewUUID()
      if unique_id not in existing and unique_id is not None:
        break
      # FIX: previously the counter was never decremented, so the loop
      # could never terminate on persistent collisions and the while/else
      # error path was unreachable.
      retries -= 1
    else:
      # FIX: closing parenthesis was missing in the error message.
      raise errors.ConfigurationError("Not able generate an unique ID"
                                      " (last tried ID: %s)" % unique_id)
    # Remember the id so concurrent requests before the next config
    # write don't hand it out again.
    self._temporary_ids.add(unique_id)
    return unique_id
192

    
193
  def _AllMACs(self):
    """Return all MACs present in the config.

    @rtype: list
    @return: the list of all MACs

    """
    return [nic.mac
            for instance in self._config_data.instances.values()
            for nic in instance.nics]
206

    
207
  def _AllDRBDSecrets(self):
    """Return all DRBD secrets present in the config.

    @rtype: list
    @return: the list of all DRBD secrets

    """
    def _Collect(device, acc):
      """Recursively gather secrets from this disk."""
      if device.dev_type == constants.DT_DRBD8:
        # The shared secret is the sixth element of a DRBD8 logical_id.
        acc.append(device.logical_id[5])
      for child in (device.children or []):
        _Collect(child, acc)

    secrets = []
    for instance in self._config_data.instances.values():
      for disk in instance.disks:
        _Collect(disk, secrets)
    return secrets
228

    
229
  @locking.ssynchronized(_config_lock, shared=1)
  def VerifyConfig(self):
    """Verify function.

    Cross-checks the in-memory configuration (node references, MAC
    duplicates, tcp/udp port duplicates, master candidate counts).

    @rtype: list
    @return: list of human-readable error messages; empty means the
        configuration looks consistent

    """
    result = []
    # set instead of list: O(1) membership test per NIC (was O(n))
    seen_macs = set()
    ports = {}
    data = self._config_data
    for instance_name in data.instances:
      instance = data.instances[instance_name]
      if instance.primary_node not in data.nodes:
        result.append("instance '%s' has invalid primary node '%s'" %
                      (instance_name, instance.primary_node))
      for snode in instance.secondary_nodes:
        if snode not in data.nodes:
          result.append("instance '%s' has invalid secondary node '%s'" %
                        (instance_name, snode))
      for idx, nic in enumerate(instance.nics):
        if nic.mac in seen_macs:
          result.append("instance '%s' has NIC %d mac %s duplicate" %
                        (instance_name, idx, nic.mac))
        else:
          seen_macs.add(nic.mac)

      # gather the drbd ports for duplicate checks
      for dsk in instance.disks:
        if dsk.dev_type in constants.LDS_DRBD:
          tcp_port = dsk.logical_id[2]
          ports.setdefault(tcp_port, []).append(
            (instance.name, "drbd disk %s" % dsk.iv_name))
      # gather network port reservation
      net_port = getattr(instance, "network_port", None)
      if net_port is not None:
        ports.setdefault(net_port, []).append((instance.name, "network port"))

    # cluster-wide pool of free ports
    for free_port in data.cluster.tcpudp_port_pool:
      ports.setdefault(free_port, []).append(("cluster",
                                              "port marked as free"))

    # compute tcp/udp duplicate ports (sorted() replaces keys()/sort())
    keys = sorted(ports)
    for pnum in keys:
      pdata = ports[pnum]
      if len(pdata) > 1:
        txt = ", ".join(["%s/%s" % val for val in pdata])
        result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))

    # highest used tcp port check
    if keys:
      if keys[-1] > data.cluster.highest_used_port:
        result.append("Highest used port mismatch, saved %s, computed %s" %
                      (data.cluster.highest_used_port, keys[-1]))

    if not data.nodes[data.cluster.master_node].master_candidate:
      result.append("Master node is not a master candidate")

    # master candidate pool must be filled up to its target
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    if mc_now < mc_max:
      result.append("Not enough master candidates: actual %d, target %d" %
                    (mc_now, mc_max))

    return result
298

    
299
  def _UnlockedSetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.

    The routine descends down and updates its children also, because
    this helps when the only the top device is passed to the remote
    node.

    This function is for internal use, when the config lock is already held.

    @param disk: the disk object to update (modified in place)
    @param node_name: the node the disk will be accessed from; for
        DRBD8 it must be one of the two peer nodes

    """
    # Convert the children first so a top-level device carries fully
    # resolved physical IDs for its whole tree.
    if disk.children:
      for child in disk.children:
        self._UnlockedSetDiskID(child, node_name)

    # Already converted (physical id present, no logical id): nothing to do.
    if disk.logical_id is None and disk.physical_id is not None:
      return
    if disk.dev_type == constants.LD_DRBD8:
      # DRBD8 logical_id layout: (pnode, snode, port, pminor, sminor, secret)
      pnode, snode, port, pminor, sminor, secret = disk.logical_id
      if node_name not in (pnode, snode):
        raise errors.ConfigurationError("DRBD device not knowing node %s" %
                                        node_name)
      pnode_info = self._UnlockedGetNodeInfo(pnode)
      snode_info = self._UnlockedGetNodeInfo(snode)
      if pnode_info is None or snode_info is None:
        raise errors.ConfigurationError("Can't find primary or secondary node"
                                        " for %s" % str(disk))
      p_data = (pnode_info.secondary_ip, port)
      s_data = (snode_info.secondary_ip, port)
      # physical_id is built from node_name's point of view:
      # (local_ip, port, remote_ip, port, own_minor, secret)
      if pnode == node_name:
        disk.physical_id = p_data + s_data + (pminor, secret)
      else: # it must be secondary, we tested above
        disk.physical_id = s_data + p_data + (sminor, secret)
    else:
      # Non-DRBD devices: the logical id is directly usable as physical id.
      disk.physical_id = disk.logical_id
    return
336

    
337
  @locking.ssynchronized(_config_lock)
  def SetDiskID(self, disk, node_name):
    """Convert the unique ID to the ID needed on the target nodes.

    This is used only for drbd, which needs ip/port configuration.
    Locked wrapper over L{_UnlockedSetDiskID}; see there for the
    details of the conversion.

    """
    return self._UnlockedSetDiskID(disk, node_name)
349

    
350
  @locking.ssynchronized(_config_lock)
  def AddTcpUdpPort(self, port):
    """Adds a new port to the available port pool.

    @raise errors.ProgrammerError: if the port is not an integer

    """
    if not isinstance(port, int):
      raise errors.ProgrammerError("Invalid type passed for port")

    pool = self._config_data.cluster.tcpudp_port_pool
    pool.add(port)
    self._WriteConfig()
360

    
361
  @locking.ssynchronized(_config_lock, shared=1)
  def GetPortList(self):
    """Returns a copy of the current port list.

    """
    pool = self._config_data.cluster.tcpudp_port_pool
    return pool.copy()
367

    
368
  @locking.ssynchronized(_config_lock)
  def AllocatePort(self):
    """Allocate a port.

    The port will be taken from the available port pool or from the
    default port range (and in this case we increase
    highest_used_port).

    @raise errors.ConfigurationError: if the port range is exhausted

    """
    cluster = self._config_data.cluster
    if cluster.tcpudp_port_pool:
      # Prefer reusing a previously released port.
      port = cluster.tcpudp_port_pool.pop()
    else:
      port = cluster.highest_used_port + 1
      if port >= constants.LAST_DRBD_PORT:
        raise errors.ConfigurationError("The highest used port is greater"
                                        " than %s. Aborting." %
                                        constants.LAST_DRBD_PORT)
      cluster.highest_used_port = port

    self._WriteConfig()
    return port
390

    
391
  def _ComputeDRBDMap(self, instance):
    """Compute the used DRBD minor/nodes.

    @return: dictionary of node_name: dict of minor: instance_name;
        the returned dict will have all the nodes in it (even if with
        an empty list).

    """
    # NOTE(review): the 'instance' parameter is unused in this body;
    # confirm against callers before removing it.
    def _AppendUsedPorts(instance_name, disk, used):
      # Despite the local naming ('ports'), this records used DRBD
      # *minors* per node, taken from positions 3/4 of the logical_id.
      if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
        nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
        for node, port in ((nodeA, minorA), (nodeB, minorB)):
          assert node in used, "Instance node not found in node list"
          if port in used[node]:
            raise errors.ProgrammerError("DRBD minor already used:"
                                         " %s/%s, %s/%s" %
                                         (node, port, instance_name,
                                          used[node][port]))

          used[node][port] = instance_name
      # Recurse into child devices (e.g. stacked disks).
      if disk.children:
        for child in disk.children:
          _AppendUsedPorts(instance_name, child, used)

    # Start with an empty minor map for every known node, ...
    my_dict = dict((node, {}) for node in self._config_data.nodes)
    # ... overlay the reserved-but-not-yet-committed minors, ...
    for (node, minor), instance in self._temporary_drbds.iteritems():
      my_dict[node][minor] = instance
    # ... and finally add the minors used by configured instances' disks.
    for instance in self._config_data.instances.itervalues():
      for disk in instance.disks:
        _AppendUsedPorts(instance.name, disk, my_dict)
    return my_dict
422

    
423
  @locking.ssynchronized(_config_lock)
  def AllocateDRBDMinor(self, nodes, instance):
    """Allocate a drbd minor.

    The free minor will be automatically computed from the existing
    devices. A node can be given multiple times in order to allocate
    multiple minors. The result is the list of minors, in the same
    order as the passed nodes.

    @param nodes: list of node names, one entry per minor to allocate
    @param instance: the instance the minors are reserved for

    """
    d_map = self._ComputeDRBDMap(instance)
    result = []
    for nname in nodes:
      ndata = d_map[nname]
      if not ndata:
        # no minors used, we can start at 0
        result.append(0)
        ndata[0] = instance
        self._temporary_drbds[(nname, 0)] = instance
        continue
      keys = ndata.keys()
      keys.sort()
      # Find the first gap in the sorted list of used minors, if any.
      ffree = utils.FirstFree(keys)
      if ffree is None:
        # return the next minor
        # TODO: implement high-limit check
        minor = keys[-1] + 1
      else:
        minor = ffree
      result.append(minor)
      # Record the reservation in the local map too, so a later entry
      # for the same node within this call sees it as taken.
      ndata[minor] = instance
      assert (nname, minor) not in self._temporary_drbds, \
             "Attempt to reuse reserved DRBD minor"
      self._temporary_drbds[(nname, minor)] = instance
    logging.debug("Request to allocate drbd minors, input: %s, returning %s",
                  nodes, result)
    return result
460

    
461
  @locking.ssynchronized(_config_lock)
  def ReleaseDRBDMinors(self, instance):
    """Release temporary drbd minors allocated for a given instance.

    This should be called on both the error paths and on the success
    paths (after the instance has been added or updated).

    @type instance: string
    @param instance: the instance for which temporary minors should be
                     released

    """
    # Collect the matching keys first, then delete, so the dict is
    # never mutated while being iterated.
    stale = [key for key, owner in self._temporary_drbds.items()
             if owner == instance]
    for key in stale:
      del self._temporary_drbds[key]
476

    
477
  @locking.ssynchronized(_config_lock, shared=1)
  def GetConfigVersion(self):
    """Return the version of the loaded configuration data.

    @return: Config version

    """
    return self._config_data.version

  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterName(self):
    """Return the name of the cluster.

    @return: Cluster name

    """
    cluster = self._config_data.cluster
    return cluster.cluster_name

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNode(self):
    """Return the hostname of the master node for this cluster.

    @return: Master hostname

    """
    cluster = self._config_data.cluster
    return cluster.master_node

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterIP(self):
    """Return the IP of the master node for this cluster.

    @return: Master IP

    """
    cluster = self._config_data.cluster
    return cluster.master_ip

  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterNetdev(self):
    """Return the master network device for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.master_netdev

  @locking.ssynchronized(_config_lock, shared=1)
  def GetFileStorageDir(self):
    """Return the file storage dir for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.file_storage_dir

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHypervisorType(self):
    """Return the default hypervisor type for this cluster.

    """
    cluster = self._config_data.cluster
    return cluster.default_hypervisor

  @locking.ssynchronized(_config_lock, shared=1)
  def GetHostKey(self):
    """Return the rsa hostkey from the config.

    @rtype: string
    @return: the rsa hostkey

    """
    cluster = self._config_data.cluster
    return cluster.rsahostkeypub
543

    
544
  @locking.ssynchronized(_config_lock)
  def AddInstance(self, instance):
    """Add an instance to the config.

    This should be used after creating a new instance.

    @type instance: L{objects.Instance}
    @param instance: the instance object
    @raise errors.ProgrammerError: if the argument is not an Instance

    """
    if not isinstance(instance, objects.Instance):
      raise errors.ProgrammerError("Invalid type passed to AddInstance")

    # Log the disk layout for anything that actually has disks.
    if instance.disk_template != constants.DT_DISKLESS:
      logging.info("Instance '%s' DISK_LAYOUT: %s",
                   instance.name, instance.MapLVsByNode())

    instance.serial_no = 1
    self._config_data.instances[instance.name] = instance
    self._WriteConfig()
564

    
565
  def _SetInstanceStatus(self, instance_name, status):
    """Set the instance's status to a given value.

    @param instance_name: name of an existing instance
    @param status: either "up" or "down"
    @raise errors.ProgrammerError: for an invalid status value
    @raise errors.ConfigurationError: for an unknown instance

    """
    if status not in ("up", "down"):
      raise errors.ProgrammerError("Invalid status '%s' passed to"
                                   " ConfigWriter._SetInstanceStatus()" %
                                   status)

    try:
      instance = self._config_data.instances[instance_name]
    except KeyError:
      raise errors.ConfigurationError("Unknown instance '%s'" %
                                      instance_name)
    # Only bump the serial and rewrite the config on a real change.
    if instance.status == status:
      return
    instance.status = status
    instance.serial_no += 1
    self._WriteConfig()
582

    
583
  @locking.ssynchronized(_config_lock)
  def MarkInstanceUp(self, instance_name):
    """Mark the instance status to up in the config.

    Thin locked wrapper around L{_SetInstanceStatus}.

    """
    self._SetInstanceStatus(instance_name, "up")
589

    
590
  @locking.ssynchronized(_config_lock)
  def RemoveInstance(self, instance_name):
    """Remove the instance from the configuration.

    @raise errors.ConfigurationError: for an unknown instance

    """
    instances = self._config_data.instances
    if instance_name not in instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
    del instances[instance_name]
    self._WriteConfig()
599

    
600
  @locking.ssynchronized(_config_lock)
  def RenameInstance(self, old_name, new_name):
    """Rename an instance.

    This needs to be done in ConfigWriter and not by RemoveInstance
    combined with AddInstance as only we can guarantee an atomic
    rename.

    @param old_name: the current (existing) instance name
    @param new_name: the new instance name

    """
    if old_name not in self._config_data.instances:
      raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
    inst = self._config_data.instances[old_name]
    del self._config_data.instances[old_name]
    inst.name = new_name

    for disk in inst.disks:
      if disk.dev_type == constants.LD_FILE:
        # rename the file paths in logical and physical id
        # (layout: <file_storage_dir>/<instance_name>/<iv_name>, so two
        # dirname calls strip back to the storage base directory)
        file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
        disk.physical_id = disk.logical_id = (disk.logical_id[0],
                                              os.path.join(file_storage_dir,
                                                           inst.name,
                                                           disk.iv_name))

    # re-insert under the new key and persist the change
    self._config_data.instances[inst.name] = inst
    self._WriteConfig()
626

    
627
  @locking.ssynchronized(_config_lock)
  def MarkInstanceDown(self, instance_name):
    """Mark the status of an instance to down in the configuration.

    Thin locked wrapper around L{_SetInstanceStatus}.

    """
    self._SetInstanceStatus(instance_name, "down")
633

    
634
  def _UnlockedGetInstanceList(self):
    """Get the list of instances.

    This function is for internal use, when the config lock is already held.

    @rtype: list

    """
    return list(self._config_data.instances)
641

    
642
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceList(self):
    """Get the list of instances.

    Locked wrapper over L{_UnlockedGetInstanceList}.

    @return: array of instances, ex. ['instance2.example.com',
        'instance1.example.com']

    """
    return self._UnlockedGetInstanceList()
651

    
652
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandInstanceName(self, short_name):
    """Attempt to expand an incomplete instance name.

    """
    all_names = self._config_data.instances.keys()
    return utils.MatchNameComponent(short_name, all_names)
659

    
660
  def _UnlockedGetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    This function is for internal use, when the config lock is already held.

    @return: the instance object, or None if the instance is unknown

    """
    # dict.get gives us the same lookup-or-None in one step
    return self._config_data.instances.get(instance_name, None)
670

    
671
  @locking.ssynchronized(_config_lock, shared=1)
  def GetInstanceInfo(self, instance_name):
    """Returns information about an instance.

    It takes the information from the configuration file; other
    information about an instance comes from the live systems.
    Locked wrapper over L{_UnlockedGetInstanceInfo}.

    @param instance_name: name of the instance, e.g.
        I{instance1.example.com}

    @rtype: L{objects.Instance}
    @return: the instance object (or None if unknown)

    """
    return self._UnlockedGetInstanceInfo(instance_name)
686

    
687
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllInstancesInfo(self):
    """Get the configuration of all instances.

    @rtype: dict
    @returns: dict of (instance, instance_info), where instance_info is what
              would GetInstanceInfo return for the node

    """
    info = {}
    for name in self._UnlockedGetInstanceList():
      info[name] = self._UnlockedGetInstanceInfo(name)
    return info
699

    
700
  @locking.ssynchronized(_config_lock)
  def AddNode(self, node):
    """Add a node to the configuration.

    @type node: L{objects.Node}
    @param node: a Node instance

    """
    # FIX: lazy %-style logging args instead of eager "%" formatting,
    # consistent with the other logging calls in this module.
    logging.info("Adding node %s to configuration", node.name)

    node.serial_no = 1
    self._config_data.nodes[node.name] = node
    # node additions change the cluster, so bump its serial too
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
714

    
715
  @locking.ssynchronized(_config_lock)
  def RemoveNode(self, node_name):
    """Remove a node from the configuration.

    @raise errors.ConfigurationError: for an unknown node

    """
    # FIX: lazy %-style logging args instead of eager "%" formatting,
    # consistent with the other logging calls in this module.
    logging.info("Removing node %s from configuration", node_name)

    if node_name not in self._config_data.nodes:
      raise errors.ConfigurationError("Unknown node '%s'" % node_name)

    del self._config_data.nodes[node_name]
    # node removals change the cluster, so bump its serial too
    self._config_data.cluster.serial_no += 1
    self._WriteConfig()
728

    
729
  @locking.ssynchronized(_config_lock, shared=1)
  def ExpandNodeName(self, short_name):
    """Attempt to expand an incomplete node name.

    """
    all_names = self._config_data.nodes.keys()
    return utils.MatchNameComponent(short_name, all_names)
736

    
737
  def _UnlockedGetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This function is for internal use, when the config lock is already
    held.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object, or None if the node is unknown

    """
    # dict.get gives us the same lookup-or-None in one step
    return self._config_data.nodes.get(node_name, None)
753

    
754

    
755
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeInfo(self, node_name):
    """Get the configuration of a node, as stored in the config.

    This is just a locked wrapper over L{_UnlockedGetNodeInfo}.

    @param node_name: the node name, e.g. I{node1.example.com}

    @rtype: L{objects.Node}
    @return: the node object (or None if unknown)

    """
    return self._UnlockedGetNodeInfo(node_name)
768

    
769
  def _UnlockedGetNodeList(self):
    """Return the list of nodes which are in the configuration.

    This function is for internal use, when the config lock is already
    held.

    @rtype: list

    """
    return list(self._config_data.nodes)
779

    
780

    
781
  @locking.ssynchronized(_config_lock, shared=1)
  def GetNodeList(self):
    """Return the list of nodes which are in the configuration.

    Locked wrapper over L{_UnlockedGetNodeList}.

    """
    return self._UnlockedGetNodeList()
787

    
788
  @locking.ssynchronized(_config_lock, shared=1)
  def GetAllNodesInfo(self):
    """Get the configuration of all nodes.

    @rtype: dict
    @return: dict of (node, node_info), where node_info is what
              would GetNodeInfo return for the node

    """
    info = {}
    for name in self._UnlockedGetNodeList():
      info[name] = self._UnlockedGetNodeInfo(name)
    return info
800

    
801
  def _UnlockedGetMasterCandidateStats(self):
    """Get the number of current and maximum desired and possible candidates.

    @rtype: tuple
    @return: tuple of (current, desired and possible)

    """
    mc_now = 0
    mc_max = 0
    for node in self._config_data.nodes.itervalues():
      # every non-offline node could potentially be a candidate
      if not node.offline:
        mc_max += 1
      if node.master_candidate:
        mc_now += 1
    # the target is bounded by the configured pool size
    pool_size = self._config_data.cluster.candidate_pool_size
    return (mc_now, min(mc_max, pool_size))
816

    
817
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMasterCandidateStats(self):
    """Get the number of current and maximum possible candidates.

    This is just a locked wrapper over L{_UnlockedGetMasterCandidateStats}.

    @rtype: tuple
    @return: tuple of (current, max)

    """
    return self._UnlockedGetMasterCandidateStats()
828

    
829
  @locking.ssynchronized(_config_lock)
  def MaintainCandidatePool(self):
    """Try to grow the candidate pool to the desired size.

    @rtype: list
    @return: list with the adjusted nodes (L{objects.Node} instances)

    """
    mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
    mod_list = []
    if mc_now < mc_max:
      node_list = self._config_data.nodes.keys()
      # shuffle so that promotions are spread over the node list instead
      # of always picking the same (dict-order) nodes
      random.shuffle(node_list)
      for name in node_list:
        if mc_now >= mc_max:
          break
        node = self._config_data.nodes[name]
        # skip nodes that already are candidates or cannot become one
        if node.master_candidate or node.offline:
          continue
        mod_list.append(node)
        node.master_candidate = True
        node.serial_no += 1
        mc_now += 1
      if mc_now != mc_max:
        # this should not happen
        logging.warning("Warning: MaintainCandidatePool didn't manage to"
                        " fill the candidate pool (%d/%d)", mc_now, mc_max)
      # persist only when something actually changed
      if mod_list:
        self._config_data.cluster.serial_no += 1
        self._WriteConfig()

    return mod_list
861

    
862
  def _BumpSerialNo(self):
    """Increment the top-level serial number of the configuration.

    """
    data = self._config_data
    data.serial_no = data.serial_no + 1
867

    
868
  def _OpenConfig(self):
    """Read the config data from disk.

    Loads and deserializes the configuration file, validates it and
    stores the result in self._config_data.

    @raise errors.ConfigurationError: if the file cannot be read or
        parsed, has a wrong version, or misses required cluster data

    """
    f = open(self._cfg_file, 'r')
    try:
      try:
        data = objects.ConfigData.FromDict(serializer.Load(f.read()))
      except Exception, err:
        # wrap any read/parse failure into a configuration error
        raise errors.ConfigurationError(err)
    finally:
      f.close()

    # Make sure the configuration has the right version
    _ValidateConfig(data)

    if (not hasattr(data, 'cluster') or
        not hasattr(data.cluster, 'rsahostkeypub')):
      raise errors.ConfigurationError("Incomplete configuration"
                                      " (missing cluster.rsahostkeypub)")
    self._config_data = data
    # init the last serial as -1 so that the next write will cause
    # ssconf update
    self._last_cluster_serial = -1
892

    
893
  def _DistributeConfig(self):
894
    """Distribute the configuration to the other nodes.
895

896
    Currently, this only copies the configuration file. In the future,
897
    it could be used to encapsulate the 2/3-phase update mechanism.
898

899
    """
900
    if self._offline:
901
      return True
902
    bad = False
903

    
904
    node_list = []
905
    addr_list = []
906
    myhostname = self._my_hostname
907
    # we can skip checking whether _UnlockedGetNodeInfo returns None
908
    # since the node list comes from _UnlocketGetNodeList, and we are
909
    # called with the lock held, so no modifications should take place
910
    # in between
911
    for node_name in self._UnlockedGetNodeList():
912
      if node_name == myhostname:
913
        continue
914
      node_info = self._UnlockedGetNodeInfo(node_name)
915
      if not node_info.master_candidate:
916
        continue
917
      node_list.append(node_info.name)
918
      addr_list.append(node_info.primary_ip)
919

    
920
    result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
921
                                            address_list=addr_list)
922
    for node in node_list:
923
      if not result[node]:
924
        logging.error("copy of file %s to node %s failed",
925
                      self._cfg_file, node)
926
        bad = True
927
    return not bad
928

    
929
  def _WriteConfig(self, destination=None):
    """Write the configuration data to persistent storage.

    The serialized data is first written and fsync'ed to a unique
    temporary file in the destination directory, then atomically
    renamed over the target, so readers never observe a truncated or
    partially-written configuration.  Afterwards the file is pushed to
    the master candidates and, if the cluster serial changed, the
    ssconf files are regenerated on all nodes.

    @type destination: str or None
    @param destination: target path; defaults to the standard
        configuration file (C{self._cfg_file})

    """
    if destination is None:
      destination = self._cfg_file
    self._BumpSerialNo()
    txt = serializer.Dump(self._config_data.ToDict())
    dir_name, file_name = os.path.split(destination)
    fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
    f = os.fdopen(fd, 'w')
    try:
      try:
        f.write(txt)
        # flush to disk before the rename, so a crash cannot leave an
        # empty file under the final name
        os.fsync(f.fileno())
      finally:
        # we don't need to do os.close(fd) as f.close() did it
        f.close()
      os.rename(name, destination)
    except:
      # don't leave a stale temporary file behind on failure; the
      # original exception is re-raised unchanged
      os.unlink(name)
      raise
    self.write_count += 1

    # and redistribute the config file to master candidates
    self._DistributeConfig()

    # Write ssconf files on all nodes (including locally)
    if self._last_cluster_serial < self._config_data.cluster.serial_no:
      if not self._offline:
        rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
                                              self._UnlockedGetSsconfValues())
      self._last_cluster_serial = self._config_data.cluster.serial_no
958

    
959
  def _UnlockedGetSsconfValues(self):
    """Compute the key/value pairs exported through ssconf.

    @rtype: dict
    @return: a dictionary with keys the ssconf names and values their
        associated value

    """
    node_names = utils.NiceSort(self._UnlockedGetNodeList())
    all_nodes = [self._UnlockedGetNodeInfo(name) for name in node_names]

    join = "\n".join
    offline_names = join(node.name for node in all_nodes if node.offline)
    candidate_names = join(node.name for node in all_nodes
                           if node.master_candidate)
    all_names = join(node_names)

    cluster = self._config_data.cluster
    return {
      constants.SS_CLUSTER_NAME: cluster.cluster_name,
      constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
      constants.SS_MASTER_CANDIDATES: candidate_names,
      constants.SS_MASTER_IP: cluster.master_ip,
      constants.SS_MASTER_NETDEV: cluster.master_netdev,
      constants.SS_MASTER_NODE: cluster.master_node,
      constants.SS_NODE_LIST: all_names,
      constants.SS_OFFLINE_NODES: offline_names,
      }
986

    
987
  @locking.ssynchronized(_config_lock)
  def InitConfig(self, version, cluster_config, master_node_config):
    """Create the initial cluster configuration.

    It will contain the current node, which will also be the master
    node, and no instances.

    @type version: int
    @param version: Configuration version
    @type cluster_config: objects.Cluster
    @param cluster_config: Cluster configuration
    @type master_node_config: objects.Node
    @param master_node_config: Master node configuration

    """
    # the initial node table holds the master node only
    node_map = {master_node_config.name: master_node_config}

    self._config_data = objects.ConfigData(version=version,
                                           cluster=cluster_config,
                                           nodes=node_map,
                                           instances={},
                                           serial_no=1)
    self._WriteConfig()
1012

    
1013
  @locking.ssynchronized(_config_lock, shared=1)
  def GetVGName(self):
    """Return the LVM volume group name used by the cluster.

    """
    cluster = self._config_data.cluster
    return cluster.volume_group_name
1019

    
1020
  @locking.ssynchronized(_config_lock)
  def SetVGName(self, vg_name):
    """Change the LVM volume group name and persist the configuration.

    """
    cluster = self._config_data.cluster
    cluster.volume_group_name = vg_name
    cluster.serial_no += 1
    self._WriteConfig()
1028

    
1029
  @locking.ssynchronized(_config_lock, shared=1)
  def GetDefBridge(self):
    """Return the cluster-wide default bridge.

    """
    cluster = self._config_data.cluster
    return cluster.default_bridge
1035

    
1036
  @locking.ssynchronized(_config_lock, shared=1)
  def GetMACPrefix(self):
    """Return the prefix used when generating MAC addresses.

    """
    cluster = self._config_data.cluster
    return cluster.mac_prefix
1042

    
1043
  @locking.ssynchronized(_config_lock, shared=1)
  def GetClusterInfo(self):
    """Return the cluster configuration object.

    @rtype: L{objects.Cluster}
    @return: the cluster object

    """
    return self._config_data.cluster
1052

    
1053
  @locking.ssynchronized(_config_lock)
  def Update(self, target):
    """Notify function to be called after updates.

    This function must be called when an object (as returned by
    GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
    caller wants the modifications saved to the backing store. Note
    that all modified objects will be saved, but the target argument
    is the one the caller wants to ensure that it's saved.

    @param target: an instance of either L{objects.Cluster},
        L{objects.Node} or L{objects.Instance} which is existing in
        the cluster

    @raise errors.ProgrammerError: if the configuration was never
        loaded, or target has an unsupported type
    @raise errors.ConfigurationError: if target is not part of the
        currently-loaded configuration

    """
    if self._config_data is None:
      raise errors.ProgrammerError("Configuration file not read,"
                                   " cannot save.")

    bump_cluster_serial = False
    if isinstance(target, objects.Cluster):
      known = target == self._config_data.cluster
    elif isinstance(target, objects.Node):
      known = target in self._config_data.nodes.values()
      # node changes are also reflected in the cluster serial number
      bump_cluster_serial = True
    elif isinstance(target, objects.Instance):
      known = target in self._config_data.instances.values()
    else:
      raise errors.ProgrammerError("Invalid object type (%s) passed to"
                                   " ConfigWriter.Update" % type(target))

    if not known:
      raise errors.ConfigurationError("Configuration updated since object"
                                      " has been read or unknown object")
    target.serial_no += 1

    if bump_cluster_serial:
      # for node updates, we need to increase the cluster serial too
      self._config_data.cluster.serial_no += 1

    self._WriteConfig()