Do not check 'None' disk IDs for duplicates
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38
39 from ganeti import errors
40 from ganeti import locking
41 from ganeti import utils
42 from ganeti import constants
43 from ganeti import rpc
44 from ganeti import objects
45 from ganeti import serializer
46
47
48 _config_lock = locking.SharedLock()
49
50
51 def _ValidateConfig(data):
52   """Verifies that a configuration objects looks valid.
53
54   This only verifies the version of the configuration.
55
56   @raise errors.ConfigurationError: if the version differs from what
57       we expect
58
59   """
60   if data.version != constants.CONFIG_VERSION:
61     raise errors.ConfigurationError("Cluster configuration version"
62                                     " mismatch, got %s instead of %s" %
63                                     (data.version,
64                                      constants.CONFIG_VERSION))
65
66
67 class ConfigWriter:
68   """The interface to the cluster configuration.
69
70   """
71   def __init__(self, cfg_file=None, offline=False):
72     self.write_count = 0
73     self._lock = _config_lock
74     self._config_data = None
75     self._offline = offline
76     if cfg_file is None:
77       self._cfg_file = constants.CLUSTER_CONF_FILE
78     else:
79       self._cfg_file = cfg_file
80     self._temporary_ids = set()
81     self._temporary_drbds = {}
82     self._temporary_macs = set()
83     # Note: in order to prevent errors when resolving our name in
84     # _DistributeConfig, we compute it here once and reuse it; it's
85     # better to raise an error before starting to modify the config
86     # file than after it was modified
87     self._my_hostname = utils.HostInfo().name
88     self._last_cluster_serial = -1
89     self._OpenConfig()
90
91   # this method needs to be static, so that we can call it on the class
92   @staticmethod
93   def IsCluster():
94     """Check if the cluster is configured.
95
96     """
97     return os.path.exists(constants.CLUSTER_CONF_FILE)
98
99   @locking.ssynchronized(_config_lock, shared=1)
100   def GenerateMAC(self):
101     """Generate a MAC for an instance.
102
103     This should check the current instances for duplicates.
104
105     """
106     prefix = self._config_data.cluster.mac_prefix
107     all_macs = self._AllMACs()
108     retries = 64
109     while retries > 0:
110       byte1 = random.randrange(0, 256)
111       byte2 = random.randrange(0, 256)
112       byte3 = random.randrange(0, 256)
113       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
114       if mac not in all_macs and mac not in self._temporary_macs:
115         break
116       retries -= 1
117     else:
118       raise errors.ConfigurationError("Can't generate unique MAC")
119     self._temporary_macs.add(mac)
120     return mac
121
122   @locking.ssynchronized(_config_lock, shared=1)
123   def IsMacInUse(self, mac):
124     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
125
126     This only checks instances managed by this cluster, it does not
127     check for potential collisions elsewhere.
128
129     """
130     all_macs = self._AllMACs()
131     return mac in all_macs or mac in self._temporary_macs
132
133   @locking.ssynchronized(_config_lock, shared=1)
134   def GenerateDRBDSecret(self):
135     """Generate a DRBD secret.
136
137     This checks the current disks for duplicates.
138
139     """
140     all_secrets = self._AllDRBDSecrets()
141     retries = 64
142     while retries > 0:
143       secret = utils.GenerateSecret()
144       if secret not in all_secrets:
145         break
146       retries -= 1
147     else:
148       raise errors.ConfigurationError("Can't generate unique DRBD secret")
149     return secret
150
151   def _ComputeAllLVs(self):
152     """Compute the list of all LVs.
153
154     """
155     lvnames = set()
156     for instance in self._config_data.instances.values():
157       node_data = instance.MapLVsByNode()
158       for lv_list in node_data.values():
159         lvnames.update(lv_list)
160     return lvnames
161
162   @locking.ssynchronized(_config_lock, shared=1)
163   def GenerateUniqueID(self, exceptions=None):
164     """Generate an unique disk name.
165
166     This checks the current node, instances and disk names for
167     duplicates.
168
169     @param exceptions: a list with some other names which should be checked
170         for uniqueness (used for example when you want to get
171         more than one id at one time without adding each one in
172         turn to the config file)
173
174     @rtype: string
175     @return: the unique id
176
177     """
178     existing = set()
179     existing.update(self._temporary_ids)
180     existing.update(self._ComputeAllLVs())
181     existing.update(self._config_data.instances.keys())
182     existing.update(self._config_data.nodes.keys())
183     if exceptions is not None:
184       existing.update(exceptions)
185     retries = 64
186     while retries > 0:
187       unique_id = utils.NewUUID()
188       if unique_id not in existing and unique_id is not None:
189         break
190     else:
191       raise errors.ConfigurationError("Not able generate an unique ID"
192                                       " (last tried ID: %s" % unique_id)
193     self._temporary_ids.add(unique_id)
194     return unique_id
195
196   def _AllMACs(self):
197     """Return all MACs present in the config.
198
199     @rtype: list
200     @return: the list of all MACs
201
202     """
203     result = []
204     for instance in self._config_data.instances.values():
205       for nic in instance.nics:
206         result.append(nic.mac)
207
208     return result
209
210   def _AllDRBDSecrets(self):
211     """Return all DRBD secrets present in the config.
212
213     @rtype: list
214     @return: the list of all DRBD secrets
215
216     """
217     def helper(disk, result):
218       """Recursively gather secrets from this disk."""
219       if disk.dev_type == constants.DT_DRBD8:
220         result.append(disk.logical_id[5])
221       if disk.children:
222         for child in disk.children:
223           helper(child, result)
224
225     result = []
226     for instance in self._config_data.instances.values():
227       for disk in instance.disks:
228         helper(disk, result)
229
230     return result
231
232   def _CheckDiskIDs(self, disk, l_ids, p_ids):
233     """Compute duplicate disk IDs
234
235     @type disk: L{objects.Disk}
236     @param disk: the disk at which to start searching
237     @type l_ids: list
238     @param l_ids: list of current logical ids
239     @type p_ids: list
240     @param p_ids: list of current physical ids
241     @rtype: list
242     @return: a list of error messages
243
244     """
245     result = []
246     if disk.logical_id is not None:
247       if disk.logical_id in l_ids:
248         result.append("duplicate logical id %s" % str(disk.logical_id))
249       else:
250         l_ids.append(disk.logical_id)
251     if disk.physical_id is not None:
252       if disk.physical_id in p_ids:
253         result.append("duplicate physical id %s" % str(disk.physical_id))
254       else:
255         p_ids.append(disk.physical_id)
256
257     if disk.children:
258       for child in disk.children:
259         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
260     return result
261
262   def _UnlockedVerifyConfig(self):
263     """Verify function.
264
265     @rtype: list
266     @return: a list of error messages; a non-empty list signifies
267         configuration errors
268
269     """
270     result = []
271     seen_macs = []
272     ports = {}
273     data = self._config_data
274     seen_lids = []
275     seen_pids = []
276     for instance_name in data.instances:
277       instance = data.instances[instance_name]
278       if instance.primary_node not in data.nodes:
279         result.append("instance '%s' has invalid primary node '%s'" %
280                       (instance_name, instance.primary_node))
281       for snode in instance.secondary_nodes:
282         if snode not in data.nodes:
283           result.append("instance '%s' has invalid secondary node '%s'" %
284                         (instance_name, snode))
285       for idx, nic in enumerate(instance.nics):
286         if nic.mac in seen_macs:
287           result.append("instance '%s' has NIC %d mac %s duplicate" %
288                         (instance_name, idx, nic.mac))
289         else:
290           seen_macs.append(nic.mac)
291
292       # gather the drbd ports for duplicate checks
293       for dsk in instance.disks:
294         if dsk.dev_type in constants.LDS_DRBD:
295           tcp_port = dsk.logical_id[2]
296           if tcp_port not in ports:
297             ports[tcp_port] = []
298           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
299       # gather network port reservation
300       net_port = getattr(instance, "network_port", None)
301       if net_port is not None:
302         if net_port not in ports:
303           ports[net_port] = []
304         ports[net_port].append((instance.name, "network port"))
305
306       # instance disk verify
307       for idx, disk in enumerate(instance.disks):
308         result.extend(["instance '%s' disk %d error: %s" %
309                        (instance.name, idx, msg) for msg in disk.Verify()])
310         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
311
312     # cluster-wide pool of free ports
313     for free_port in data.cluster.tcpudp_port_pool:
314       if free_port not in ports:
315         ports[free_port] = []
316       ports[free_port].append(("cluster", "port marked as free"))
317
318     # compute tcp/udp duplicate ports
319     keys = ports.keys()
320     keys.sort()
321     for pnum in keys:
322       pdata = ports[pnum]
323       if len(pdata) > 1:
324         txt = ", ".join(["%s/%s" % val for val in pdata])
325         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
326
327     # highest used tcp port check
328     if keys:
329       if keys[-1] > data.cluster.highest_used_port:
330         result.append("Highest used port mismatch, saved %s, computed %s" %
331                       (data.cluster.highest_used_port, keys[-1]))
332
333     if not data.nodes[data.cluster.master_node].master_candidate:
334       result.append("Master node is not a master candidate")
335
336     # master candidate checks
337     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
338     if mc_now < mc_max:
339       result.append("Not enough master candidates: actual %d, target %d" %
340                     (mc_now, mc_max))
341
342     # node checks
343     for node in data.nodes.values():
344       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
345         result.append("Node %s state is invalid: master_candidate=%s,"
346                       " drain=%s, offline=%s" %
347                       (node.name, node.master_candidate, node.drain,
348                        node.offline))
349
350     # drbd minors check
351     d_map, duplicates = self._UnlockedComputeDRBDMap()
352     for node, minor, instance_a, instance_b in duplicates:
353       result.append("DRBD minor %d on node %s is assigned twice to instances"
354                     " %s and %s" % (minor, node, instance_a, instance_b))
355
356     return result
357
358   @locking.ssynchronized(_config_lock, shared=1)
359   def VerifyConfig(self):
360     """Verify function.
361
362     This is just a wrapper over L{_UnlockedVerifyConfig}.
363
364     @rtype: list
365     @return: a list of error messages; a non-empty list signifies
366         configuration errors
367
368     """
369     return self._UnlockedVerifyConfig()
370
371   def _UnlockedSetDiskID(self, disk, node_name):
372     """Convert the unique ID to the ID needed on the target nodes.
373
374     This is used only for drbd, which needs ip/port configuration.
375
376     The routine descends down and updates its children also, because
377     this helps when the only the top device is passed to the remote
378     node.
379
380     This function is for internal use, when the config lock is already held.
381
382     """
383     if disk.children:
384       for child in disk.children:
385         self._UnlockedSetDiskID(child, node_name)
386
387     if disk.logical_id is None and disk.physical_id is not None:
388       return
389     if disk.dev_type == constants.LD_DRBD8:
390       pnode, snode, port, pminor, sminor, secret = disk.logical_id
391       if node_name not in (pnode, snode):
392         raise errors.ConfigurationError("DRBD device not knowing node %s" %
393                                         node_name)
394       pnode_info = self._UnlockedGetNodeInfo(pnode)
395       snode_info = self._UnlockedGetNodeInfo(snode)
396       if pnode_info is None or snode_info is None:
397         raise errors.ConfigurationError("Can't find primary or secondary node"
398                                         " for %s" % str(disk))
399       p_data = (pnode_info.secondary_ip, port)
400       s_data = (snode_info.secondary_ip, port)
401       if pnode == node_name:
402         disk.physical_id = p_data + s_data + (pminor, secret)
403       else: # it must be secondary, we tested above
404         disk.physical_id = s_data + p_data + (sminor, secret)
405     else:
406       disk.physical_id = disk.logical_id
407     return
408
409   @locking.ssynchronized(_config_lock)
410   def SetDiskID(self, disk, node_name):
411     """Convert the unique ID to the ID needed on the target nodes.
412
413     This is used only for drbd, which needs ip/port configuration.
414
415     The routine descends down and updates its children also, because
416     this helps when the only the top device is passed to the remote
417     node.
418
419     """
420     return self._UnlockedSetDiskID(disk, node_name)
421
422   @locking.ssynchronized(_config_lock)
423   def AddTcpUdpPort(self, port):
424     """Adds a new port to the available port pool.
425
426     """
427     if not isinstance(port, int):
428       raise errors.ProgrammerError("Invalid type passed for port")
429
430     self._config_data.cluster.tcpudp_port_pool.add(port)
431     self._WriteConfig()
432
433   @locking.ssynchronized(_config_lock, shared=1)
434   def GetPortList(self):
435     """Returns a copy of the current port list.
436
437     """
438     return self._config_data.cluster.tcpudp_port_pool.copy()
439
440   @locking.ssynchronized(_config_lock)
441   def AllocatePort(self):
442     """Allocate a port.
443
444     The port will be taken from the available port pool or from the
445     default port range (and in this case we increase
446     highest_used_port).
447
448     """
449     # If there are TCP/IP ports configured, we use them first.
450     if self._config_data.cluster.tcpudp_port_pool:
451       port = self._config_data.cluster.tcpudp_port_pool.pop()
452     else:
453       port = self._config_data.cluster.highest_used_port + 1
454       if port >= constants.LAST_DRBD_PORT:
455         raise errors.ConfigurationError("The highest used port is greater"
456                                         " than %s. Aborting." %
457                                         constants.LAST_DRBD_PORT)
458       self._config_data.cluster.highest_used_port = port
459
460     self._WriteConfig()
461     return port
462
463   def _UnlockedComputeDRBDMap(self):
464     """Compute the used DRBD minor/nodes.
465
466     @rtype: (dict, list)
467     @return: dictionary of node_name: dict of minor: instance_name;
468         the returned dict will have all the nodes in it (even if with
469         an empty list), and a list of duplicates; if the duplicates
470         list is not empty, the configuration is corrupted and its caller
471         should raise an exception
472
473     """
474     def _AppendUsedPorts(instance_name, disk, used):
475       duplicates = []
476       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
477         nodeA, nodeB, dummy, minorA, minorB = disk.logical_id[:5]
478         for node, port in ((nodeA, minorA), (nodeB, minorB)):
479           assert node in used, ("Node '%s' of instance '%s' not found"
480                                 " in node list" % (node, instance_name))
481           if port in used[node]:
482             duplicates.append((node, port, instance_name, used[node][port]))
483           else:
484             used[node][port] = instance_name
485       if disk.children:
486         for child in disk.children:
487           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
488       return duplicates
489
490     duplicates = []
491     my_dict = dict((node, {}) for node in self._config_data.nodes)
492     for instance in self._config_data.instances.itervalues():
493       for disk in instance.disks:
494         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
495     for (node, minor), instance in self._temporary_drbds.iteritems():
496       if minor in my_dict[node] and my_dict[node][minor] != instance:
497         duplicates.append((node, minor, instance, my_dict[node][minor]))
498       else:
499         my_dict[node][minor] = instance
500     return my_dict, duplicates
501
502   @locking.ssynchronized(_config_lock)
503   def ComputeDRBDMap(self):
504     """Compute the used DRBD minor/nodes.
505
506     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
507
508     @return: dictionary of node_name: dict of minor: instance_name;
509         the returned dict will have all the nodes in it (even if with
510         an empty list).
511
512     """
513     d_map, duplicates = self._UnlockedComputeDRBDMap()
514     if duplicates:
515       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
516                                       str(duplicates))
517     return d_map
518
519   @locking.ssynchronized(_config_lock)
520   def AllocateDRBDMinor(self, nodes, instance):
521     """Allocate a drbd minor.
522
523     The free minor will be automatically computed from the existing
524     devices. A node can be given multiple times in order to allocate
525     multiple minors. The result is the list of minors, in the same
526     order as the passed nodes.
527
528     @type instance: string
529     @param instance: the instance for which we allocate minors
530
531     """
532     assert isinstance(instance, basestring), \
533            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
534
535     d_map, duplicates = self._UnlockedComputeDRBDMap()
536     if duplicates:
537       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
538                                       str(duplicates))
539     result = []
540     for nname in nodes:
541       ndata = d_map[nname]
542       if not ndata:
543         # no minors used, we can start at 0
544         result.append(0)
545         ndata[0] = instance
546         self._temporary_drbds[(nname, 0)] = instance
547         continue
548       keys = ndata.keys()
549       keys.sort()
550       ffree = utils.FirstFree(keys)
551       if ffree is None:
552         # return the next minor
553         # TODO: implement high-limit check
554         minor = keys[-1] + 1
555       else:
556         minor = ffree
557       # double-check minor against current instances
558       assert minor not in d_map[nname], \
559              ("Attempt to reuse allocated DRBD minor %d on node %s,"
560               " already allocated to instance %s" %
561               (minor, nname, d_map[nname][minor]))
562       ndata[minor] = instance
563       # double-check minor against reservation
564       r_key = (nname, minor)
565       assert r_key not in self._temporary_drbds, \
566              ("Attempt to reuse reserved DRBD minor %d on node %s,"
567               " reserved for instance %s" %
568               (minor, nname, self._temporary_drbds[r_key]))
569       self._temporary_drbds[r_key] = instance
570       result.append(minor)
571     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
572                   nodes, result)
573     return result
574
575   def _UnlockedReleaseDRBDMinors(self, instance):
576     """Release temporary drbd minors allocated for a given instance.
577
578     @type instance: string
579     @param instance: the instance for which temporary minors should be
580                      released
581
582     """
583     assert isinstance(instance, basestring), \
584            "Invalid argument passed to ReleaseDRBDMinors"
585     for key, name in self._temporary_drbds.items():
586       if name == instance:
587         del self._temporary_drbds[key]
588
589   @locking.ssynchronized(_config_lock)
590   def ReleaseDRBDMinors(self, instance):
591     """Release temporary drbd minors allocated for a given instance.
592
593     This should be called on the error paths, on the success paths
594     it's automatically called by the ConfigWriter add and update
595     functions.
596
597     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
598
599     @type instance: string
600     @param instance: the instance for which temporary minors should be
601                      released
602
603     """
604     self._UnlockedReleaseDRBDMinors(instance)
605
606   @locking.ssynchronized(_config_lock, shared=1)
607   def GetConfigVersion(self):
608     """Get the configuration version.
609
610     @return: Config version
611
612     """
613     return self._config_data.version
614
615   @locking.ssynchronized(_config_lock, shared=1)
616   def GetClusterName(self):
617     """Get cluster name.
618
619     @return: Cluster name
620
621     """
622     return self._config_data.cluster.cluster_name
623
624   @locking.ssynchronized(_config_lock, shared=1)
625   def GetMasterNode(self):
626     """Get the hostname of the master node for this cluster.
627
628     @return: Master hostname
629
630     """
631     return self._config_data.cluster.master_node
632
633   @locking.ssynchronized(_config_lock, shared=1)
634   def GetMasterIP(self):
635     """Get the IP of the master node for this cluster.
636
637     @return: Master IP
638
639     """
640     return self._config_data.cluster.master_ip
641
642   @locking.ssynchronized(_config_lock, shared=1)
643   def GetMasterNetdev(self):
644     """Get the master network device for this cluster.
645
646     """
647     return self._config_data.cluster.master_netdev
648
649   @locking.ssynchronized(_config_lock, shared=1)
650   def GetFileStorageDir(self):
651     """Get the file storage dir for this cluster.
652
653     """
654     return self._config_data.cluster.file_storage_dir
655
656   @locking.ssynchronized(_config_lock, shared=1)
657   def GetHypervisorType(self):
658     """Get the hypervisor type for this cluster.
659
660     """
661     return self._config_data.cluster.default_hypervisor
662
663   @locking.ssynchronized(_config_lock, shared=1)
664   def GetHostKey(self):
665     """Return the rsa hostkey from the config.
666
667     @rtype: string
668     @return: the rsa hostkey
669
670     """
671     return self._config_data.cluster.rsahostkeypub
672
673   @locking.ssynchronized(_config_lock)
674   def AddInstance(self, instance):
675     """Add an instance to the config.
676
677     This should be used after creating a new instance.
678
679     @type instance: L{objects.Instance}
680     @param instance: the instance object
681
682     """
683     if not isinstance(instance, objects.Instance):
684       raise errors.ProgrammerError("Invalid type passed to AddInstance")
685
686     if instance.disk_template != constants.DT_DISKLESS:
687       all_lvs = instance.MapLVsByNode()
688       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
689
690     instance.serial_no = 1
691     self._config_data.instances[instance.name] = instance
692     self._config_data.cluster.serial_no += 1
693     self._UnlockedReleaseDRBDMinors(instance.name)
694     for nic in instance.nics:
695       self._temporary_macs.discard(nic.mac)
696     self._WriteConfig()
697
698   def _SetInstanceStatus(self, instance_name, status):
699     """Set the instance's status to a given value.
700
701     """
702     assert isinstance(status, bool), \
703            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
704
705     if instance_name not in self._config_data.instances:
706       raise errors.ConfigurationError("Unknown instance '%s'" %
707                                       instance_name)
708     instance = self._config_data.instances[instance_name]
709     if instance.admin_up != status:
710       instance.admin_up = status
711       instance.serial_no += 1
712       self._WriteConfig()
713
714   @locking.ssynchronized(_config_lock)
715   def MarkInstanceUp(self, instance_name):
716     """Mark the instance status to up in the config.
717
718     """
719     self._SetInstanceStatus(instance_name, True)
720
721   @locking.ssynchronized(_config_lock)
722   def RemoveInstance(self, instance_name):
723     """Remove the instance from the configuration.
724
725     """
726     if instance_name not in self._config_data.instances:
727       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
728     del self._config_data.instances[instance_name]
729     self._config_data.cluster.serial_no += 1
730     self._WriteConfig()
731
732   @locking.ssynchronized(_config_lock)
733   def RenameInstance(self, old_name, new_name):
734     """Rename an instance.
735
736     This needs to be done in ConfigWriter and not by RemoveInstance
737     combined with AddInstance as only we can guarantee an atomic
738     rename.
739
740     """
741     if old_name not in self._config_data.instances:
742       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
743     inst = self._config_data.instances[old_name]
744     del self._config_data.instances[old_name]
745     inst.name = new_name
746
747     for disk in inst.disks:
748       if disk.dev_type == constants.LD_FILE:
749         # rename the file paths in logical and physical id
750         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
751         disk.physical_id = disk.logical_id = (disk.logical_id[0],
752                                               os.path.join(file_storage_dir,
753                                                            inst.name,
754                                                            disk.iv_name))
755
756     self._config_data.instances[inst.name] = inst
757     self._WriteConfig()
758
759   @locking.ssynchronized(_config_lock)
760   def MarkInstanceDown(self, instance_name):
761     """Mark the status of an instance to down in the configuration.
762
763     """
764     self._SetInstanceStatus(instance_name, False)
765
766   def _UnlockedGetInstanceList(self):
767     """Get the list of instances.
768
769     This function is for internal use, when the config lock is already held.
770
771     """
772     return self._config_data.instances.keys()
773
774   @locking.ssynchronized(_config_lock, shared=1)
775   def GetInstanceList(self):
776     """Get the list of instances.
777
778     @return: array of instances, ex. ['instance2.example.com',
779         'instance1.example.com']
780
781     """
782     return self._UnlockedGetInstanceList()
783
784   @locking.ssynchronized(_config_lock, shared=1)
785   def ExpandInstanceName(self, short_name):
786     """Attempt to expand an incomplete instance name.
787
788     """
789     return utils.MatchNameComponent(short_name,
790                                     self._config_data.instances.keys())
791
792   def _UnlockedGetInstanceInfo(self, instance_name):
793     """Returns informations about an instance.
794
795     This function is for internal use, when the config lock is already held.
796
797     """
798     if instance_name not in self._config_data.instances:
799       return None
800
801     return self._config_data.instances[instance_name]
802
803   @locking.ssynchronized(_config_lock, shared=1)
804   def GetInstanceInfo(self, instance_name):
805     """Returns informations about an instance.
806
807     It takes the information from the configuration file. Other informations of
808     an instance are taken from the live systems.
809
810     @param instance_name: name of the instance, e.g.
811         I{instance1.example.com}
812
813     @rtype: L{objects.Instance}
814     @return: the instance object
815
816     """
817     return self._UnlockedGetInstanceInfo(instance_name)
818
819   @locking.ssynchronized(_config_lock, shared=1)
820   def GetAllInstancesInfo(self):
821     """Get the configuration of all instances.
822
823     @rtype: dict
824     @returns: dict of (instance, instance_info), where instance_info is what
825               would GetInstanceInfo return for the node
826
827     """
828     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
829                     for instance in self._UnlockedGetInstanceList()])
830     return my_dict
831
832   @locking.ssynchronized(_config_lock)
833   def AddNode(self, node):
834     """Add a node to the configuration.
835
836     @type node: L{objects.Node}
837     @param node: a Node instance
838
839     """
840     logging.info("Adding node %s to configuration" % node.name)
841
842     node.serial_no = 1
843     self._config_data.nodes[node.name] = node
844     self._config_data.cluster.serial_no += 1
845     self._WriteConfig()
846
847   @locking.ssynchronized(_config_lock)
848   def RemoveNode(self, node_name):
849     """Remove a node from the configuration.
850
851     """
852     logging.info("Removing node %s from configuration" % node_name)
853
854     if node_name not in self._config_data.nodes:
855       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
856
857     del self._config_data.nodes[node_name]
858     self._config_data.cluster.serial_no += 1
859     self._WriteConfig()
860
861   @locking.ssynchronized(_config_lock, shared=1)
862   def ExpandNodeName(self, short_name):
863     """Attempt to expand an incomplete instance name.
864
865     """
866     return utils.MatchNameComponent(short_name,
867                                     self._config_data.nodes.keys())
868
869   def _UnlockedGetNodeInfo(self, node_name):
870     """Get the configuration of a node, as stored in the config.
871
872     This function is for internal use, when the config lock is already
873     held.
874
875     @param node_name: the node name, e.g. I{node1.example.com}
876
877     @rtype: L{objects.Node}
878     @return: the node object
879
880     """
881     if node_name not in self._config_data.nodes:
882       return None
883
884     return self._config_data.nodes[node_name]
885
886
887   @locking.ssynchronized(_config_lock, shared=1)
888   def GetNodeInfo(self, node_name):
889     """Get the configuration of a node, as stored in the config.
890
891     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
892
893     @param node_name: the node name, e.g. I{node1.example.com}
894
895     @rtype: L{objects.Node}
896     @return: the node object
897
898     """
899     return self._UnlockedGetNodeInfo(node_name)
900
901   def _UnlockedGetNodeList(self):
902     """Return the list of nodes which are in the configuration.
903
904     This function is for internal use, when the config lock is already
905     held.
906
907     @rtype: list
908
909     """
910     return self._config_data.nodes.keys()
911
912
913   @locking.ssynchronized(_config_lock, shared=1)
914   def GetNodeList(self):
915     """Return the list of nodes which are in the configuration.
916
917     """
918     return self._UnlockedGetNodeList()
919
920   @locking.ssynchronized(_config_lock, shared=1)
921   def GetOnlineNodeList(self):
922     """Return the list of nodes which are online.
923
924     """
925     all_nodes = [self._UnlockedGetNodeInfo(node)
926                  for node in self._UnlockedGetNodeList()]
927     return [node.name for node in all_nodes if not node.offline]
928
929   @locking.ssynchronized(_config_lock, shared=1)
930   def GetAllNodesInfo(self):
931     """Get the configuration of all nodes.
932
933     @rtype: dict
934     @return: dict of (node, node_info), where node_info is what
935               would GetNodeInfo return for the node
936
937     """
938     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
939                     for node in self._UnlockedGetNodeList()])
940     return my_dict
941
942   def _UnlockedGetMasterCandidateStats(self):
943     """Get the number of current and maximum desired and possible candidates.
944
945     @rtype: tuple
946     @return: tuple of (current, desired and possible)
947
948     """
949     mc_now = mc_max = 0
950     for node in self._config_data.nodes.itervalues():
951       if not (node.offline or node.drained):
952         mc_max += 1
953       if node.master_candidate:
954         mc_now += 1
955     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
956     return (mc_now, mc_max)
957
958   @locking.ssynchronized(_config_lock, shared=1)
959   def GetMasterCandidateStats(self):
960     """Get the number of current and maximum possible candidates.
961
962     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
963
964     @rtype: tuple
965     @return: tuple of (current, max)
966
967     """
968     return self._UnlockedGetMasterCandidateStats()
969
970   @locking.ssynchronized(_config_lock)
971   def MaintainCandidatePool(self):
972     """Try to grow the candidate pool to the desired size.
973
974     @rtype: list
975     @return: list with the adjusted nodes (L{objects.Node} instances)
976
977     """
978     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
979     mod_list = []
980     if mc_now < mc_max:
981       node_list = self._config_data.nodes.keys()
982       random.shuffle(node_list)
983       for name in node_list:
984         if mc_now >= mc_max:
985           break
986         node = self._config_data.nodes[name]
987         if node.master_candidate or node.offline or node.drained:
988           continue
989         mod_list.append(node)
990         node.master_candidate = True
991         node.serial_no += 1
992         mc_now += 1
993       if mc_now != mc_max:
994         # this should not happen
995         logging.warning("Warning: MaintainCandidatePool didn't manage to"
996                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
997       if mod_list:
998         self._config_data.cluster.serial_no += 1
999         self._WriteConfig()
1000
1001     return mod_list
1002
1003   def _BumpSerialNo(self):
1004     """Bump up the serial number of the config.
1005
1006     """
1007     self._config_data.serial_no += 1
1008
1009   def _OpenConfig(self):
1010     """Read the config data from disk.
1011
1012     """
1013     f = open(self._cfg_file, 'r')
1014     try:
1015       try:
1016         data = objects.ConfigData.FromDict(serializer.Load(f.read()))
1017       except Exception, err:
1018         raise errors.ConfigurationError(err)
1019     finally:
1020       f.close()
1021
1022     # Make sure the configuration has the right version
1023     _ValidateConfig(data)
1024
1025     if (not hasattr(data, 'cluster') or
1026         not hasattr(data.cluster, 'rsahostkeypub')):
1027       raise errors.ConfigurationError("Incomplete configuration"
1028                                       " (missing cluster.rsahostkeypub)")
1029     self._config_data = data
1030     # reset the last serial as -1 so that the next write will cause
1031     # ssconf update
1032     self._last_cluster_serial = -1
1033
1034   def _DistributeConfig(self):
1035     """Distribute the configuration to the other nodes.
1036
1037     Currently, this only copies the configuration file. In the future,
1038     it could be used to encapsulate the 2/3-phase update mechanism.
1039
1040     """
1041     if self._offline:
1042       return True
1043     bad = False
1044
1045     node_list = []
1046     addr_list = []
1047     myhostname = self._my_hostname
1048     # we can skip checking whether _UnlockedGetNodeInfo returns None
1049     # since the node list comes from _UnlocketGetNodeList, and we are
1050     # called with the lock held, so no modifications should take place
1051     # in between
1052     for node_name in self._UnlockedGetNodeList():
1053       if node_name == myhostname:
1054         continue
1055       node_info = self._UnlockedGetNodeInfo(node_name)
1056       if not node_info.master_candidate:
1057         continue
1058       node_list.append(node_info.name)
1059       addr_list.append(node_info.primary_ip)
1060
1061     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1062                                             address_list=addr_list)
1063     for node in node_list:
1064       if not result[node]:
1065         logging.error("copy of file %s to node %s failed",
1066                       self._cfg_file, node)
1067         bad = True
1068     return not bad
1069
1070   def _WriteConfig(self, destination=None):
1071     """Write the configuration data to persistent storage.
1072
1073     """
1074     config_errors = self._UnlockedVerifyConfig()
1075     if config_errors:
1076       raise errors.ConfigurationError("Configuration data is not"
1077                                       " consistent: %s" %
1078                                       (", ".join(config_errors)))
1079     if destination is None:
1080       destination = self._cfg_file
1081     self._BumpSerialNo()
1082     txt = serializer.Dump(self._config_data.ToDict())
1083     dir_name, file_name = os.path.split(destination)
1084     fd, name = tempfile.mkstemp('.newconfig', file_name, dir_name)
1085     f = os.fdopen(fd, 'w')
1086     try:
1087       f.write(txt)
1088       os.fsync(f.fileno())
1089     finally:
1090       f.close()
1091     # we don't need to do os.close(fd) as f.close() did it
1092     os.rename(name, destination)
1093     self.write_count += 1
1094
1095     # and redistribute the config file to master candidates
1096     self._DistributeConfig()
1097
1098     # Write ssconf files on all nodes (including locally)
1099     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1100       if not self._offline:
1101         rpc.RpcRunner.call_write_ssconf_files(self._UnlockedGetNodeList(),
1102                                               self._UnlockedGetSsconfValues())
1103       self._last_cluster_serial = self._config_data.cluster.serial_no
1104
1105   def _UnlockedGetSsconfValues(self):
1106     """Return the values needed by ssconf.
1107
1108     @rtype: dict
1109     @return: a dictionary with keys the ssconf names and values their
1110         associated value
1111
1112     """
1113     fn = "\n".join
1114     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1115     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1116     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1117
1118     instance_data = fn(instance_names)
1119     off_data = fn(node.name for node in node_info if node.offline)
1120     on_data = fn(node.name for node in node_info if not node.offline)
1121     mc_data = fn(node.name for node in node_info if node.master_candidate)
1122     node_data = fn(node_names)
1123
1124     cluster = self._config_data.cluster
1125     return {
1126       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1127       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1128       constants.SS_MASTER_CANDIDATES: mc_data,
1129       constants.SS_MASTER_IP: cluster.master_ip,
1130       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1131       constants.SS_MASTER_NODE: cluster.master_node,
1132       constants.SS_NODE_LIST: node_data,
1133       constants.SS_OFFLINE_NODES: off_data,
1134       constants.SS_ONLINE_NODES: on_data,
1135       constants.SS_INSTANCE_LIST: instance_data,
1136       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1137       }
1138
1139   @locking.ssynchronized(_config_lock)
1140   def InitConfig(self, version, cluster_config, master_node_config):
1141     """Create the initial cluster configuration.
1142
1143     It will contain the current node, which will also be the master
1144     node, and no instances.
1145
1146     @type version: int
1147     @param version: Configuration version
1148     @type cluster_config: objects.Cluster
1149     @param cluster_config: Cluster configuration
1150     @type master_node_config: objects.Node
1151     @param master_node_config: Master node configuration
1152
1153     """
1154     nodes = {
1155       master_node_config.name: master_node_config,
1156       }
1157
1158     self._config_data = objects.ConfigData(version=version,
1159                                            cluster=cluster_config,
1160                                            nodes=nodes,
1161                                            instances={},
1162                                            serial_no=1)
1163     self._WriteConfig()
1164
1165   @locking.ssynchronized(_config_lock, shared=1)
1166   def GetVGName(self):
1167     """Return the volume group name.
1168
1169     """
1170     return self._config_data.cluster.volume_group_name
1171
1172   @locking.ssynchronized(_config_lock)
1173   def SetVGName(self, vg_name):
1174     """Set the volume group name.
1175
1176     """
1177     self._config_data.cluster.volume_group_name = vg_name
1178     self._config_data.cluster.serial_no += 1
1179     self._WriteConfig()
1180
1181   @locking.ssynchronized(_config_lock, shared=1)
1182   def GetDefBridge(self):
1183     """Return the default bridge.
1184
1185     """
1186     return self._config_data.cluster.default_bridge
1187
1188   @locking.ssynchronized(_config_lock, shared=1)
1189   def GetMACPrefix(self):
1190     """Return the mac prefix.
1191
1192     """
1193     return self._config_data.cluster.mac_prefix
1194
1195   @locking.ssynchronized(_config_lock, shared=1)
1196   def GetClusterInfo(self):
1197     """Returns informations about the cluster
1198
1199     @rtype: L{objects.Cluster}
1200     @return: the cluster object
1201
1202     """
1203     return self._config_data.cluster
1204
1205   @locking.ssynchronized(_config_lock)
1206   def Update(self, target):
1207     """Notify function to be called after updates.
1208
1209     This function must be called when an object (as returned by
1210     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1211     caller wants the modifications saved to the backing store. Note
1212     that all modified objects will be saved, but the target argument
1213     is the one the caller wants to ensure that it's saved.
1214
1215     @param target: an instance of either L{objects.Cluster},
1216         L{objects.Node} or L{objects.Instance} which is existing in
1217         the cluster
1218
1219     """
1220     if self._config_data is None:
1221       raise errors.ProgrammerError("Configuration file not read,"
1222                                    " cannot save.")
1223     update_serial = False
1224     if isinstance(target, objects.Cluster):
1225       test = target == self._config_data.cluster
1226     elif isinstance(target, objects.Node):
1227       test = target in self._config_data.nodes.values()
1228       update_serial = True
1229     elif isinstance(target, objects.Instance):
1230       test = target in self._config_data.instances.values()
1231     else:
1232       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1233                                    " ConfigWriter.Update" % type(target))
1234     if not test:
1235       raise errors.ConfigurationError("Configuration updated since object"
1236                                       " has been read or unknown object")
1237     target.serial_no += 1
1238
1239     if update_serial:
1240       # for node updates, we need to increase the cluster serial too
1241       self._config_data.cluster.serial_no += 1
1242
1243     if isinstance(target, objects.Instance):
1244       self._UnlockedReleaseDRBDMinors(target.name)
1245       for nic in target.nics:
1246         self._temporary_macs.discard(nic.mac)
1247
1248     self._WriteConfig()