Simplify handling of regular fields in LUQuery*
[ganeti-local] / lib / config.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Configuration management for Ganeti
23
24 This module provides the interface to the Ganeti cluster configuration.
25
26 The configuration data is stored on every node but is updated on the master
27 only. After each update, the master distributes the data to the other nodes.
28
29 Currently, the data storage format is JSON. YAML was slow and consuming too
30 much memory.
31
32 """
33
34 import os
35 import tempfile
36 import random
37 import logging
38 import time
39
40 from ganeti import errors
41 from ganeti import locking
42 from ganeti import utils
43 from ganeti import constants
44 from ganeti import rpc
45 from ganeti import objects
46 from ganeti import serializer
47
48
49 _config_lock = locking.SharedLock()
50
51
52 def _ValidateConfig(data):
53   """Verifies that a configuration objects looks valid.
54
55   This only verifies the version of the configuration.
56
57   @raise errors.ConfigurationError: if the version differs from what
58       we expect
59
60   """
61   if data.version != constants.CONFIG_VERSION:
62     raise errors.ConfigurationError("Cluster configuration version"
63                                     " mismatch, got %s instead of %s" %
64                                     (data.version,
65                                      constants.CONFIG_VERSION))
66
67
68 class ConfigWriter:
69   """The interface to the cluster configuration.
70
71   """
72   def __init__(self, cfg_file=None, offline=False):
73     self.write_count = 0
74     self._lock = _config_lock
75     self._config_data = None
76     self._offline = offline
77     if cfg_file is None:
78       self._cfg_file = constants.CLUSTER_CONF_FILE
79     else:
80       self._cfg_file = cfg_file
81     self._temporary_ids = set()
82     self._temporary_drbds = {}
83     self._temporary_macs = set()
84     # Note: in order to prevent errors when resolving our name in
85     # _DistributeConfig, we compute it here once and reuse it; it's
86     # better to raise an error before starting to modify the config
87     # file than after it was modified
88     self._my_hostname = utils.HostInfo().name
89     self._last_cluster_serial = -1
90     self._OpenConfig()
91
92   # this method needs to be static, so that we can call it on the class
93   @staticmethod
94   def IsCluster():
95     """Check if the cluster is configured.
96
97     """
98     return os.path.exists(constants.CLUSTER_CONF_FILE)
99
100   @locking.ssynchronized(_config_lock, shared=1)
101   def GenerateMAC(self):
102     """Generate a MAC for an instance.
103
104     This should check the current instances for duplicates.
105
106     """
107     prefix = self._config_data.cluster.mac_prefix
108     all_macs = self._AllMACs()
109     retries = 64
110     while retries > 0:
111       byte1 = random.randrange(0, 256)
112       byte2 = random.randrange(0, 256)
113       byte3 = random.randrange(0, 256)
114       mac = "%s:%02x:%02x:%02x" % (prefix, byte1, byte2, byte3)
115       if mac not in all_macs and mac not in self._temporary_macs:
116         break
117       retries -= 1
118     else:
119       raise errors.ConfigurationError("Can't generate unique MAC")
120     self._temporary_macs.add(mac)
121     return mac
122
123   @locking.ssynchronized(_config_lock, shared=1)
124   def IsMacInUse(self, mac):
125     """Predicate: check if the specified MAC is in use in the Ganeti cluster.
126
127     This only checks instances managed by this cluster, it does not
128     check for potential collisions elsewhere.
129
130     """
131     all_macs = self._AllMACs()
132     return mac in all_macs or mac in self._temporary_macs
133
134   @locking.ssynchronized(_config_lock, shared=1)
135   def GenerateDRBDSecret(self):
136     """Generate a DRBD secret.
137
138     This checks the current disks for duplicates.
139
140     """
141     all_secrets = self._AllDRBDSecrets()
142     retries = 64
143     while retries > 0:
144       secret = utils.GenerateSecret()
145       if secret not in all_secrets:
146         break
147       retries -= 1
148     else:
149       raise errors.ConfigurationError("Can't generate unique DRBD secret")
150     return secret
151
152   def _AllLVs(self):
153     """Compute the list of all LVs.
154
155     """
156     lvnames = set()
157     for instance in self._config_data.instances.values():
158       node_data = instance.MapLVsByNode()
159       for lv_list in node_data.values():
160         lvnames.update(lv_list)
161     return lvnames
162
163   def _AllIDs(self, include_temporary):
164     """Compute the list of all UUIDs and names we have.
165
166     @type include_temporary: boolean
167     @param include_temporary: whether to include the _temporary_ids set
168     @rtype: set
169     @return: a set of IDs
170
171     """
172     existing = set()
173     if include_temporary:
174       existing.update(self._temporary_ids)
175     existing.update(self._AllLVs())
176     existing.update(self._config_data.instances.keys())
177     existing.update(self._config_data.nodes.keys())
178     existing.update([i.uuid for i in self._AllUUIDObjects() if i.uuid])
179     return existing
180
181   @locking.ssynchronized(_config_lock, shared=1)
182   def GenerateUniqueID(self, exceptions=None):
183     """Generate an unique disk name.
184
185     This checks the current node, instances and disk names for
186     duplicates.
187
188     @param exceptions: a list with some other names which should be checked
189         for uniqueness (used for example when you want to get
190         more than one id at one time without adding each one in
191         turn to the config file)
192
193     @rtype: string
194     @return: the unique id
195
196     """
197     existing = self._AllIDs(include_temporary=True)
198     if exceptions is not None:
199       existing.update(exceptions)
200     retries = 64
201     while retries > 0:
202       unique_id = utils.NewUUID()
203       if unique_id not in existing and unique_id is not None:
204         break
205     else:
206       raise errors.ConfigurationError("Not able generate an unique ID"
207                                       " (last tried ID: %s" % unique_id)
208     self._temporary_ids.add(unique_id)
209     return unique_id
210
211   def _CleanupTemporaryIDs(self):
212     """Cleanups the _temporary_ids structure.
213
214     """
215     existing = self._AllIDs(include_temporary=False)
216     self._temporary_ids = self._temporary_ids - existing
217
218   def _AllMACs(self):
219     """Return all MACs present in the config.
220
221     @rtype: list
222     @return: the list of all MACs
223
224     """
225     result = []
226     for instance in self._config_data.instances.values():
227       for nic in instance.nics:
228         result.append(nic.mac)
229
230     return result
231
232   def _AllDRBDSecrets(self):
233     """Return all DRBD secrets present in the config.
234
235     @rtype: list
236     @return: the list of all DRBD secrets
237
238     """
239     def helper(disk, result):
240       """Recursively gather secrets from this disk."""
241       if disk.dev_type == constants.DT_DRBD8:
242         result.append(disk.logical_id[5])
243       if disk.children:
244         for child in disk.children:
245           helper(child, result)
246
247     result = []
248     for instance in self._config_data.instances.values():
249       for disk in instance.disks:
250         helper(disk, result)
251
252     return result
253
254   def _CheckDiskIDs(self, disk, l_ids, p_ids):
255     """Compute duplicate disk IDs
256
257     @type disk: L{objects.Disk}
258     @param disk: the disk at which to start searching
259     @type l_ids: list
260     @param l_ids: list of current logical ids
261     @type p_ids: list
262     @param p_ids: list of current physical ids
263     @rtype: list
264     @return: a list of error messages
265
266     """
267     result = []
268     if disk.logical_id is not None:
269       if disk.logical_id in l_ids:
270         result.append("duplicate logical id %s" % str(disk.logical_id))
271       else:
272         l_ids.append(disk.logical_id)
273     if disk.physical_id is not None:
274       if disk.physical_id in p_ids:
275         result.append("duplicate physical id %s" % str(disk.physical_id))
276       else:
277         p_ids.append(disk.physical_id)
278
279     if disk.children:
280       for child in disk.children:
281         result.extend(self._CheckDiskIDs(child, l_ids, p_ids))
282     return result
283
284   def _UnlockedVerifyConfig(self):
285     """Verify function.
286
287     @rtype: list
288     @return: a list of error messages; a non-empty list signifies
289         configuration errors
290
291     """
292     result = []
293     seen_macs = []
294     ports = {}
295     data = self._config_data
296     seen_lids = []
297     seen_pids = []
298
299     # global cluster checks
300     if not data.cluster.enabled_hypervisors:
301       result.append("enabled hypervisors list doesn't have any entries")
302     invalid_hvs = set(data.cluster.enabled_hypervisors) - constants.HYPER_TYPES
303     if invalid_hvs:
304       result.append("enabled hypervisors contains invalid entries: %s" %
305                     invalid_hvs)
306
307     if data.cluster.master_node not in data.nodes:
308       result.append("cluster has invalid primary node '%s'" %
309                     data.cluster.master_node)
310
311     # per-instance checks
312     for instance_name in data.instances:
313       instance = data.instances[instance_name]
314       if instance.primary_node not in data.nodes:
315         result.append("instance '%s' has invalid primary node '%s'" %
316                       (instance_name, instance.primary_node))
317       for snode in instance.secondary_nodes:
318         if snode not in data.nodes:
319           result.append("instance '%s' has invalid secondary node '%s'" %
320                         (instance_name, snode))
321       for idx, nic in enumerate(instance.nics):
322         if nic.mac in seen_macs:
323           result.append("instance '%s' has NIC %d mac %s duplicate" %
324                         (instance_name, idx, nic.mac))
325         else:
326           seen_macs.append(nic.mac)
327
328       # gather the drbd ports for duplicate checks
329       for dsk in instance.disks:
330         if dsk.dev_type in constants.LDS_DRBD:
331           tcp_port = dsk.logical_id[2]
332           if tcp_port not in ports:
333             ports[tcp_port] = []
334           ports[tcp_port].append((instance.name, "drbd disk %s" % dsk.iv_name))
335       # gather network port reservation
336       net_port = getattr(instance, "network_port", None)
337       if net_port is not None:
338         if net_port not in ports:
339           ports[net_port] = []
340         ports[net_port].append((instance.name, "network port"))
341
342       # instance disk verify
343       for idx, disk in enumerate(instance.disks):
344         result.extend(["instance '%s' disk %d error: %s" %
345                        (instance.name, idx, msg) for msg in disk.Verify()])
346         result.extend(self._CheckDiskIDs(disk, seen_lids, seen_pids))
347
348     # cluster-wide pool of free ports
349     for free_port in data.cluster.tcpudp_port_pool:
350       if free_port not in ports:
351         ports[free_port] = []
352       ports[free_port].append(("cluster", "port marked as free"))
353
354     # compute tcp/udp duplicate ports
355     keys = ports.keys()
356     keys.sort()
357     for pnum in keys:
358       pdata = ports[pnum]
359       if len(pdata) > 1:
360         txt = ", ".join(["%s/%s" % val for val in pdata])
361         result.append("tcp/udp port %s has duplicates: %s" % (pnum, txt))
362
363     # highest used tcp port check
364     if keys:
365       if keys[-1] > data.cluster.highest_used_port:
366         result.append("Highest used port mismatch, saved %s, computed %s" %
367                       (data.cluster.highest_used_port, keys[-1]))
368
369     if not data.nodes[data.cluster.master_node].master_candidate:
370       result.append("Master node is not a master candidate")
371
372     # master candidate checks
373     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
374     if mc_now < mc_max:
375       result.append("Not enough master candidates: actual %d, target %d" %
376                     (mc_now, mc_max))
377
378     # node checks
379     for node in data.nodes.values():
380       if [node.master_candidate, node.drained, node.offline].count(True) > 1:
381         result.append("Node %s state is invalid: master_candidate=%s,"
382                       " drain=%s, offline=%s" %
383                       (node.name, node.master_candidate, node.drain,
384                        node.offline))
385
386     # drbd minors check
387     d_map, duplicates = self._UnlockedComputeDRBDMap()
388     for node, minor, instance_a, instance_b in duplicates:
389       result.append("DRBD minor %d on node %s is assigned twice to instances"
390                     " %s and %s" % (minor, node, instance_a, instance_b))
391
392     return result
393
394   @locking.ssynchronized(_config_lock, shared=1)
395   def VerifyConfig(self):
396     """Verify function.
397
398     This is just a wrapper over L{_UnlockedVerifyConfig}.
399
400     @rtype: list
401     @return: a list of error messages; a non-empty list signifies
402         configuration errors
403
404     """
405     return self._UnlockedVerifyConfig()
406
407   def _UnlockedSetDiskID(self, disk, node_name):
408     """Convert the unique ID to the ID needed on the target nodes.
409
410     This is used only for drbd, which needs ip/port configuration.
411
412     The routine descends down and updates its children also, because
413     this helps when the only the top device is passed to the remote
414     node.
415
416     This function is for internal use, when the config lock is already held.
417
418     """
419     if disk.children:
420       for child in disk.children:
421         self._UnlockedSetDiskID(child, node_name)
422
423     if disk.logical_id is None and disk.physical_id is not None:
424       return
425     if disk.dev_type == constants.LD_DRBD8:
426       pnode, snode, port, pminor, sminor, secret = disk.logical_id
427       if node_name not in (pnode, snode):
428         raise errors.ConfigurationError("DRBD device not knowing node %s" %
429                                         node_name)
430       pnode_info = self._UnlockedGetNodeInfo(pnode)
431       snode_info = self._UnlockedGetNodeInfo(snode)
432       if pnode_info is None or snode_info is None:
433         raise errors.ConfigurationError("Can't find primary or secondary node"
434                                         " for %s" % str(disk))
435       p_data = (pnode_info.secondary_ip, port)
436       s_data = (snode_info.secondary_ip, port)
437       if pnode == node_name:
438         disk.physical_id = p_data + s_data + (pminor, secret)
439       else: # it must be secondary, we tested above
440         disk.physical_id = s_data + p_data + (sminor, secret)
441     else:
442       disk.physical_id = disk.logical_id
443     return
444
445   @locking.ssynchronized(_config_lock)
446   def SetDiskID(self, disk, node_name):
447     """Convert the unique ID to the ID needed on the target nodes.
448
449     This is used only for drbd, which needs ip/port configuration.
450
451     The routine descends down and updates its children also, because
452     this helps when the only the top device is passed to the remote
453     node.
454
455     """
456     return self._UnlockedSetDiskID(disk, node_name)
457
458   @locking.ssynchronized(_config_lock)
459   def AddTcpUdpPort(self, port):
460     """Adds a new port to the available port pool.
461
462     """
463     if not isinstance(port, int):
464       raise errors.ProgrammerError("Invalid type passed for port")
465
466     self._config_data.cluster.tcpudp_port_pool.add(port)
467     self._WriteConfig()
468
469   @locking.ssynchronized(_config_lock, shared=1)
470   def GetPortList(self):
471     """Returns a copy of the current port list.
472
473     """
474     return self._config_data.cluster.tcpudp_port_pool.copy()
475
476   @locking.ssynchronized(_config_lock)
477   def AllocatePort(self):
478     """Allocate a port.
479
480     The port will be taken from the available port pool or from the
481     default port range (and in this case we increase
482     highest_used_port).
483
484     """
485     # If there are TCP/IP ports configured, we use them first.
486     if self._config_data.cluster.tcpudp_port_pool:
487       port = self._config_data.cluster.tcpudp_port_pool.pop()
488     else:
489       port = self._config_data.cluster.highest_used_port + 1
490       if port >= constants.LAST_DRBD_PORT:
491         raise errors.ConfigurationError("The highest used port is greater"
492                                         " than %s. Aborting." %
493                                         constants.LAST_DRBD_PORT)
494       self._config_data.cluster.highest_used_port = port
495
496     self._WriteConfig()
497     return port
498
499   def _UnlockedComputeDRBDMap(self):
500     """Compute the used DRBD minor/nodes.
501
502     @rtype: (dict, list)
503     @return: dictionary of node_name: dict of minor: instance_name;
504         the returned dict will have all the nodes in it (even if with
505         an empty list), and a list of duplicates; if the duplicates
506         list is not empty, the configuration is corrupted and its caller
507         should raise an exception
508
509     """
510     def _AppendUsedPorts(instance_name, disk, used):
511       duplicates = []
512       if disk.dev_type == constants.LD_DRBD8 and len(disk.logical_id) >= 5:
513         node_a, node_b, _, minor_a, minor_b = disk.logical_id[:5]
514         for node, port in ((node_a, minor_a), (node_b, minor_b)):
515           assert node in used, ("Node '%s' of instance '%s' not found"
516                                 " in node list" % (node, instance_name))
517           if port in used[node]:
518             duplicates.append((node, port, instance_name, used[node][port]))
519           else:
520             used[node][port] = instance_name
521       if disk.children:
522         for child in disk.children:
523           duplicates.extend(_AppendUsedPorts(instance_name, child, used))
524       return duplicates
525
526     duplicates = []
527     my_dict = dict((node, {}) for node in self._config_data.nodes)
528     for instance in self._config_data.instances.itervalues():
529       for disk in instance.disks:
530         duplicates.extend(_AppendUsedPorts(instance.name, disk, my_dict))
531     for (node, minor), instance in self._temporary_drbds.iteritems():
532       if minor in my_dict[node] and my_dict[node][minor] != instance:
533         duplicates.append((node, minor, instance, my_dict[node][minor]))
534       else:
535         my_dict[node][minor] = instance
536     return my_dict, duplicates
537
538   @locking.ssynchronized(_config_lock)
539   def ComputeDRBDMap(self):
540     """Compute the used DRBD minor/nodes.
541
542     This is just a wrapper over L{_UnlockedComputeDRBDMap}.
543
544     @return: dictionary of node_name: dict of minor: instance_name;
545         the returned dict will have all the nodes in it (even if with
546         an empty list).
547
548     """
549     d_map, duplicates = self._UnlockedComputeDRBDMap()
550     if duplicates:
551       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
552                                       str(duplicates))
553     return d_map
554
555   @locking.ssynchronized(_config_lock)
556   def AllocateDRBDMinor(self, nodes, instance):
557     """Allocate a drbd minor.
558
559     The free minor will be automatically computed from the existing
560     devices. A node can be given multiple times in order to allocate
561     multiple minors. The result is the list of minors, in the same
562     order as the passed nodes.
563
564     @type instance: string
565     @param instance: the instance for which we allocate minors
566
567     """
568     assert isinstance(instance, basestring), \
569            "Invalid argument '%s' passed to AllocateDRBDMinor" % instance
570
571     d_map, duplicates = self._UnlockedComputeDRBDMap()
572     if duplicates:
573       raise errors.ConfigurationError("Duplicate DRBD ports detected: %s" %
574                                       str(duplicates))
575     result = []
576     for nname in nodes:
577       ndata = d_map[nname]
578       if not ndata:
579         # no minors used, we can start at 0
580         result.append(0)
581         ndata[0] = instance
582         self._temporary_drbds[(nname, 0)] = instance
583         continue
584       keys = ndata.keys()
585       keys.sort()
586       ffree = utils.FirstFree(keys)
587       if ffree is None:
588         # return the next minor
589         # TODO: implement high-limit check
590         minor = keys[-1] + 1
591       else:
592         minor = ffree
593       # double-check minor against current instances
594       assert minor not in d_map[nname], \
595              ("Attempt to reuse allocated DRBD minor %d on node %s,"
596               " already allocated to instance %s" %
597               (minor, nname, d_map[nname][minor]))
598       ndata[minor] = instance
599       # double-check minor against reservation
600       r_key = (nname, minor)
601       assert r_key not in self._temporary_drbds, \
602              ("Attempt to reuse reserved DRBD minor %d on node %s,"
603               " reserved for instance %s" %
604               (minor, nname, self._temporary_drbds[r_key]))
605       self._temporary_drbds[r_key] = instance
606       result.append(minor)
607     logging.debug("Request to allocate drbd minors, input: %s, returning %s",
608                   nodes, result)
609     return result
610
611   def _UnlockedReleaseDRBDMinors(self, instance):
612     """Release temporary drbd minors allocated for a given instance.
613
614     @type instance: string
615     @param instance: the instance for which temporary minors should be
616                      released
617
618     """
619     assert isinstance(instance, basestring), \
620            "Invalid argument passed to ReleaseDRBDMinors"
621     for key, name in self._temporary_drbds.items():
622       if name == instance:
623         del self._temporary_drbds[key]
624
625   @locking.ssynchronized(_config_lock)
626   def ReleaseDRBDMinors(self, instance):
627     """Release temporary drbd minors allocated for a given instance.
628
629     This should be called on the error paths, on the success paths
630     it's automatically called by the ConfigWriter add and update
631     functions.
632
633     This function is just a wrapper over L{_UnlockedReleaseDRBDMinors}.
634
635     @type instance: string
636     @param instance: the instance for which temporary minors should be
637                      released
638
639     """
640     self._UnlockedReleaseDRBDMinors(instance)
641
642   @locking.ssynchronized(_config_lock, shared=1)
643   def GetConfigVersion(self):
644     """Get the configuration version.
645
646     @return: Config version
647
648     """
649     return self._config_data.version
650
651   @locking.ssynchronized(_config_lock, shared=1)
652   def GetClusterName(self):
653     """Get cluster name.
654
655     @return: Cluster name
656
657     """
658     return self._config_data.cluster.cluster_name
659
660   @locking.ssynchronized(_config_lock, shared=1)
661   def GetMasterNode(self):
662     """Get the hostname of the master node for this cluster.
663
664     @return: Master hostname
665
666     """
667     return self._config_data.cluster.master_node
668
669   @locking.ssynchronized(_config_lock, shared=1)
670   def GetMasterIP(self):
671     """Get the IP of the master node for this cluster.
672
673     @return: Master IP
674
675     """
676     return self._config_data.cluster.master_ip
677
678   @locking.ssynchronized(_config_lock, shared=1)
679   def GetMasterNetdev(self):
680     """Get the master network device for this cluster.
681
682     """
683     return self._config_data.cluster.master_netdev
684
685   @locking.ssynchronized(_config_lock, shared=1)
686   def GetFileStorageDir(self):
687     """Get the file storage dir for this cluster.
688
689     """
690     return self._config_data.cluster.file_storage_dir
691
692   @locking.ssynchronized(_config_lock, shared=1)
693   def GetHypervisorType(self):
694     """Get the hypervisor type for this cluster.
695
696     """
697     return self._config_data.cluster.enabled_hypervisors[0]
698
699   @locking.ssynchronized(_config_lock, shared=1)
700   def GetHostKey(self):
701     """Return the rsa hostkey from the config.
702
703     @rtype: string
704     @return: the rsa hostkey
705
706     """
707     return self._config_data.cluster.rsahostkeypub
708
709   @locking.ssynchronized(_config_lock)
710   def AddInstance(self, instance):
711     """Add an instance to the config.
712
713     This should be used after creating a new instance.
714
715     @type instance: L{objects.Instance}
716     @param instance: the instance object
717
718     """
719     if not isinstance(instance, objects.Instance):
720       raise errors.ProgrammerError("Invalid type passed to AddInstance")
721
722     if instance.disk_template != constants.DT_DISKLESS:
723       all_lvs = instance.MapLVsByNode()
724       logging.info("Instance '%s' DISK_LAYOUT: %s", instance.name, all_lvs)
725
726     all_macs = self._AllMACs()
727     for nic in instance.nics:
728       if nic.mac in all_macs:
729         raise errors.ConfigurationError("Cannot add instance %s:"
730           " MAC address '%s' already in use." % (instance.name, nic.mac))
731
732     instance.serial_no = 1
733     instance.ctime = instance.mtime = time.time()
734     self._config_data.instances[instance.name] = instance
735     self._config_data.cluster.serial_no += 1
736     self._UnlockedReleaseDRBDMinors(instance.name)
737     for nic in instance.nics:
738       self._temporary_macs.discard(nic.mac)
739     self._WriteConfig()
740
741   def _SetInstanceStatus(self, instance_name, status):
742     """Set the instance's status to a given value.
743
744     """
745     assert isinstance(status, bool), \
746            "Invalid status '%s' passed to SetInstanceStatus" % (status,)
747
748     if instance_name not in self._config_data.instances:
749       raise errors.ConfigurationError("Unknown instance '%s'" %
750                                       instance_name)
751     instance = self._config_data.instances[instance_name]
752     if instance.admin_up != status:
753       instance.admin_up = status
754       instance.serial_no += 1
755       instance.mtime = time.time()
756       self._WriteConfig()
757
758   @locking.ssynchronized(_config_lock)
759   def MarkInstanceUp(self, instance_name):
760     """Mark the instance status to up in the config.
761
762     """
763     self._SetInstanceStatus(instance_name, True)
764
765   @locking.ssynchronized(_config_lock)
766   def RemoveInstance(self, instance_name):
767     """Remove the instance from the configuration.
768
769     """
770     if instance_name not in self._config_data.instances:
771       raise errors.ConfigurationError("Unknown instance '%s'" % instance_name)
772     del self._config_data.instances[instance_name]
773     self._config_data.cluster.serial_no += 1
774     self._WriteConfig()
775
776   @locking.ssynchronized(_config_lock)
777   def RenameInstance(self, old_name, new_name):
778     """Rename an instance.
779
780     This needs to be done in ConfigWriter and not by RemoveInstance
781     combined with AddInstance as only we can guarantee an atomic
782     rename.
783
784     """
785     if old_name not in self._config_data.instances:
786       raise errors.ConfigurationError("Unknown instance '%s'" % old_name)
787     inst = self._config_data.instances[old_name]
788     del self._config_data.instances[old_name]
789     inst.name = new_name
790
791     for disk in inst.disks:
792       if disk.dev_type == constants.LD_FILE:
793         # rename the file paths in logical and physical id
794         file_storage_dir = os.path.dirname(os.path.dirname(disk.logical_id[1]))
795         disk.physical_id = disk.logical_id = (disk.logical_id[0],
796                                               os.path.join(file_storage_dir,
797                                                            inst.name,
798                                                            disk.iv_name))
799
800     self._config_data.instances[inst.name] = inst
801     self._WriteConfig()
802
803   @locking.ssynchronized(_config_lock)
804   def MarkInstanceDown(self, instance_name):
805     """Mark the status of an instance to down in the configuration.
806
807     """
808     self._SetInstanceStatus(instance_name, False)
809
810   def _UnlockedGetInstanceList(self):
811     """Get the list of instances.
812
813     This function is for internal use, when the config lock is already held.
814
815     """
816     return self._config_data.instances.keys()
817
818   @locking.ssynchronized(_config_lock, shared=1)
819   def GetInstanceList(self):
820     """Get the list of instances.
821
822     @return: array of instances, ex. ['instance2.example.com',
823         'instance1.example.com']
824
825     """
826     return self._UnlockedGetInstanceList()
827
828   @locking.ssynchronized(_config_lock, shared=1)
829   def ExpandInstanceName(self, short_name):
830     """Attempt to expand an incomplete instance name.
831
832     """
833     return utils.MatchNameComponent(short_name,
834                                     self._config_data.instances.keys())
835
836   def _UnlockedGetInstanceInfo(self, instance_name):
837     """Returns information about an instance.
838
839     This function is for internal use, when the config lock is already held.
840
841     """
842     if instance_name not in self._config_data.instances:
843       return None
844
845     return self._config_data.instances[instance_name]
846
847   @locking.ssynchronized(_config_lock, shared=1)
848   def GetInstanceInfo(self, instance_name):
849     """Returns information about an instance.
850
851     It takes the information from the configuration file. Other information of
852     an instance are taken from the live systems.
853
854     @param instance_name: name of the instance, e.g.
855         I{instance1.example.com}
856
857     @rtype: L{objects.Instance}
858     @return: the instance object
859
860     """
861     return self._UnlockedGetInstanceInfo(instance_name)
862
863   @locking.ssynchronized(_config_lock, shared=1)
864   def GetAllInstancesInfo(self):
865     """Get the configuration of all instances.
866
867     @rtype: dict
868     @return: dict of (instance, instance_info), where instance_info is what
869               would GetInstanceInfo return for the node
870
871     """
872     my_dict = dict([(instance, self._UnlockedGetInstanceInfo(instance))
873                     for instance in self._UnlockedGetInstanceList()])
874     return my_dict
875
876   @locking.ssynchronized(_config_lock)
877   def AddNode(self, node):
878     """Add a node to the configuration.
879
880     @type node: L{objects.Node}
881     @param node: a Node instance
882
883     """
884     logging.info("Adding node %s to configuration" % node.name)
885
886     node.serial_no = 1
887     node.ctime = node.mtime = time.time()
888     self._config_data.nodes[node.name] = node
889     self._config_data.cluster.serial_no += 1
890     self._WriteConfig()
891
892   @locking.ssynchronized(_config_lock)
893   def RemoveNode(self, node_name):
894     """Remove a node from the configuration.
895
896     """
897     logging.info("Removing node %s from configuration" % node_name)
898
899     if node_name not in self._config_data.nodes:
900       raise errors.ConfigurationError("Unknown node '%s'" % node_name)
901
902     del self._config_data.nodes[node_name]
903     self._config_data.cluster.serial_no += 1
904     self._WriteConfig()
905
906   @locking.ssynchronized(_config_lock, shared=1)
907   def ExpandNodeName(self, short_name):
908     """Attempt to expand an incomplete instance name.
909
910     """
911     return utils.MatchNameComponent(short_name,
912                                     self._config_data.nodes.keys())
913
914   def _UnlockedGetNodeInfo(self, node_name):
915     """Get the configuration of a node, as stored in the config.
916
917     This function is for internal use, when the config lock is already
918     held.
919
920     @param node_name: the node name, e.g. I{node1.example.com}
921
922     @rtype: L{objects.Node}
923     @return: the node object
924
925     """
926     if node_name not in self._config_data.nodes:
927       return None
928
929     return self._config_data.nodes[node_name]
930
931
932   @locking.ssynchronized(_config_lock, shared=1)
933   def GetNodeInfo(self, node_name):
934     """Get the configuration of a node, as stored in the config.
935
936     This is just a locked wrapper over L{_UnlockedGetNodeInfo}.
937
938     @param node_name: the node name, e.g. I{node1.example.com}
939
940     @rtype: L{objects.Node}
941     @return: the node object
942
943     """
944     return self._UnlockedGetNodeInfo(node_name)
945
946   def _UnlockedGetNodeList(self):
947     """Return the list of nodes which are in the configuration.
948
949     This function is for internal use, when the config lock is already
950     held.
951
952     @rtype: list
953
954     """
955     return self._config_data.nodes.keys()
956
957
958   @locking.ssynchronized(_config_lock, shared=1)
959   def GetNodeList(self):
960     """Return the list of nodes which are in the configuration.
961
962     """
963     return self._UnlockedGetNodeList()
964
965   @locking.ssynchronized(_config_lock, shared=1)
966   def GetOnlineNodeList(self):
967     """Return the list of nodes which are online.
968
969     """
970     all_nodes = [self._UnlockedGetNodeInfo(node)
971                  for node in self._UnlockedGetNodeList()]
972     return [node.name for node in all_nodes if not node.offline]
973
974   @locking.ssynchronized(_config_lock, shared=1)
975   def GetAllNodesInfo(self):
976     """Get the configuration of all nodes.
977
978     @rtype: dict
979     @return: dict of (node, node_info), where node_info is what
980               would GetNodeInfo return for the node
981
982     """
983     my_dict = dict([(node, self._UnlockedGetNodeInfo(node))
984                     for node in self._UnlockedGetNodeList()])
985     return my_dict
986
987   def _UnlockedGetMasterCandidateStats(self, exceptions=None):
988     """Get the number of current and maximum desired and possible candidates.
989
990     @type exceptions: list
991     @param exceptions: if passed, list of nodes that should be ignored
992     @rtype: tuple
993     @return: tuple of (current, desired and possible)
994
995     """
996     mc_now = mc_max = 0
997     for node in self._config_data.nodes.values():
998       if exceptions and node.name in exceptions:
999         continue
1000       if not (node.offline or node.drained):
1001         mc_max += 1
1002       if node.master_candidate:
1003         mc_now += 1
1004     mc_max = min(mc_max, self._config_data.cluster.candidate_pool_size)
1005     return (mc_now, mc_max)
1006
1007   @locking.ssynchronized(_config_lock, shared=1)
1008   def GetMasterCandidateStats(self, exceptions=None):
1009     """Get the number of current and maximum possible candidates.
1010
1011     This is just a wrapper over L{_UnlockedGetMasterCandidateStats}.
1012
1013     @type exceptions: list
1014     @param exceptions: if passed, list of nodes that should be ignored
1015     @rtype: tuple
1016     @return: tuple of (current, max)
1017
1018     """
1019     return self._UnlockedGetMasterCandidateStats(exceptions)
1020
1021   @locking.ssynchronized(_config_lock)
1022   def MaintainCandidatePool(self):
1023     """Try to grow the candidate pool to the desired size.
1024
1025     @rtype: list
1026     @return: list with the adjusted nodes (L{objects.Node} instances)
1027
1028     """
1029     mc_now, mc_max = self._UnlockedGetMasterCandidateStats()
1030     mod_list = []
1031     if mc_now < mc_max:
1032       node_list = self._config_data.nodes.keys()
1033       random.shuffle(node_list)
1034       for name in node_list:
1035         if mc_now >= mc_max:
1036           break
1037         node = self._config_data.nodes[name]
1038         if node.master_candidate or node.offline or node.drained:
1039           continue
1040         mod_list.append(node)
1041         node.master_candidate = True
1042         node.serial_no += 1
1043         mc_now += 1
1044       if mc_now != mc_max:
1045         # this should not happen
1046         logging.warning("Warning: MaintainCandidatePool didn't manage to"
1047                         " fill the candidate pool (%d/%d)", mc_now, mc_max)
1048       if mod_list:
1049         self._config_data.cluster.serial_no += 1
1050         self._WriteConfig()
1051
1052     return mod_list
1053
1054   def _BumpSerialNo(self):
1055     """Bump up the serial number of the config.
1056
1057     """
1058     self._config_data.serial_no += 1
1059     self._config_data.mtime = time.time()
1060
1061   def _AllUUIDObjects(self):
1062     """Returns all objects with uuid attributes.
1063
1064     """
1065     return (self._config_data.instances.values() +
1066             self._config_data.nodes.values() +
1067             [self._config_data.cluster])
1068
1069   def _OpenConfig(self):
1070     """Read the config data from disk.
1071
1072     """
1073     raw_data = utils.ReadFile(self._cfg_file)
1074
1075     try:
1076       data = objects.ConfigData.FromDict(serializer.Load(raw_data))
1077     except Exception, err:
1078       raise errors.ConfigurationError(err)
1079
1080     # Make sure the configuration has the right version
1081     _ValidateConfig(data)
1082
1083     if (not hasattr(data, 'cluster') or
1084         not hasattr(data.cluster, 'rsahostkeypub')):
1085       raise errors.ConfigurationError("Incomplete configuration"
1086                                       " (missing cluster.rsahostkeypub)")
1087
1088     # Upgrade configuration if needed
1089     data.UpgradeConfig()
1090
1091     self._config_data = data
1092     # reset the last serial as -1 so that the next write will cause
1093     # ssconf update
1094     self._last_cluster_serial = -1
1095
1096     # And finally run our (custom) config upgrade sequence
1097     self._UpgradeConfig()
1098
1099   def _UpgradeConfig(self):
1100     """Run upgrade steps that cannot be done purely in the objects.
1101
1102     This is because some data elements need uniqueness across the
1103     whole configuration, etc.
1104
1105     @warning: this function will call L{_WriteConfig()}, so it needs
1106         to either be called with the lock held or from a safe place
1107         (the constructor)
1108
1109     """
1110     modified = False
1111     for item in self._AllUUIDObjects():
1112       if item.uuid is None:
1113         item.uuid = self.GenerateUniqueID()
1114         modified = True
1115     if modified:
1116       self._WriteConfig()
1117
1118   def _DistributeConfig(self):
1119     """Distribute the configuration to the other nodes.
1120
1121     Currently, this only copies the configuration file. In the future,
1122     it could be used to encapsulate the 2/3-phase update mechanism.
1123
1124     """
1125     if self._offline:
1126       return True
1127     bad = False
1128
1129     node_list = []
1130     addr_list = []
1131     myhostname = self._my_hostname
1132     # we can skip checking whether _UnlockedGetNodeInfo returns None
1133     # since the node list comes from _UnlocketGetNodeList, and we are
1134     # called with the lock held, so no modifications should take place
1135     # in between
1136     for node_name in self._UnlockedGetNodeList():
1137       if node_name == myhostname:
1138         continue
1139       node_info = self._UnlockedGetNodeInfo(node_name)
1140       if not node_info.master_candidate:
1141         continue
1142       node_list.append(node_info.name)
1143       addr_list.append(node_info.primary_ip)
1144
1145     result = rpc.RpcRunner.call_upload_file(node_list, self._cfg_file,
1146                                             address_list=addr_list)
1147     for to_node, to_result in result.items():
1148       msg = to_result.fail_msg
1149       if msg:
1150         msg = ("Copy of file %s to node %s failed: %s" %
1151                (self._cfg_file, to_node, msg))
1152         logging.error(msg)
1153         bad = True
1154     return not bad
1155
1156   def _WriteConfig(self, destination=None):
1157     """Write the configuration data to persistent storage.
1158
1159     """
1160     # first, cleanup the _temporary_ids set, if an ID is now in the
1161     # other objects it should be discarded to prevent unbounded growth
1162     # of that structure
1163     self._CleanupTemporaryIDs()
1164     config_errors = self._UnlockedVerifyConfig()
1165     if config_errors:
1166       raise errors.ConfigurationError("Configuration data is not"
1167                                       " consistent: %s" %
1168                                       (", ".join(config_errors)))
1169     if destination is None:
1170       destination = self._cfg_file
1171     self._BumpSerialNo()
1172     txt = serializer.Dump(self._config_data.ToDict())
1173
1174     utils.WriteFile(destination, data=txt)
1175
1176     self.write_count += 1
1177
1178     # and redistribute the config file to master candidates
1179     self._DistributeConfig()
1180
1181     # Write ssconf files on all nodes (including locally)
1182     if self._last_cluster_serial < self._config_data.cluster.serial_no:
1183       if not self._offline:
1184         result = rpc.RpcRunner.call_write_ssconf_files(\
1185           self._UnlockedGetNodeList(),
1186           self._UnlockedGetSsconfValues())
1187         for nname, nresu in result.items():
1188           msg = nresu.fail_msg
1189           if msg:
1190             logging.warning("Error while uploading ssconf files to"
1191                             " node %s: %s", nname, msg)
1192       self._last_cluster_serial = self._config_data.cluster.serial_no
1193
1194   def _UnlockedGetSsconfValues(self):
1195     """Return the values needed by ssconf.
1196
1197     @rtype: dict
1198     @return: a dictionary with keys the ssconf names and values their
1199         associated value
1200
1201     """
1202     fn = "\n".join
1203     instance_names = utils.NiceSort(self._UnlockedGetInstanceList())
1204     node_names = utils.NiceSort(self._UnlockedGetNodeList())
1205     node_info = [self._UnlockedGetNodeInfo(name) for name in node_names]
1206     node_pri_ips = ["%s %s" % (ninfo.name, ninfo.primary_ip)
1207                     for ninfo in node_info]
1208     node_snd_ips = ["%s %s" % (ninfo.name, ninfo.secondary_ip)
1209                     for ninfo in node_info]
1210
1211     instance_data = fn(instance_names)
1212     off_data = fn(node.name for node in node_info if node.offline)
1213     on_data = fn(node.name for node in node_info if not node.offline)
1214     mc_data = fn(node.name for node in node_info if node.master_candidate)
1215     mc_ips_data = fn(node.primary_ip for node in node_info
1216                      if node.master_candidate)
1217     node_data = fn(node_names)
1218     node_pri_ips_data = fn(node_pri_ips)
1219     node_snd_ips_data = fn(node_snd_ips)
1220
1221     cluster = self._config_data.cluster
1222     cluster_tags = fn(cluster.GetTags())
1223     return {
1224       constants.SS_CLUSTER_NAME: cluster.cluster_name,
1225       constants.SS_CLUSTER_TAGS: cluster_tags,
1226       constants.SS_FILE_STORAGE_DIR: cluster.file_storage_dir,
1227       constants.SS_MASTER_CANDIDATES: mc_data,
1228       constants.SS_MASTER_CANDIDATES_IPS: mc_ips_data,
1229       constants.SS_MASTER_IP: cluster.master_ip,
1230       constants.SS_MASTER_NETDEV: cluster.master_netdev,
1231       constants.SS_MASTER_NODE: cluster.master_node,
1232       constants.SS_NODE_LIST: node_data,
1233       constants.SS_NODE_PRIMARY_IPS: node_pri_ips_data,
1234       constants.SS_NODE_SECONDARY_IPS: node_snd_ips_data,
1235       constants.SS_OFFLINE_NODES: off_data,
1236       constants.SS_ONLINE_NODES: on_data,
1237       constants.SS_INSTANCE_LIST: instance_data,
1238       constants.SS_RELEASE_VERSION: constants.RELEASE_VERSION,
1239       }
1240
1241   @locking.ssynchronized(_config_lock, shared=1)
1242   def GetVGName(self):
1243     """Return the volume group name.
1244
1245     """
1246     return self._config_data.cluster.volume_group_name
1247
1248   @locking.ssynchronized(_config_lock)
1249   def SetVGName(self, vg_name):
1250     """Set the volume group name.
1251
1252     """
1253     self._config_data.cluster.volume_group_name = vg_name
1254     self._config_data.cluster.serial_no += 1
1255     self._WriteConfig()
1256
1257   @locking.ssynchronized(_config_lock, shared=1)
1258   def GetMACPrefix(self):
1259     """Return the mac prefix.
1260
1261     """
1262     return self._config_data.cluster.mac_prefix
1263
1264   @locking.ssynchronized(_config_lock, shared=1)
1265   def GetClusterInfo(self):
1266     """Returns information about the cluster
1267
1268     @rtype: L{objects.Cluster}
1269     @return: the cluster object
1270
1271     """
1272     return self._config_data.cluster
1273
1274   @locking.ssynchronized(_config_lock)
1275   def Update(self, target):
1276     """Notify function to be called after updates.
1277
1278     This function must be called when an object (as returned by
1279     GetInstanceInfo, GetNodeInfo, GetCluster) has been updated and the
1280     caller wants the modifications saved to the backing store. Note
1281     that all modified objects will be saved, but the target argument
1282     is the one the caller wants to ensure that it's saved.
1283
1284     @param target: an instance of either L{objects.Cluster},
1285         L{objects.Node} or L{objects.Instance} which is existing in
1286         the cluster
1287
1288     """
1289     if self._config_data is None:
1290       raise errors.ProgrammerError("Configuration file not read,"
1291                                    " cannot save.")
1292     update_serial = False
1293     if isinstance(target, objects.Cluster):
1294       test = target == self._config_data.cluster
1295     elif isinstance(target, objects.Node):
1296       test = target in self._config_data.nodes.values()
1297       update_serial = True
1298     elif isinstance(target, objects.Instance):
1299       test = target in self._config_data.instances.values()
1300     else:
1301       raise errors.ProgrammerError("Invalid object type (%s) passed to"
1302                                    " ConfigWriter.Update" % type(target))
1303     if not test:
1304       raise errors.ConfigurationError("Configuration updated since object"
1305                                       " has been read or unknown object")
1306     target.serial_no += 1
1307     target.mtime = now = time.time()
1308
1309     if update_serial:
1310       # for node updates, we need to increase the cluster serial too
1311       self._config_data.cluster.serial_no += 1
1312       self._config_data.cluster.mtime = now
1313
1314     if isinstance(target, objects.Instance):
1315       self._UnlockedReleaseDRBDMinors(target.name)
1316       for nic in target.nics:
1317         self._temporary_macs.discard(nic.mac)
1318
1319     self._WriteConfig()