Prevent race condition on MAC addresses
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50
51 def _ValidateConfig(data):
52   """Verifies that a configuration objects looks valid.
53
54   This only verifies the version of the configuration.
55
56   @raise errors.ConfigurationError: if the version differs from what
57       we expect
58
59   """
60   if data.version != constants.CONFIG_VERSION:
61     raise errors.ConfigurationError("Cluster configuration version"
62                                     " mismatch, got %s instead of %s" %
63                                     (data.version,
64                                      constants.CONFIG_VERSION))
65
66
67 class ConfigWriter:
68   """The interface to the cluster configuration.
69
70   """
71   def __init__(self, cfg_file=None, offline=False):
72     self.write_count = 0
73     self._lock = _config_lock
74     self._config_data = None
75     self._offline = offline
76     if cfg_file is None:
77       self._cfg_file = constants.CLUSTER_CONF_FILE
78     else:
79       self._cfg_file = cfg_file
80     self._temporary_ids = set()
81     self._temporary_drbds = {}
82     self._temporary_macs = set()
83     # Note: in order to prevent errors when resolving our name in
84     # _DistributeConfig, we compute it here once and reuse it; it's
85     # better to raise an error before starting to modify the config
86     # file than after it was modified
87     self._my_hostname = utils.HostInfo().name
88     self._last_cluster_serial = -1
89     self._OpenConfig()
90
91   # this method needs to be static, so that we can call it on the class
92   @staticmethod
93   def IsCluster():
94     """Check if the cluster is configured.
95
96     """
97     return os.path.exists(constants.CLUSTER_CONF_FILE)
98
99   @locking.ssynchronized(_config_lock, shared=1)
100   def GenerateMAC(self):
101     """Generate a MAC for an instance.
102
103     This should check the current instances for duplicates.
104
105     """
106     prefix = self._config_data.cluster.mac_prefix
107     all_macs = self._AllMACs()
108     retries = 64
109     while retries > 0:
110       byte1 = random.randrange(0, 256)
111       byte2 = random.randrange(0, 256)
112       byte3 = random.randrange(0, 256)
113       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
114       if mac not in all_macs and mac not in self._temporary_macs:
115         break
116       retries -= 1
117     else:
118       raise errors.ConfigurationError("Can't generate unique MAC")
119     self._temporary_macs.add(mac)
120     return mac
121
122   @locking.ssynchronized(_config_lock, shared=1)
123   def IsMacInUse(self, mac):
124     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
125
126     This only checks instances managed by this cluster, it does not
127     check for potential collisions elsewhere.
128
129     """
130     all_macs = self._AllMACs()
131     return mac in all_macs or mac in self._temporary_macs
132
133   @locking.ssynchronized(_config_lock, shared=1)
134   def GenerateDRBDSecret(self):
135     """Generate a DRBD secret.
136
137     This checks the current disks for duplicates.
138
139     """
140     all_secrets = self._AllDRBDSecrets()
141     retries = 64
142     while retries > 0:
143       secret = utils.GenerateSecret()
144       if secret not in all_secrets:
145         break
146       retries -= 1
147     else:
148       raise errors.ConfigurationError("Can't generate unique DRBD secret")
149     return secret
150
151   def _ComputeAllLVs(self):
152     """Compute the list of all LVs.
153
154     """
155     lvnames = set()
156     for instance in self._config_data.instances.values():
157       node_data = instance.MapLVsByNode()
158       for lv_list in node_data.values():
159         lvnames.update(lv_list)
160     return lvnames
161
162   @locking.ssynchronized(_config_lock, shared=1)
163   def GenerateUniqueID(self, exceptions=None):
164     """Generate an unique disk name.
165
166     This checks the current node, instances and disk names for
167     duplicates.
168
169     @param exceptions: a list with some other names which should be checked
170         for uniqueness (used for example when you want to get
171         more than one id at one time without adding each one in
172         turn to the config file)
173
174     @rtype: string
175     @return: the unique id
176
177     """
178     existing = set()
179     existing.update(self._temporary_ids)
180     existing.update(self._ComputeAllLVs())
181     existing.update(self._config_data.instances.keys())
182     existing.update(self._config_data.nodes.keys())
183     if exceptions is not None:
184       existing.update(exceptions)
185     retries = 64
186     while retries > 0:
187       unique_id = utils.NewUUID()
188       if unique_id not in existing and unique_id is not None:
189         break
190     else:
191       raise errors.ConfigurationError("Not able generate an unique ID"
192                                       " (last tried ID: %s" % unique_id)
193     self._temporary_ids.add(unique_id)
194     return unique_id
195
196   def _AllMACs(self):
197     """Return all MACs present in the config.
198
199     @rtype: list
200     @return: the list of all MACs
201
202     """
203     result = []
204     for instance in self._config_data.instances.values():
205       for nic in instance.nics:
206         result.append(nic.mac)
207
208     return result
209
210   def _AllDRBDSecrets(self):
211     """Return all DRBD secrets present in the config.
212
213     @rtype: list
214     @return: the list of all DRBD secrets
215
216     """
217     def helper(disk, result):
218       """Recursively gather secrets from this disk."""
219       if disk.dev_type == constants.DT_DRBD8:
220         result.append(disk.logical_id[5])
221       if disk.children:
222         for child in disk.children:
223           helper(child, result)
224
225     result = []
226     for instance in self._config_data.instances.values():
227       for disk in instance.disks:
228         helper(disk, result)
229
230     return result
231
232   def _CheckDiskIDs(self, disk, l_ids, p_ids):
233     """Compute duplicate disk IDs
234
235     @type disk: L{objects.Disk}
236     @param disk: the disk at which to start searching
237     @type l_ids: list
238     @param l_ids: list of current logical ids
239     @type p_ids: list
240     @param p_ids: list of current physical ids
241     @rtype: list
242     @return: a list of error messages
243
244     """
245     result = []
246     if disk.logical_id in l_ids:
247       result.append("duplicate logical id %s" % str(disk.logical_id))
248     else:
249       l_ids.append(disk.logical_id)
250     if disk.physical_id in p_ids:
251       result.append("duplicate physical id %s" % str(disk.physical_id))
252     else:
253       p_ids.append(disk.physical_id)
254
255     if disk.children:
256       for child in disk.children:
257         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
258     return result
259
260   def _UnlockedVerifyConfig(self):
261     """Verify function.
262
263     @rtype: list
264     @return: a list of error messages; a non-empty list signifies
265         configuration errors
266
267     """
268     result = []
269     seen_macs = []
270     ports = {}
271     data = self._config_data
272     seen_lids = []
273     seen_pids = []
274     for instance_name in data.instances:
275       instance = data.instances[instance_name]
276       if instance.primary_node not in data.nodes:
277         result.append("instance '%s' has invalid primary node '%s'" %
278                       (instance_name, instance.primary_node))
279       for snode in instance.secondary_nodes:
280         if snode not in data.nodes:
281           result.append("instance '%s' has invalid secondary node '%s'" %
282                         (instance_name, snode))
283       for idx, nic in enumerate(instance.nics):
284         if nic.mac in seen_macs:
285           result.append("instance '%s' has NIC %d mac %s duplicate" %
286                         (instance_name, idx, nic.mac))
287         else:
288           seen_macs.append(nic.mac)
289
290       # gather the drbd ports for duplicate checks
291       for dsk in instance.disks:
292         if dsk.dev_type in constants.LDS_DRBD:
293           tcp_port = dsk.logical_id[2]
294           if tcp_port not in ports:
295             ports[tcp_port] = []
296           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
297       # gather network port reservation
298       net_port = getattr(instance, "network_port", None)
299       if net_port is not None:
300         if net_port not in ports:
301           ports[net_port] = []
302         ports[net_port].append((instance.name, "network port"))
303
304       # instance disk verify
305       for idx, disk in enumerate(instance.disks):
306         result.extend(["instance '%s' disk %d error: %s" %
307                        (instance.name, idx, msg) for msg in disk.Verify()])
308         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
309
310     # cluster-wide pool of free ports
311     for free_port in data.cluster.tcpudp_port_pool:
312       if free_port not in ports:
313         ports[free_port] = []
314       ports[free_port].append(("cluster", "port marked as free"))
315
316     # compute tcp/udp duplicate ports
317     keys = ports.keys()
318     keys.sort()
319     for pnum in keys:
320       pdata = ports[pnum]
321       if len(pdata) > 1:
322         txt = ", ".join(["%s/%s" % val for val in pdata])
323         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
324
325     # highest used tcp port check
326     if keys:
327       if keys[-1] > data.cluster.highest_used_port:
328         result.append("Highest used port mismatch, saved %s, computed %s" %
329                       (data.cluster.highest_used_port, keys[-1]))
330
331     if not data.nodes[data.cluster.master_node].master_candidate:
332       result.append("Master node is not a master candidate")
333
334     # master candidate checks
335     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
336     if mc_now < mc_max:
337       result.append("Not enough master candidates: actual %d, target %d" %
338                     (mc_now, mc_max))
339
340     # node checks
341     for node in data.nodes.values():
342       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
343         result.append("Node %s state is invalid: master_candidate=%s,"
344                       " drain=%s, offline=%s" %
345                       (node.name, node.master_candidate, node.drain,
346                        node.offline))
347
348     # drbd minors check
349     d_map, duplicates = self._UnlockedComputeDRBDMap()
350     for node, minor, instance_a, instance_b in duplicates:
351       result.append("DRBD minor %d on node %s is assigned twice to instances"
352                     " %s and %s" % (minor, node, instance_a, instance_b))
353
354     return result
355
356   @locking.ssynchronized(_config_lock, shared=1)
357   def VerifyConfig(self):
358     """Verify function.
359
360     This is just a wrapper over L{_UnlockedVerifyConfig}.
361
362     @rtype: list
363     @return: a list of error messages; a non-empty list signifies
364         configuration errors
365
366     """
367     return self._UnlockedVerifyConfig()
368
369   def _UnlockedSetDiskID(self, disk, node_name):
370     """Convert the unique ID to the ID needed on the target nodes.
371
372     This is used only for drbd, which needs ip/port configuration.
373
374     The routine descends down and updates its children also, because
375     this helps when the only the top device is passed to the remote
376     node.
377
378     This function is for internal use, when the config lock is already held.
379
380     """
381     if disk.children:
382       for child in disk.children:
383         self._UnlockedSetDiskID(child, node_name)
384
385     if disk.logical_id is None and disk.physical_id is not None:
386       return
387     if disk.dev_type == constants.LD_DRBD8:
388       pnode, snode, port, pminor, sminor, secret = disk.logical_id
389       if node_name not in (pnode, snode):
390         raise errors.ConfigurationError("DRBD device not knowing node %s" %
391                                         node_name)
392       pnode_info = self._UnlockedGetNodeInfo(pnode)
393       snode_info = self._UnlockedGetNodeInfo(snode)
394       if pnode_info is None or snode_info is None:
395         raise errors.ConfigurationError("Can't find primary or secondary node"
396                                         " for %s" % str(disk))
397       p_data = (pnode_info.secondary_ip, port)
398       s_data = (snode_info.secondary_ip, port)
399       if pnode == node_name:
400         disk.physical_id = p_data + s_data + (pminor, secret)
401       else: # it must be secondary, we tested above
402         disk.physical_id = s_data + p_data + (sminor, secret)
403     else:
404       disk.physical_id = disk.logical_id
405     return
406
407   @locking.ssynchronized(_config_lock)
408   def SetDiskID(self, disk, node_name):
409     """Convert the unique ID to the ID needed on the target nodes.
410
411     This is used only for drbd, which needs ip/port configuration.
412
413     The routine descends down and updates its children also, because
414     this helps when the only the top device is passed to the remote
415     node.
416
417     """
418     return self._UnlockedSetDiskID(disk, node_name)
419
420   @locking.ssynchronized(_config_lock)
421   def AddTcpUdpPort(self, port):
422     """Adds a new port to the available port pool.
423
424     """
425     if not isinstance(port, int):
426       raise errors.ProgrammerError("Invalid type passed for port")
427
428     self._config_data.cluster.tcpudp_port_pool.add(port)
429     self._WriteConfig()
430
431   @locking.ssynchronized(_config_lock, shared=1)
432   def GetPortList(self):
433     """Returns a copy of the current port list.
434
435     """
436     return self._config_data.cluster.tcpudp_port_pool.copy()
437
438   @locking.ssynchronized(_config_lock)
439   def AllocatePort(self):
440     """Allocate a port.
441
442     The port will be taken from the available port pool or from the
443     default port range (and in this case we increase
444     highest_used_port).
445
446     """
447     # If there are TCP/IP ports configured, we use them first.
448     if self._config_data.cluster.tcpudp_port_pool:
449       port = self._config_data.cluster.tcpudp_port_pool.pop()
450     else:
451       port = self._config_data.cluster.highest_used_port + 1
452       if port >= constants.LAST_DRBD_PORT:
453         raise errors.ConfigurationError("The highest used port is greater"
454                                         " than %s. Aborting." %
455                                         constants.LAST_DRBD_PORT)
456       self._config_data.cluster.highest_used_port = port
457
458     self._WriteConfig()
459     return port
460
461   def _UnlockedComputeDRBDMap(self):
462     """Compute the used DRBD minor/nodes.
463
464     @rtype: (dict, list)
465     @return: dictionary of node_name: dict of minor: instance_name;
466         the returned dict will have all the nodes in it (even if with
467         an empty list), and a list of duplicates; if the duplicates
468         list is not empty, the configuration is corrupted and its caller
469         should raise an exception
470
471     """
472     def _AppendUsedPorts(instance_name, disk, used):
473       duplicates = []
474       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
475         nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
476         for node, port in ((nodeA, minorA), (nodeB, minorB)):
477           assert node in used, ("Node '%s' of instance '%s' not found"
478                                 " in node list" % (node, instance_name))
479           if port in used[node]:
480             duplicates.append((node, port, instance_name, used[node][port]))
481           else:
482             used[node][port] = instance_name
483       if disk.children:
484         for child in disk.children:
485           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
486       return duplicates
487
488     duplicates = []
489     my_dict = dict((node, {}) for node in self._config_data.nodes)
490     for instance in self._config_data.instances.itervalues():
491       for disk in instance.disks:
492         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
493     for (node, minor), instance in self._temporary_drbds.iteritems():
494       if minor in my_dict[node] and my_dict[node][minor] != instance:
495         duplicates.append((node, minor, instance, my_dict[node][minor]))
496       else:
497         my_dict[node][minor] = instance
498     return my_dict, duplicates
499
500   @locking.ssynchronized(_config_lock)
501   def ComputeDRBDMap(self):
502     """Compute the used DRBD minor/nodes.
503
504     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
505
506     @return: dictionary of node_name: dict of minor: instance_name;
507         the returned dict will have all the nodes in it (even if with
508         an empty list).
509
510     """
511     d_map, duplicates = self._UnlockedComputeDRBDMap()
512     if duplicates:
513       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
514                                       str(duplicates))
515     return d_map
516
517   @locking.ssynchronized(_config_lock)
518   def AllocateDRBDMinor(self, nodes, instance):
519     """Allocate a drbd minor.
520
521     The free minor will be automatically computed from the existing
522     devices. A node can be given multiple times in order to allocate
523     multiple minors. The result is the list of minors, in the same
524     order as the passed nodes.
525
526     @type instance: string
527     @param instance: the instance for which we allocate minors
528
529     """
530     assert isinstance(instance, basestring), \
531            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
532
533     d_map, duplicates = self._UnlockedComputeDRBDMap()
534     if duplicates:
535       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
536                                       str(duplicates))
537     result = []
538     for nname in nodes:
539       ndata = d_map[nname]
540       if not ndata:
541         # no minors used, we can start at 0
542         result.append(0)
543         ndata[0] = instance
544         self._temporary_drbds[(nname, 0)] = instance
545         continue
546       keys = ndata.keys()
547       keys.sort()
548       ffree = utils.FirstFree(keys)
549       if ffree is None:
550         # return the next minor
551         # TODO: implement high-limit check
552         minor = keys[-1] + 1
553       else:
554         minor = ffree
555       # double-check minor against current instances
556       assert minor not in d_map[nname], \
557              ("Attempt to reuse allocated DRBD minor %d on node %s,"
558               " already allocated to instance %s" %
559               (minor, nname, d_map[nname][minor]))
560       ndata[minor] = instance
561       # double-check minor against reservation
562       r_key = (nname, minor)
563       assert r_key not in self._temporary_drbds, \
564              ("Attempt to reuse reserved DRBD minor %d on node %s,"
565               " reserved for instance %s" %
566               (minor, nname, self._temporary_drbds[r_key]))
567       self._temporary_drbds[r_key] = instance
568       result.append(minor)
569     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
570                   nodes, result)
571     return result
572
573   def _UnlockedReleaseDRBDMinors(self, instance):
574     """Release temporary drbd minors allocated for a given instance.
575
576     @type instance: string
577     @param instance: the instance for which temporary minors should be
578                      released
579
580     """
581     assert isinstance(instance, basestring), \
582            "Invalid argument passed to ReleaseDRBDMinors"
583     for key, name in self._temporary_drbds.items():
584       if name == instance:
585         del self._temporary_drbds[key]
586
587   @locking.ssynchronized(_config_lock)
588   def ReleaseDRBDMinors(self, instance):
589     """Release temporary drbd minors allocated for a given instance.
590
591     This should be called on the error paths, on the success paths
592     it's automatically called by the ConfigWriter add and update
593     functions.
594
595     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
596
597     @type instance: string
598     @param instance: the instance for which temporary minors should be
599                      released
600
601     """
602     self._UnlockedReleaseDRBDMinors(instance)
603
604   @locking.ssynchronized(_config_lock, shared=1)
605   def GetConfigVersion(self):
606     """Get the configuration version.
607
608     @return: Config version
609
610     """
611     return self._config_data.version
612
613   @locking.ssynchronized(_config_lock, shared=1)
614   def GetClusterName(self):
615     """Get cluster name.
616
617     @return: Cluster name
618
619     """
620     return self._config_data.cluster.cluster_name
621
622   @locking.ssynchronized(_config_lock, shared=1)
623   def GetMasterNode(self):
624     """Get the hostname of the master node for this cluster.
625
626     @return: Master hostname
627
628     """
629     return self._config_data.cluster.master_node
630
631   @locking.ssynchronized(_config_lock, shared=1)
632   def GetMasterIP(self):
633     """Get the IP of the master node for this cluster.
634
635     @return: Master IP
636
637     """
638     return self._config_data.cluster.master_ip
639
640   @locking.ssynchronized(_config_lock, shared=1)
641   def GetMasterNetdev(self):
642     """Get the master network device for this cluster.
643
644     """
645     return self._config_data.cluster.master_netdev
646
647   @locking.ssynchronized(_config_lock, shared=1)
648   def GetFileStorageDir(self):
649     """Get the file storage dir for this cluster.
650
651     """
652     return self._config_data.cluster.file_storage_dir
653
654   @locking.ssynchronized(_config_lock, shared=1)
655   def GetHypervisorType(self):
656     """Get the hypervisor type for this cluster.
657
658     """
659     return self._config_data.cluster.default_hypervisor
660
661   @locking.ssynchronized(_config_lock, shared=1)
662   def GetHostKey(self):
663     """Return the rsa hostkey from the config.
664
665     @rtype: string
666     @return: the rsa hostkey
667
668     """
669     return self._config_data.cluster.rsahostkeypub
670
671   @locking.ssynchronized(_config_lock)
672   def AddInstance(self, instance):
673     """Add an instance to the config.
674
675     This should be used after creating a new instance.
676
677     @type instance: L{objects.Instance}
678     @param instance: the instance object
679
680     """
681     if not isinstance(instance, objects.Instance):
682       raise errors.ProgrammerError("Invalid type passed to AddInstance")
683
684     if instance.disk_template != constants.DT_DISKLESS:
685       all_lvs = instance.MapLVsByNode()
686       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
687
688     instance.serial_no = 1
689     self._config_data.instances[instance.name] = instance
690     self._config_data.cluster.serial_no += 1
691     self._UnlockedReleaseDRBDMinors(instance.name)
692     for nic in instance.nics:
693       self._temporary_macs.discard(nic.mac)
694     self._WriteConfig()
695
696   def _SetInstanceStatus(self, instance_name, status):
697     """Set the instance's status to a given value.
698
699     """
700     assert isinstance(status, bool), \
701            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
702
703     if instance_name not in self._config_data.instances:
704       raise errors.ConfigurationError("Unknown instance '%s'" %
705                                       instance_name)
706     instance = self._config_data.instances[instance_name]
707     if instance.admin_up != status:
708       instance.admin_up = status
709       instance.serial_no += 1
710       self._WriteConfig()
711
712   @locking.ssynchronized(_config_lock)
713   def MarkInstanceUp(self, instance_name):
714     """Mark the instance status to up in the config.
715
716     """
717     self._SetInstanceStatus(instance_name, True)
718
719   @locking.ssynchronized(_config_lock)
720   def RemoveInstance(self, instance_name):
721     """Remove the instance from the configuration.
722
723     """
724     if instance_name not in self._config_data.instances:
725       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
726     del self._config_data.instances[instance_name]
727     self._config_data.cluster.serial_no += 1
728     self._WriteConfig()
729
730   @locking.ssynchronized(_config_lock)
731   def RenameInstance(self, old_name, new_name):
732     """Rename an instance.
733
734     This needs to be done in ConfigWriter and not by RemoveInstance
735     combined with AddInstance as only we can guarantee an atomic
736     rename.
737
738     """
739     if old_name not in self._config_data.instances:
740       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
741     inst = self._config_data.instances[old_name]
742     del self._config_data.instances[old_name]
743     inst.name = new_name
744
745     for disk in inst.disks:
746       if disk.dev_type == constants.LD_FILE:
747         # rename the file paths in logical and physical id
748         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
749         disk.physical_id = disk.logical_id = (disk.logical_id[0],
750                                               os.path.join(file_storage_dir,
751                                                            inst.name,
752                                                            disk.iv_name))
753
754     self._config_data.instances[inst.name] = inst
755     self._WriteConfig()
756
757   @locking.ssynchronized(_config_lock)
758   def MarkInstanceDown(self, instance_name):
759     """Mark the status of an instance to down in the configuration.
760
761     """
762     self._SetInstanceStatus(instance_name, False)
763
764   def _UnlockedGetInstanceList(self):
765     """Get the list of instances.
766
767     This function is for internal use, when the config lock is already held.
768
769     """
770     return self._config_data.instances.keys()
771
772   @locking.ssynchronized(_config_lock, shared=1)
773   def GetInstanceList(self):
774     """Get the list of instances.
775
776     @return: array of instances, ex. ['instance2.example.com',
777         'instance1.example.com']
778
779     """
780     return self._UnlockedGetInstanceList()
781
782   @locking.ssynchronized(_config_lock, shared=1)
783   def ExpandInstanceName(self, short_name):
784     """Attempt to expand an incomplete instance name.
785
786     """
787     return utils.MatchNameComponent(short_name,
788                                     self._config_data.instances.keys())
789
790   def _UnlockedGetInstanceInfo(self, instance_name):
791     """Returns informations about an instance.
792
793     This function is for internal use, when the config lock is already held.
794
795     """
796     if instance_name not in self._config_data.instances:
797       return None
798
799     return self._config_data.instances[instance_name]
800
801   @locking.ssynchronized(_config_lock, shared=1)
802   def GetInstanceInfo(self, instance_name):
803     """Returns informations about an instance.
804
805     It takes the information from the configuration file. Other informations of
806     an instance are taken from the live systems.
807
808     @param instance_name: name of the instance, e.g.
809         I{instance1.example.com}
810
811     @rtype: L{objects.Instance}
812     @return: the instance object
813
814     """
815     return self._UnlockedGetInstanceInfo(instance_name)
816
817   @locking.ssynchronized(_config_lock, shared=1)
818   def GetAllInstancesInfo(self):
819     """Get the configuration of all instances.
820
821     @rtype: dict
822     @returns: dict of (instance, instance_info), where instance_info is what
823               would GetInstanceInfo return for the node
824
825     """
826     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
827                     for instance in self._UnlockedGetInstanceList()])
828     return my_dict
829
830   @locking.ssynchronized(_config_lock)
831   def AddNode(self, node):
832     """Add a node to the configuration.
833
834     @type node: L{objects.Node}
835     @param node: a Node instance
836
837     """
838     logging.info("Adding node %s to configuration" % node.name)
839
840     node.serial_no = 1
841     self._config_data.nodes[node.name] = node
842     self._config_data.cluster.serial_no += 1
843     self._WriteConfig()
844
845   @locking.ssynchronized(_config_lock)
846   def RemoveNode(self, node_name):
847     """Remove a node from the configuration.
848
849     """
850     logging.info("Removing node %s from configuration" % node_name)
851
852     if node_name not in self._config_data.nodes:
853       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
854
855     del self._config_data.nodes[node_name]
856     self._config_data.cluster.serial_no += 1
857     self._WriteConfig()
858
859   @locking.ssynchronized(_config_lock, shared=1)
860   def ExpandNodeName(self, short_name):
861     """Attempt to expand an incomplete instance name.
862
863     """
864     return utils.MatchNameComponent(short_name,
865                                     self._config_data.nodes.keys())
866
867   def _UnlockedGetNodeInfo(self, node_name):
868     """Get the configuration of a node, as stored in the config.
869
870     This function is for internal use, when the config lock is already
871     held.
872
873     @param node_name: the node name, e.g. I{node1.example.com}
874
875     @rtype: L{objects.Node}
876     @return: the node object
877
878     """
879     if node_name not in self._config_data.nodes:
880       return None
881
882     return self._config_data.nodes[node_name]
883
884
885   @locking.ssynchronized(_config_lock, shared=1)
886   def GetNodeInfo(self, node_name):
887     """Get the configuration of a node, as stored in the config.
888
889     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
890
891     @param node_name: the node name, e.g. I{node1.example.com}
892
893     @rtype: L{objects.Node}
894     @return: the node object
895
896     """
897     return self._UnlockedGetNodeInfo(node_name)
898
899   def _UnlockedGetNodeList(self):
900     """Return the list of nodes which are in the configuration.
901
902     This function is for internal use, when the config lock is already
903     held.
904
905     @rtype: list
906
907     """
908     return self._config_data.nodes.keys()
909
910
911   @locking.ssynchronized(_config_lock, shared=1)
912   def GetNodeList(self):
913     """Return the list of nodes which are in the configuration.
914
915     """
916     return self._UnlockedGetNodeList()
917
918   @locking.ssynchronized(_config_lock, shared=1)
919   def GetOnlineNodeList(self):
920     """Return the list of nodes which are online.
921
922     """
923     all_nodes = [self._UnlockedGetNodeInfo(node)
924                  for node in self._UnlockedGetNodeList()]
925     return [node.name for node in all_nodes if not node.offline]
926
927   @locking.ssynchronized(_config_lock, shared=1)
928   def GetAllNodesInfo(self):
929     """Get the configuration of all nodes.
930
931     @rtype: dict
932     @return: dict of (node, node_info), where node_info is what
933               would GetNodeInfo return for the node
934
935     """
936     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
937                     for node in self._UnlockedGetNodeList()])
938     return my_dict
939
940   def _UnlockedGetMasterCandidateStats(self):
941     """Get the number of current and maximum desired and possible candidates.
942
943     @rtype: tuple
944     @return: tuple of (current, desired and possible)
945
946     """
947     mc_now = mc_max = 0
948     for node in self._config_data.nodes.itervalues():
949       if not (node.offline or node.drained):
950         mc_max += 1
951       if node.master_candidate:
952         mc_now += 1
953     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
954     return (mc_now, mc_max)
955
956   @locking.ssynchronized(_config_lock, shared=1)
957   def GetMasterCandidateStats(self):
958     """Get the number of current and maximum possible candidates.
959
960     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
961
962     @rtype: tuple
963     @return: tuple of (current, max)
964
965     """
966     return self._UnlockedGetMasterCandidateStats()
967
968   @locking.ssynchronized(_config_lock)
969   def MaintainCandidatePool(self):
970     """Try to grow the candidate pool to the desired size.
971
972     @rtype: list
973     @return: list with the adjusted nodes (L{objects.Node} instances)
974
975     """
976     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
977     mod_list = []
978     if mc_now < mc_max:
979       node_list = self._config_data.nodes.keys()
980       random.shuffle(node_list)
981       for name in node_list:
982         if mc_now >= mc_max:
983           break
984         node = self._config_data.nodes[name]
985         if node.master_candidate or node.offline or node.drained:
986           continue
987         mod_list.append(node)
988         node.master_candidate = True
989         node.serial_no += 1
990         mc_now += 1
991       if mc_now != mc_max:
992         # this should not happen
993         logging.warning("Warning: MaintainCandidatePool didn't manage to"
994                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
995       if mod_list:
996         self._config_data.cluster.serial_no += 1
997         self._WriteConfig()
998
999     return mod_list
1000
1001   def _BumpSerialNo(self):
1002     """Bump up the serial number of the config.
1003
1004     """
1005     self._config_data.serial_no += 1
1006
1007   def _OpenConfig(self):
1008     """Read the config data from disk.
1009
1010     """
1011     f = open(self._cfg_file, 'r')
1012     try:
1013       try:
1014         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1015       except Exception, err:
1016         raise errors.ConfigurationError(err)
1017     finally:
1018       f.close()
1019
1020     # Make sure the configuration has the right version
1021     _ValidateConfig(data)
1022
1023     if (not hasattr(data, 'cluster') or
1024         not hasattr(data.cluster, 'rsahostkeypub')):
1025       raise errors.ConfigurationError("Incomplete configuration"
1026                                       " (missing cluster.rsahostkeypub)")
1027     self._config_data = data
1028     # reset the last serial as -1 so that the next write will cause
1029     # ssconf update
1030     self._last_cluster_serial = -1
1031
1032   def _DistributeConfig(self):
1033     """Distribute the configuration to the other nodes.
1034
1035     Currently, this only copies the configuration file. In the future,
1036     it could be used to encapsulate the 2/3-phase update mechanism.
1037
1038     """
1039     if self._offline:
1040       return True
1041     bad = False
1042
1043     node_list = []
1044     addr_list = []
1045     myhostname = self._my_hostname
1046     # we can skip checking whether _UnlockedGetNodeInfo returns None
1047     # since the node list comes from _UnlocketGetNodeList, and we are
1048     # called with the lock held, so no modifications should take place
1049     # in between
1050     for node_name in self._UnlockedGetNodeList():
1051       if node_name == myhostname:
1052         continue
1053       node_info = self._UnlockedGetNodeInfo(node_name)
1054       if not node_info.master_candidate:
1055         continue
1056       node_list.append(node_info.name)
1057       addr_list.append(node_info.primary_ip)
1058
1059     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1060                                             address_list=addr_list)
1061     for node in node_list:
1062       if not result[node]:
1063         logging.error("copy of file %s to node %s failed",
1064                       self._cfg_file, node)
1065         bad = True
1066     return not bad
1067
1068   def _WriteConfig(self, destination=None):
1069     """Write the configuration data to persistent storage.
1070
1071     """
1072     config_errors = self._UnlockedVerifyConfig()
1073     if config_errors:
1074       raise errors.ConfigurationError("Configuration data is not"
1075                                       " consistent: %s" %
1076                                       (", ".join(config_errors)))
1077     if destination is None:
1078       destination = self._cfg_file
1079     self._BumpSerialNo()
1080     txt = serializer.Dump(self._config_data.ToDict())
1081     dir_name, file_name = os.path.split(destination)
1082     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1083     f = os.fdopen(fd, 'w')
1084     try:
1085       f.write(txt)
1086       os.fsync(f.fileno())
1087     finally:
1088       f.close()
1089     # we don't need to do os.close(fd) as f.close() did it
1090     os.rename(name, destination)
1091     self.write_count += 1
1092
1093     # and redistribute the config file to master candidates
1094     self._DistributeConfig()
1095
1096     # Write ssconf files on all nodes (including locally)
1097     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1098       if not self._offline:
1099         rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1100                                               self._UnlockedGetSsconfValues())
1101       self._last_cluster_serial = self._config_data.cluster.serial_no
1102
1103   def _UnlockedGetSsconfValues(self):
1104     """Return the values needed by ssconf.
1105
1106     @rtype: dict
1107     @return: a dictionary with keys the ssconf names and values their
1108         associated value
1109
1110     """
1111     fn = "\n".join
1112     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1113     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1114     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1115
1116     instance_data = fn(instance_names)
1117     off_data = fn(node.name for node in node_info if node.offline)
1118     on_data = fn(node.name for node in node_info if not node.offline)
1119     mc_data = fn(node.name for node in node_info if node.master_candidate)
1120     node_data = fn(node_names)
1121
1122     cluster = self._config_data.cluster
1123     return {
1124       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1125       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1126       constants.SS_MASTER_CANDIDATES: mc_data,
1127       constants.SS_MASTER_IP: cluster.master_ip,
1128       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1129       constants.SS_MASTER_NODE: cluster.master_node,
1130       constants.SS_NODE_LIST: node_data,
1131       constants.SS_OFFLINE_NODES: off_data,
1132       constants.SS_ONLINE_NODES: on_data,
1133       constants.SS_INSTANCE_LIST: instance_data,
1134       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1135       }
1136
1137   @locking.ssynchronized(_config_lock)
1138   def InitConfig(self, version, cluster_config, master_node_config):
1139     """Create the initial cluster configuration.
1140
1141     It will contain the current node, which will also be the master
1142     node, and no instances.
1143
1144     @type version: int
1145     @param version: Configuration version
1146     @type cluster_config: objects.Cluster
1147     @param cluster_config: Cluster configuration
1148     @type master_node_config: objects.Node
1149     @param master_node_config: Master node configuration
1150
1151     """
1152     nodes = {
1153       master_node_config.name: master_node_config,
1154       }
1155
1156     self._config_data = objects.ConfigData(version=version,
1157                                            cluster=cluster_config,
1158                                            nodes=nodes,
1159                                            instances={},
1160                                            serial_no=1)
1161     self._WriteConfig()
1162
1163   @locking.ssynchronized(_config_lock, shared=1)
1164   def GetVGName(self):
1165     """Return the volume group name.
1166
1167     """
1168     return self._config_data.cluster.volume_group_name
1169
1170   @locking.ssynchronized(_config_lock)
1171   def SetVGName(self, vg_name):
1172     """Set the volume group name.
1173
1174     """
1175     self._config_data.cluster.volume_group_name = vg_name
1176     self._config_data.cluster.serial_no += 1
1177     self._WriteConfig()
1178
1179   @locking.ssynchronized(_config_lock, shared=1)
1180   def GetDefBridge(self):
1181     """Return the default bridge.
1182
1183     """
1184     return self._config_data.cluster.default_bridge
1185
1186   @locking.ssynchronized(_config_lock, shared=1)
1187   def GetMACPrefix(self):
1188     """Return the mac prefix.
1189
1190     """
1191     return self._config_data.cluster.mac_prefix
1192
1193   @locking.ssynchronized(_config_lock, shared=1)
1194   def GetClusterInfo(self):
1195     """Returns informations about the cluster
1196
1197     @rtype: L{objects.Cluster}
1198     @return: the cluster object
1199
1200     """
1201     return self._config_data.cluster
1202
1203   @locking.ssynchronized(_config_lock)
1204   def Update(self, target):
1205     """Notify function to be called after updates.
1206
1207     This function must be called when an object (as returned by
1208     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1209     caller wants the modifications saved to the backing store. Note
1210     that all modified objects will be saved, but the target argument
1211     is the one the caller wants to ensure that it's saved.
1212
1213     @param target: an instance of either L{objects.Cluster},
1214         L{objects.Node} or L{objects.Instance} which is existing in
1215         the cluster
1216
1217     """
1218     if self._config_data is None:
1219       raise errors.ProgrammerError("Configuration file not read,"
1220                                    " cannot save.")
1221     update_serial = False
1222     if isinstance(target, objects.Cluster):
1223       test = target == self._config_data.cluster
1224     elif isinstance(target, objects.Node):
1225       test = target in self._config_data.nodes.values()
1226       update_serial = True
1227     elif isinstance(target, objects.Instance):
1228       test = target in self._config_data.instances.values()
1229     else:
1230       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1231                                    " ConfigWriter.Update" % type(target))
1232     if not test:
1233       raise errors.ConfigurationError("Configuration updated since object"
1234                                       " has been read or unknown object")
1235     target.serial_no += 1
1236
1237     if update_serial:
1238       # for node updates, we need to increase the cluster serial too
1239       self._config_data.cluster.serial_no += 1
1240
1241     if isinstance(target, objects.Instance):
1242       self._UnlockedReleaseDRBDMinors(target.name)
1243       for nic in target.nics:
1244         self._temporary_macs.discard(nic.mac)
1245
1246     self._WriteConfig()