Fix some epydoc style issues
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50
51 def _ValidateConfig(data):
52   """Verifies that a configuration objects looks valid.
53
54   This only verifies the version of the configuration.
55
56   @raise errors.ConfigurationError: if the version differs from what
57       we expect
58
59   """
60   if data.version != constants.CONFIG_VERSION:
61     raise errors.ConfigurationError("Cluster configuration version"
62                                     " mismatch, got %s instead of %s" %
63                                     (data.version,
64                                      constants.CONFIG_VERSION))
65
66
67 class ConfigWriter:
68   """The interface to the cluster configuration.
69
70   """
71   def __init__(self, cfg_file=None, offline=False):
72     self.write_count = 0
73     self._lock = _config_lock
74     self._config_data = None
75     self._offline = offline
76     if cfg_file is None:
77       self._cfg_file = constants.CLUSTER_CONF_FILE
78     else:
79       self._cfg_file = cfg_file
80     self._temporary_ids = set()
81     self._temporary_drbds = {}
82     self._temporary_macs = set()
83     # Note: in order to prevent errors when resolving our name in
84     # _DistributeConfig, we compute it here once and reuse it; it's
85     # better to raise an error before starting to modify the config
86     # file than after it was modified
87     self._my_hostname = utils.HostInfo().name
88     self._last_cluster_serial = -1
89     self._OpenConfig()
90
91   # this method needs to be static, so that we can call it on the class
92   @staticmethod
93   def IsCluster():
94     """Check if the cluster is configured.
95
96     """
97     return os.path.exists(constants.CLUSTER_CONF_FILE)
98
99   @locking.ssynchronized(_config_lock, shared=1)
100   def GenerateMAC(self):
101     """Generate a MAC for an instance.
102
103     This should check the current instances for duplicates.
104
105     """
106     prefix = self._config_data.cluster.mac_prefix
107     all_macs = self._AllMACs()
108     retries = 64
109     while retries > 0:
110       byte1 = random.randrange(0, 256)
111       byte2 = random.randrange(0, 256)
112       byte3 = random.randrange(0, 256)
113       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
114       if mac not in all_macs and mac not in self._temporary_macs:
115         break
116       retries -= 1
117     else:
118       raise errors.ConfigurationError("Can't generate unique MAC")
119     self._temporary_macs.add(mac)
120     return mac
121
122   @locking.ssynchronized(_config_lock, shared=1)
123   def IsMacInUse(self, mac):
124     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
125
126     This only checks instances managed by this cluster, it does not
127     check for potential collisions elsewhere.
128
129     """
130     all_macs = self._AllMACs()
131     return mac in all_macs or mac in self._temporary_macs
132
133   @locking.ssynchronized(_config_lock, shared=1)
134   def GenerateDRBDSecret(self):
135     """Generate a DRBD secret.
136
137     This checks the current disks for duplicates.
138
139     """
140     all_secrets = self._AllDRBDSecrets()
141     retries = 64
142     while retries > 0:
143       secret = utils.GenerateSecret()
144       if secret not in all_secrets:
145         break
146       retries -= 1
147     else:
148       raise errors.ConfigurationError("Can't generate unique DRBD secret")
149     return secret
150
151   def _ComputeAllLVs(self):
152     """Compute the list of all LVs.
153
154     """
155     lvnames = set()
156     for instance in self._config_data.instances.values():
157       node_data = instance.MapLVsByNode()
158       for lv_list in node_data.values():
159         lvnames.update(lv_list)
160     return lvnames
161
162   @locking.ssynchronized(_config_lock, shared=1)
163   def GenerateUniqueID(self, exceptions=None):
164     """Generate an unique disk name.
165
166     This checks the current node, instances and disk names for
167     duplicates.
168
169     @param exceptions: a list with some other names which should be checked
170         for uniqueness (used for example when you want to get
171         more than one id at one time without adding each one in
172         turn to the config file)
173
174     @rtype: string
175     @return: the unique id
176
177     """
178     existing = set()
179     existing.update(self._temporary_ids)
180     existing.update(self._ComputeAllLVs())
181     existing.update(self._config_data.instances.keys())
182     existing.update(self._config_data.nodes.keys())
183     if exceptions is not None:
184       existing.update(exceptions)
185     retries = 64
186     while retries > 0:
187       unique_id = utils.NewUUID()
188       if unique_id not in existing and unique_id is not None:
189         break
190     else:
191       raise errors.ConfigurationError("Not able generate an unique ID"
192                                       " (last tried ID: %s" % unique_id)
193     self._temporary_ids.add(unique_id)
194     return unique_id
195
196   def _AllMACs(self):
197     """Return all MACs present in the config.
198
199     @rtype: list
200     @return: the list of all MACs
201
202     """
203     result = []
204     for instance in self._config_data.instances.values():
205       for nic in instance.nics:
206         result.append(nic.mac)
207
208     return result
209
210   def _AllDRBDSecrets(self):
211     """Return all DRBD secrets present in the config.
212
213     @rtype: list
214     @return: the list of all DRBD secrets
215
216     """
217     def helper(disk, result):
218       """Recursively gather secrets from this disk."""
219       if disk.dev_type == constants.DT_DRBD8:
220         result.append(disk.logical_id[5])
221       if disk.children:
222         for child in disk.children:
223           helper(child, result)
224
225     result = []
226     for instance in self._config_data.instances.values():
227       for disk in instance.disks:
228         helper(disk, result)
229
230     return result
231
232   def _CheckDiskIDs(self, disk, l_ids, p_ids):
233     """Compute duplicate disk IDs
234
235     @type disk: L{objects.Disk}
236     @param disk: the disk at which to start searching
237     @type l_ids: list
238     @param l_ids: list of current logical ids
239     @type p_ids: list
240     @param p_ids: list of current physical ids
241     @rtype: list
242     @return: a list of error messages
243
244     """
245     result = []
246     if disk.logical_id is not None:
247       if disk.logical_id in l_ids:
248         result.append("duplicate logical id %s" % str(disk.logical_id))
249       else:
250         l_ids.append(disk.logical_id)
251     if disk.physical_id is not None:
252       if disk.physical_id in p_ids:
253         result.append("duplicate physical id %s" % str(disk.physical_id))
254       else:
255         p_ids.append(disk.physical_id)
256
257     if disk.children:
258       for child in disk.children:
259         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
260     return result
261
262   def _UnlockedVerifyConfig(self):
263     """Verify function.
264
265     @rtype: list
266     @return: a list of error messages; a non-empty list signifies
267         configuration errors
268
269     """
270     result = []
271     seen_macs = []
272     ports = {}
273     data = self._config_data
274     seen_lids = []
275     seen_pids = []
276     for instance_name in data.instances:
277       instance = data.instances[instance_name]
278       if instance.primary_node not in data.nodes:
279         result.append("instance '%s' has invalid primary node '%s'" %
280                       (instance_name, instance.primary_node))
281       for snode in instance.secondary_nodes:
282         if snode not in data.nodes:
283           result.append("instance '%s' has invalid secondary node '%s'" %
284                         (instance_name, snode))
285       for idx, nic in enumerate(instance.nics):
286         if nic.mac in seen_macs:
287           result.append("instance '%s' has NIC %d mac %s duplicate" %
288                         (instance_name, idx, nic.mac))
289         else:
290           seen_macs.append(nic.mac)
291
292       # gather the drbd ports for duplicate checks
293       for dsk in instance.disks:
294         if dsk.dev_type in constants.LDS_DRBD:
295           tcp_port = dsk.logical_id[2]
296           if tcp_port not in ports:
297             ports[tcp_port] = []
298           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
299       # gather network port reservation
300       net_port = getattr(instance, "network_port", None)
301       if net_port is not None:
302         if net_port not in ports:
303           ports[net_port] = []
304         ports[net_port].append((instance.name, "network port"))
305
306       # instance disk verify
307       for idx, disk in enumerate(instance.disks):
308         result.extend(["instance '%s' disk %d error: %s" %
309                        (instance.name, idx, msg) for msg in disk.Verify()])
310         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
311
312     # cluster-wide pool of free ports
313     for free_port in data.cluster.tcpudp_port_pool:
314       if free_port not in ports:
315         ports[free_port] = []
316       ports[free_port].append(("cluster", "port marked as free"))
317
318     # compute tcp/udp duplicate ports
319     keys = ports.keys()
320     keys.sort()
321     for pnum in keys:
322       pdata = ports[pnum]
323       if len(pdata) > 1:
324         txt = ", ".join(["%s/%s" % val for val in pdata])
325         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
326
327     # highest used tcp port check
328     if keys:
329       if keys[-1] > data.cluster.highest_used_port:
330         result.append("Highest used port mismatch, saved %s, computed %s" %
331                       (data.cluster.highest_used_port, keys[-1]))
332
333     if not data.nodes[data.cluster.master_node].master_candidate:
334       result.append("Master node is not a master candidate")
335
336     # master candidate checks
337     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
338     if mc_now < mc_max:
339       result.append("Not enough master candidates: actual %d, target %d" %
340                     (mc_now, mc_max))
341
342     # node checks
343     for node in data.nodes.values():
344       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
345         result.append("Node %s state is invalid: master_candidate=%s,"
346                       " drain=%s, offline=%s" %
347                       (node.name, node.master_candidate, node.drain,
348                        node.offline))
349
350     # drbd minors check
351     d_map, duplicates = self._UnlockedComputeDRBDMap()
352     for node, minor, instance_a, instance_b in duplicates:
353       result.append("DRBD minor %d on node %s is assigned twice to instances"
354                     " %s and %s" % (minor, node, instance_a, instance_b))
355
356     return result
357
358   @locking.ssynchronized(_config_lock, shared=1)
359   def VerifyConfig(self):
360     """Verify function.
361
362     This is just a wrapper over L{_UnlockedVerifyConfig}.
363
364     @rtype: list
365     @return: a list of error messages; a non-empty list signifies
366         configuration errors
367
368     """
369     return self._UnlockedVerifyConfig()
370
371   def _UnlockedSetDiskID(self, disk, node_name):
372     """Convert the unique ID to the ID needed on the target nodes.
373
374     This is used only for drbd, which needs ip/port configuration.
375
376     The routine descends down and updates its children also, because
377     this helps when the only the top device is passed to the remote
378     node.
379
380     This function is for internal use, when the config lock is already held.
381
382     """
383     if disk.children:
384       for child in disk.children:
385         self._UnlockedSetDiskID(child, node_name)
386
387     if disk.logical_id is None and disk.physical_id is not None:
388       return
389     if disk.dev_type == constants.LD_DRBD8:
390       pnode, snode, port, pminor, sminor, secret = disk.logical_id
391       if node_name not in (pnode, snode):
392         raise errors.ConfigurationError("DRBD device not knowing node %s" %
393                                         node_name)
394       pnode_info = self._UnlockedGetNodeInfo(pnode)
395       snode_info = self._UnlockedGetNodeInfo(snode)
396       if pnode_info is None or snode_info is None:
397         raise errors.ConfigurationError("Can't find primary or secondary node"
398                                         " for %s" % str(disk))
399       p_data = (pnode_info.secondary_ip, port)
400       s_data = (snode_info.secondary_ip, port)
401       if pnode == node_name:
402         disk.physical_id = p_data + s_data + (pminor, secret)
403       else: # it must be secondary, we tested above
404         disk.physical_id = s_data + p_data + (sminor, secret)
405     else:
406       disk.physical_id = disk.logical_id
407     return
408
409   @locking.ssynchronized(_config_lock)
410   def SetDiskID(self, disk, node_name):
411     """Convert the unique ID to the ID needed on the target nodes.
412
413     This is used only for drbd, which needs ip/port configuration.
414
415     The routine descends down and updates its children also, because
416     this helps when the only the top device is passed to the remote
417     node.
418
419     """
420     return self._UnlockedSetDiskID(disk, node_name)
421
422   @locking.ssynchronized(_config_lock)
423   def AddTcpUdpPort(self, port):
424     """Adds a new port to the available port pool.
425
426     """
427     if not isinstance(port, int):
428       raise errors.ProgrammerError("Invalid type passed for port")
429
430     self._config_data.cluster.tcpudp_port_pool.add(port)
431     self._WriteConfig()
432
433   @locking.ssynchronized(_config_lock, shared=1)
434   def GetPortList(self):
435     """Returns a copy of the current port list.
436
437     """
438     return self._config_data.cluster.tcpudp_port_pool.copy()
439
440   @locking.ssynchronized(_config_lock)
441   def AllocatePort(self):
442     """Allocate a port.
443
444     The port will be taken from the available port pool or from the
445     default port range (and in this case we increase
446     highest_used_port).
447
448     """
449     # If there are TCP/IP ports configured, we use them first.
450     if self._config_data.cluster.tcpudp_port_pool:
451       port = self._config_data.cluster.tcpudp_port_pool.pop()
452     else:
453       port = self._config_data.cluster.highest_used_port + 1
454       if port >= constants.LAST_DRBD_PORT:
455         raise errors.ConfigurationError("The highest used port is greater"
456                                         " than %s. Aborting." %
457                                         constants.LAST_DRBD_PORT)
458       self._config_data.cluster.highest_used_port = port
459
460     self._WriteConfig()
461     return port
462
463   def _UnlockedComputeDRBDMap(self):
464     """Compute the used DRBD minor/nodes.
465
466     @rtype: (dict, list)
467     @return: dictionary of node_name: dict of minor: instance_name;
468         the returned dict will have all the nodes in it (even if with
469         an empty list), and a list of duplicates; if the duplicates
470         list is not empty, the configuration is corrupted and its caller
471         should raise an exception
472
473     """
474     def _AppendUsedPorts(instance_name, disk, used):
475       duplicates = []
476       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
477         nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
478         for node, port in ((nodeA, minorA), (nodeB, minorB)):
479           assert node in used, ("Node '%s' of instance '%s' not found"
480                                 " in node list" % (node, instance_name))
481           if port in used[node]:
482             duplicates.append((node, port, instance_name, used[node][port]))
483           else:
484             used[node][port] = instance_name
485       if disk.children:
486         for child in disk.children:
487           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
488       return duplicates
489
490     duplicates = []
491     my_dict = dict((node, {}) for node in self._config_data.nodes)
492     for instance in self._config_data.instances.itervalues():
493       for disk in instance.disks:
494         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
495     for (node, minor), instance in self._temporary_drbds.iteritems():
496       if minor in my_dict[node] and my_dict[node][minor] != instance:
497         duplicates.append((node, minor, instance, my_dict[node][minor]))
498       else:
499         my_dict[node][minor] = instance
500     return my_dict, duplicates
501
502   @locking.ssynchronized(_config_lock)
503   def ComputeDRBDMap(self):
504     """Compute the used DRBD minor/nodes.
505
506     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
507
508     @return: dictionary of node_name: dict of minor: instance_name;
509         the returned dict will have all the nodes in it (even if with
510         an empty list).
511
512     """
513     d_map, duplicates = self._UnlockedComputeDRBDMap()
514     if duplicates:
515       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
516                                       str(duplicates))
517     return d_map
518
519   @locking.ssynchronized(_config_lock)
520   def AllocateDRBDMinor(self, nodes, instance):
521     """Allocate a drbd minor.
522
523     The free minor will be automatically computed from the existing
524     devices. A node can be given multiple times in order to allocate
525     multiple minors. The result is the list of minors, in the same
526     order as the passed nodes.
527
528     @type instance: string
529     @param instance: the instance for which we allocate minors
530
531     """
532     assert isinstance(instance, basestring), \
533            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
534
535     d_map, duplicates = self._UnlockedComputeDRBDMap()
536     if duplicates:
537       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
538                                       str(duplicates))
539     result = []
540     for nname in nodes:
541       ndata = d_map[nname]
542       if not ndata:
543         # no minors used, we can start at 0
544         result.append(0)
545         ndata[0] = instance
546         self._temporary_drbds[(nname, 0)] = instance
547         continue
548       keys = ndata.keys()
549       keys.sort()
550       ffree = utils.FirstFree(keys)
551       if ffree is None:
552         # return the next minor
553         # TODO: implement high-limit check
554         minor = keys[-1] + 1
555       else:
556         minor = ffree
557       # double-check minor against current instances
558       assert minor not in d_map[nname], \
559              ("Attempt to reuse allocated DRBD minor %d on node %s,"
560               " already allocated to instance %s" %
561               (minor, nname, d_map[nname][minor]))
562       ndata[minor] = instance
563       # double-check minor against reservation
564       r_key = (nname, minor)
565       assert r_key not in self._temporary_drbds, \
566              ("Attempt to reuse reserved DRBD minor %d on node %s,"
567               " reserved for instance %s" %
568               (minor, nname, self._temporary_drbds[r_key]))
569       self._temporary_drbds[r_key] = instance
570       result.append(minor)
571     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
572                   nodes, result)
573     return result
574
575   def _UnlockedReleaseDRBDMinors(self, instance):
576     """Release temporary drbd minors allocated for a given instance.
577
578     @type instance: string
579     @param instance: the instance for which temporary minors should be
580                      released
581
582     """
583     assert isinstance(instance, basestring), \
584            "Invalid argument passed to ReleaseDRBDMinors"
585     for key, name in self._temporary_drbds.items():
586       if name == instance:
587         del self._temporary_drbds[key]
588
589   @locking.ssynchronized(_config_lock)
590   def ReleaseDRBDMinors(self, instance):
591     """Release temporary drbd minors allocated for a given instance.
592
593     This should be called on the error paths, on the success paths
594     it's automatically called by the ConfigWriter add and update
595     functions.
596
597     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
598
599     @type instance: string
600     @param instance: the instance for which temporary minors should be
601                      released
602
603     """
604     self._UnlockedReleaseDRBDMinors(instance)
605
606   @locking.ssynchronized(_config_lock, shared=1)
607   def GetConfigVersion(self):
608     """Get the configuration version.
609
610     @return: Config version
611
612     """
613     return self._config_data.version
614
615   @locking.ssynchronized(_config_lock, shared=1)
616   def GetClusterName(self):
617     """Get cluster name.
618
619     @return: Cluster name
620
621     """
622     return self._config_data.cluster.cluster_name
623
624   @locking.ssynchronized(_config_lock, shared=1)
625   def GetMasterNode(self):
626     """Get the hostname of the master node for this cluster.
627
628     @return: Master hostname
629
630     """
631     return self._config_data.cluster.master_node
632
633   @locking.ssynchronized(_config_lock, shared=1)
634   def GetMasterIP(self):
635     """Get the IP of the master node for this cluster.
636
637     @return: Master IP
638
639     """
640     return self._config_data.cluster.master_ip
641
642   @locking.ssynchronized(_config_lock, shared=1)
643   def GetMasterNetdev(self):
644     """Get the master network device for this cluster.
645
646     """
647     return self._config_data.cluster.master_netdev
648
649   @locking.ssynchronized(_config_lock, shared=1)
650   def GetFileStorageDir(self):
651     """Get the file storage dir for this cluster.
652
653     """
654     return self._config_data.cluster.file_storage_dir
655
656   @locking.ssynchronized(_config_lock, shared=1)
657   def GetHypervisorType(self):
658     """Get the hypervisor type for this cluster.
659
660     """
661     return self._config_data.cluster.default_hypervisor
662
663   @locking.ssynchronized(_config_lock, shared=1)
664   def GetHostKey(self):
665     """Return the rsa hostkey from the config.
666
667     @rtype: string
668     @return: the rsa hostkey
669
670     """
671     return self._config_data.cluster.rsahostkeypub
672
673   @locking.ssynchronized(_config_lock)
674   def AddInstance(self, instance):
675     """Add an instance to the config.
676
677     This should be used after creating a new instance.
678
679     @type instance: L{objects.Instance}
680     @param instance: the instance object
681
682     """
683     if not isinstance(instance, objects.Instance):
684       raise errors.ProgrammerError("Invalid type passed to AddInstance")
685
686     if instance.disk_template != constants.DT_DISKLESS:
687       all_lvs = instance.MapLVsByNode()
688       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
689
690     all_macs = self._AllMACs()
691     for nic in instance.nics:
692       if nic.mac in all_macs:
693         raise errors.ConfigurationError("Cannot add instance %s:"
694           " MAC address '%s' already in use." % (instance.name, nic.mac))
695
696     instance.serial_no = 1
697     self._config_data.instances[instance.name] = instance
698     self._config_data.cluster.serial_no += 1
699     self._UnlockedReleaseDRBDMinors(instance.name)
700     for nic in instance.nics:
701       self._temporary_macs.discard(nic.mac)
702     self._WriteConfig()
703
704   def _SetInstanceStatus(self, instance_name, status):
705     """Set the instance's status to a given value.
706
707     """
708     assert isinstance(status, bool), \
709            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
710
711     if instance_name not in self._config_data.instances:
712       raise errors.ConfigurationError("Unknown instance '%s'" %
713                                       instance_name)
714     instance = self._config_data.instances[instance_name]
715     if instance.admin_up != status:
716       instance.admin_up = status
717       instance.serial_no += 1
718       self._WriteConfig()
719
720   @locking.ssynchronized(_config_lock)
721   def MarkInstanceUp(self, instance_name):
722     """Mark the instance status to up in the config.
723
724     """
725     self._SetInstanceStatus(instance_name, True)
726
727   @locking.ssynchronized(_config_lock)
728   def RemoveInstance(self, instance_name):
729     """Remove the instance from the configuration.
730
731     """
732     if instance_name not in self._config_data.instances:
733       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
734     del self._config_data.instances[instance_name]
735     self._config_data.cluster.serial_no += 1
736     self._WriteConfig()
737
738   @locking.ssynchronized(_config_lock)
739   def RenameInstance(self, old_name, new_name):
740     """Rename an instance.
741
742     This needs to be done in ConfigWriter and not by RemoveInstance
743     combined with AddInstance as only we can guarantee an atomic
744     rename.
745
746     """
747     if old_name not in self._config_data.instances:
748       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
749     inst = self._config_data.instances[old_name]
750     del self._config_data.instances[old_name]
751     inst.name = new_name
752
753     for disk in inst.disks:
754       if disk.dev_type == constants.LD_FILE:
755         # rename the file paths in logical and physical id
756         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
757         disk.physical_id = disk.logical_id = (disk.logical_id[0],
758                                               os.path.join(file_storage_dir,
759                                                            inst.name,
760                                                            disk.iv_name))
761
762     self._config_data.instances[inst.name] = inst
763     self._WriteConfig()
764
765   @locking.ssynchronized(_config_lock)
766   def MarkInstanceDown(self, instance_name):
767     """Mark the status of an instance to down in the configuration.
768
769     """
770     self._SetInstanceStatus(instance_name, False)
771
772   def _UnlockedGetInstanceList(self):
773     """Get the list of instances.
774
775     This function is for internal use, when the config lock is already held.
776
777     """
778     return self._config_data.instances.keys()
779
780   @locking.ssynchronized(_config_lock, shared=1)
781   def GetInstanceList(self):
782     """Get the list of instances.
783
784     @return: array of instances, ex. ['instance2.example.com',
785         'instance1.example.com']
786
787     """
788     return self._UnlockedGetInstanceList()
789
790   @locking.ssynchronized(_config_lock, shared=1)
791   def ExpandInstanceName(self, short_name):
792     """Attempt to expand an incomplete instance name.
793
794     """
795     return utils.MatchNameComponent(short_name,
796                                     self._config_data.instances.keys())
797
798   def _UnlockedGetInstanceInfo(self, instance_name):
799     """Returns informations about an instance.
800
801     This function is for internal use, when the config lock is already held.
802
803     """
804     if instance_name not in self._config_data.instances:
805       return None
806
807     return self._config_data.instances[instance_name]
808
809   @locking.ssynchronized(_config_lock, shared=1)
810   def GetInstanceInfo(self, instance_name):
811     """Returns informations about an instance.
812
813     It takes the information from the configuration file. Other informations of
814     an instance are taken from the live systems.
815
816     @param instance_name: name of the instance, e.g.
817         I{instance1.example.com}
818
819     @rtype: L{objects.Instance}
820     @return: the instance object
821
822     """
823     return self._UnlockedGetInstanceInfo(instance_name)
824
825   @locking.ssynchronized(_config_lock, shared=1)
826   def GetAllInstancesInfo(self):
827     """Get the configuration of all instances.
828
829     @rtype: dict
830     @return: dict of (instance, instance_info), where instance_info is what
831               would GetInstanceInfo return for the node
832
833     """
834     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
835                     for instance in self._UnlockedGetInstanceList()])
836     return my_dict
837
838   @locking.ssynchronized(_config_lock)
839   def AddNode(self, node):
840     """Add a node to the configuration.
841
842     @type node: L{objects.Node}
843     @param node: a Node instance
844
845     """
846     logging.info("Adding node %s to configuration" % node.name)
847
848     node.serial_no = 1
849     self._config_data.nodes[node.name] = node
850     self._config_data.cluster.serial_no += 1
851     self._WriteConfig()
852
853   @locking.ssynchronized(_config_lock)
854   def RemoveNode(self, node_name):
855     """Remove a node from the configuration.
856
857     """
858     logging.info("Removing node %s from configuration" % node_name)
859
860     if node_name not in self._config_data.nodes:
861       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
862
863     del self._config_data.nodes[node_name]
864     self._config_data.cluster.serial_no += 1
865     self._WriteConfig()
866
867   @locking.ssynchronized(_config_lock, shared=1)
868   def ExpandNodeName(self, short_name):
869     """Attempt to expand an incomplete instance name.
870
871     """
872     return utils.MatchNameComponent(short_name,
873                                     self._config_data.nodes.keys())
874
875   def _UnlockedGetNodeInfo(self, node_name):
876     """Get the configuration of a node, as stored in the config.
877
878     This function is for internal use, when the config lock is already
879     held.
880
881     @param node_name: the node name, e.g. I{node1.example.com}
882
883     @rtype: L{objects.Node}
884     @return: the node object
885
886     """
887     if node_name not in self._config_data.nodes:
888       return None
889
890     return self._config_data.nodes[node_name]
891
892
893   @locking.ssynchronized(_config_lock, shared=1)
894   def GetNodeInfo(self, node_name):
895     """Get the configuration of a node, as stored in the config.
896
897     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
898
899     @param node_name: the node name, e.g. I{node1.example.com}
900
901     @rtype: L{objects.Node}
902     @return: the node object
903
904     """
905     return self._UnlockedGetNodeInfo(node_name)
906
907   def _UnlockedGetNodeList(self):
908     """Return the list of nodes which are in the configuration.
909
910     This function is for internal use, when the config lock is already
911     held.
912
913     @rtype: list
914
915     """
916     return self._config_data.nodes.keys()
917
918
919   @locking.ssynchronized(_config_lock, shared=1)
920   def GetNodeList(self):
921     """Return the list of nodes which are in the configuration.
922
923     """
924     return self._UnlockedGetNodeList()
925
926   @locking.ssynchronized(_config_lock, shared=1)
927   def GetOnlineNodeList(self):
928     """Return the list of nodes which are online.
929
930     """
931     all_nodes = [self._UnlockedGetNodeInfo(node)
932                  for node in self._UnlockedGetNodeList()]
933     return [node.name for node in all_nodes if not node.offline]
934
935   @locking.ssynchronized(_config_lock, shared=1)
936   def GetAllNodesInfo(self):
937     """Get the configuration of all nodes.
938
939     @rtype: dict
940     @return: dict of (node, node_info), where node_info is what
941               would GetNodeInfo return for the node
942
943     """
944     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
945                     for node in self._UnlockedGetNodeList()])
946     return my_dict
947
948   def _UnlockedGetMasterCandidateStats(self):
949     """Get the number of current and maximum desired and possible candidates.
950
951     @rtype: tuple
952     @return: tuple of (current, desired and possible)
953
954     """
955     mc_now = mc_max = 0
956     for node in self._config_data.nodes.itervalues():
957       if not (node.offline or node.drained):
958         mc_max += 1
959       if node.master_candidate:
960         mc_now += 1
961     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
962     return (mc_now, mc_max)
963
964   @locking.ssynchronized(_config_lock, shared=1)
965   def GetMasterCandidateStats(self):
966     """Get the number of current and maximum possible candidates.
967
968     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
969
970     @rtype: tuple
971     @return: tuple of (current, max)
972
973     """
974     return self._UnlockedGetMasterCandidateStats()
975
976   @locking.ssynchronized(_config_lock)
977   def MaintainCandidatePool(self):
978     """Try to grow the candidate pool to the desired size.
979
980     @rtype: list
981     @return: list with the adjusted nodes (L{objects.Node} instances)
982
983     """
984     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
985     mod_list = []
986     if mc_now < mc_max:
987       node_list = self._config_data.nodes.keys()
988       random.shuffle(node_list)
989       for name in node_list:
990         if mc_now >= mc_max:
991           break
992         node = self._config_data.nodes[name]
993         if node.master_candidate or node.offline or node.drained:
994           continue
995         mod_list.append(node)
996         node.master_candidate = True
997         node.serial_no += 1
998         mc_now += 1
999       if mc_now != mc_max:
1000         # this should not happen
1001         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1002                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1003       if mod_list:
1004         self._config_data.cluster.serial_no += 1
1005         self._WriteConfig()
1006
1007     return mod_list
1008
1009   def _BumpSerialNo(self):
1010     """Bump up the serial number of the config.
1011
1012     """
1013     self._config_data.serial_no += 1
1014
1015   def _OpenConfig(self):
1016     """Read the config data from disk.
1017
1018     """
1019     f = open(self._cfg_file, 'r')
1020     try:
1021       try:
1022         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1023       except Exception, err:
1024         raise errors.ConfigurationError(err)
1025     finally:
1026       f.close()
1027
1028     # Make sure the configuration has the right version
1029     _ValidateConfig(data)
1030
1031     if (not hasattr(data, 'cluster') or
1032         not hasattr(data.cluster, 'rsahostkeypub')):
1033       raise errors.ConfigurationError("Incomplete configuration"
1034                                       " (missing cluster.rsahostkeypub)")
1035     self._config_data = data
1036     # reset the last serial as -1 so that the next write will cause
1037     # ssconf update
1038     self._last_cluster_serial = -1
1039
1040   def _DistributeConfig(self):
1041     """Distribute the configuration to the other nodes.
1042
1043     Currently, this only copies the configuration file. In the future,
1044     it could be used to encapsulate the 2/3-phase update mechanism.
1045
1046     """
1047     if self._offline:
1048       return True
1049     bad = False
1050
1051     node_list = []
1052     addr_list = []
1053     myhostname = self._my_hostname
1054     # we can skip checking whether _UnlockedGetNodeInfo returns None
1055     # since the node list comes from _UnlocketGetNodeList, and we are
1056     # called with the lock held, so no modifications should take place
1057     # in between
1058     for node_name in self._UnlockedGetNodeList():
1059       if node_name == myhostname:
1060         continue
1061       node_info = self._UnlockedGetNodeInfo(node_name)
1062       if not node_info.master_candidate:
1063         continue
1064       node_list.append(node_info.name)
1065       addr_list.append(node_info.primary_ip)
1066
1067     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1068                                             address_list=addr_list)
1069     for node in node_list:
1070       if not result[node]:
1071         logging.error("copy of file %s to node %s failed",
1072                       self._cfg_file, node)
1073         bad = True
1074     return not bad
1075
1076   def _WriteConfig(self, destination=None):
1077     """Write the configuration data to persistent storage.
1078
1079     """
1080     config_errors = self._UnlockedVerifyConfig()
1081     if config_errors:
1082       raise errors.ConfigurationError("Configuration data is not"
1083                                       " consistent: %s" %
1084                                       (", ".join(config_errors)))
1085     if destination is None:
1086       destination = self._cfg_file
1087     self._BumpSerialNo()
1088     txt = serializer.Dump(self._config_data.ToDict())
1089     dir_name, file_name = os.path.split(destination)
1090     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1091     f = os.fdopen(fd, 'w')
1092     try:
1093       f.write(txt)
1094       os.fsync(f.fileno())
1095     finally:
1096       f.close()
1097     # we don't need to do os.close(fd) as f.close() did it
1098     os.rename(name, destination)
1099     self.write_count += 1
1100
1101     # and redistribute the config file to master candidates
1102     self._DistributeConfig()
1103
1104     # Write ssconf files on all nodes (including locally)
1105     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1106       if not self._offline:
1107         rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1108                                               self._UnlockedGetSsconfValues())
1109       self._last_cluster_serial = self._config_data.cluster.serial_no
1110
1111   def _UnlockedGetSsconfValues(self):
1112     """Return the values needed by ssconf.
1113
1114     @rtype: dict
1115     @return: a dictionary with keys the ssconf names and values their
1116         associated value
1117
1118     """
1119     fn = "\n".join
1120     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1121     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1122     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1123
1124     instance_data = fn(instance_names)
1125     off_data = fn(node.name for node in node_info if node.offline)
1126     on_data = fn(node.name for node in node_info if not node.offline)
1127     mc_data = fn(node.name for node in node_info if node.master_candidate)
1128     node_data = fn(node_names)
1129
1130     cluster = self._config_data.cluster
1131     return {
1132       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1133       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1134       constants.SS_MASTER_CANDIDATES: mc_data,
1135       constants.SS_MASTER_IP: cluster.master_ip,
1136       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1137       constants.SS_MASTER_NODE: cluster.master_node,
1138       constants.SS_NODE_LIST: node_data,
1139       constants.SS_OFFLINE_NODES: off_data,
1140       constants.SS_ONLINE_NODES: on_data,
1141       constants.SS_INSTANCE_LIST: instance_data,
1142       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1143       }
1144
1145   @locking.ssynchronized(_config_lock)
1146   def InitConfig(self, version, cluster_config, master_node_config):
1147     """Create the initial cluster configuration.
1148
1149     It will contain the current node, which will also be the master
1150     node, and no instances.
1151
1152     @type version: int
1153     @param version: Configuration version
1154     @type cluster_config: objects.Cluster
1155     @param cluster_config: Cluster configuration
1156     @type master_node_config: objects.Node
1157     @param master_node_config: Master node configuration
1158
1159     """
1160     nodes = {
1161       master_node_config.name: master_node_config,
1162       }
1163
1164     self._config_data = objects.ConfigData(version=version,
1165                                            cluster=cluster_config,
1166                                            nodes=nodes,
1167                                            instances={},
1168                                            serial_no=1)
1169     self._WriteConfig()
1170
1171   @locking.ssynchronized(_config_lock, shared=1)
1172   def GetVGName(self):
1173     """Return the volume group name.
1174
1175     """
1176     return self._config_data.cluster.volume_group_name
1177
1178   @locking.ssynchronized(_config_lock)
1179   def SetVGName(self, vg_name):
1180     """Set the volume group name.
1181
1182     """
1183     self._config_data.cluster.volume_group_name = vg_name
1184     self._config_data.cluster.serial_no += 1
1185     self._WriteConfig()
1186
1187   @locking.ssynchronized(_config_lock, shared=1)
1188   def GetDefBridge(self):
1189     """Return the default bridge.
1190
1191     """
1192     return self._config_data.cluster.default_bridge
1193
1194   @locking.ssynchronized(_config_lock, shared=1)
1195   def GetMACPrefix(self):
1196     """Return the mac prefix.
1197
1198     """
1199     return self._config_data.cluster.mac_prefix
1200
1201   @locking.ssynchronized(_config_lock, shared=1)
1202   def GetClusterInfo(self):
1203     """Returns informations about the cluster
1204
1205     @rtype: L{objects.Cluster}
1206     @return: the cluster object
1207
1208     """
1209     return self._config_data.cluster
1210
1211   @locking.ssynchronized(_config_lock)
1212   def Update(self, target):
1213     """Notify function to be called after updates.
1214
1215     This function must be called when an object (as returned by
1216     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1217     caller wants the modifications saved to the backing store. Note
1218     that all modified objects will be saved, but the target argument
1219     is the one the caller wants to ensure that it's saved.
1220
1221     @param target: an instance of either L{objects.Cluster},
1222         L{objects.Node} or L{objects.Instance} which is existing in
1223         the cluster
1224
1225     """
1226     if self._config_data is None:
1227       raise errors.ProgrammerError("Configuration file not read,"
1228                                    " cannot save.")
1229     update_serial = False
1230     if isinstance(target, objects.Cluster):
1231       test = target == self._config_data.cluster
1232     elif isinstance(target, objects.Node):
1233       test = target in self._config_data.nodes.values()
1234       update_serial = True
1235     elif isinstance(target, objects.Instance):
1236       test = target in self._config_data.instances.values()
1237     else:
1238       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1239                                    " ConfigWriter.Update" % type(target))
1240     if not test:
1241       raise errors.ConfigurationError("Configuration updated since object"
1242                                       " has been read or unknown object")
1243     target.serial_no += 1
1244
1245     if update_serial:
1246       # for node updates, we need to increase the cluster serial too
1247       self._config_data.cluster.serial_no += 1
1248
1249     if isinstance(target, objects.Instance):
1250       self._UnlockedReleaseDRBDMinors(target.name)
1251       for nic in target.nics:
1252         self._temporary_macs.discard(nic.mac)
1253
1254     self._WriteConfig()