Enable lockless node queries
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36 import zlib
37 import base64
38
39 from ganeti import errors
40 from ganeti import utils
41 from ganeti import ssh
42 from ganeti import hypervisor
43 from ganeti import constants
44 from ganeti import bdev
45 from ganeti import objects
46 from ganeti import ssconf
47
48
49 def _GetConfig():
50   """Simple wrapper to return a SimpleStore.
51
52   @rtype: L{ssconf.SimpleStore}
53   @return: a SimpleStore instance
54
55   """
56   return ssconf.SimpleStore()
57
58
59 def _GetSshRunner(cluster_name):
60   """Simple wrapper to return an SshRunner.
61
62   @type cluster_name: str
63   @param cluster_name: the cluster name, which is needed
64       by the SshRunner constructor
65   @rtype: L{ssh.SshRunner}
66   @return: an SshRunner instance
67
68   """
69   return ssh.SshRunner(cluster_name)
70
71
72 def _Decompress(data):
73   """Unpacks data compressed by the RPC client.
74
75   @type data: list or tuple
76   @param data: Data sent by RPC client
77   @rtype: str
78   @return: Decompressed data
79
80   """
81   assert isinstance(data, (list, tuple))
82   assert len(data) == 2
83   (encoding, content) = data
84   if encoding == constants.RPC_ENCODING_NONE:
85     return content
86   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87     return zlib.decompress(base64.b64decode(content))
88   else:
89     raise AssertionError("Unknown data encoding")
90
91
92 def _CleanDirectory(path, exclude=None):
93   """Removes all regular files in a directory.
94
95   @type path: str
96   @param path: the directory to clean
97   @type exclude: list
98   @param exclude: list of files to be excluded, defaults
99       to the empty list
100
101   """
102   if not os.path.isdir(path):
103     return
104   if exclude is None:
105     exclude = []
106   else:
107     # Normalize excluded paths
108     exclude = [os.path.normpath(i) for i in exclude]
109
110   for rel_name in utils.ListVisibleFiles(path):
111     full_name = os.path.normpath(os.path.join(path, rel_name))
112     if full_name in exclude:
113       continue
114     if os.path.isfile(full_name) and not os.path.islink(full_name):
115       utils.RemoveFile(full_name)
116
117
118 def JobQueuePurge():
119   """Removes job queue files and archived jobs.
120
121   @rtype: None
122
123   """
124   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
125   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
126
127
128 def GetMasterInfo():
129   """Returns master information.
130
131   This is an utility function to compute master information, either
132   for consumption here or from the node daemon.
133
134   @rtype: tuple
135   @return: (master_netdev, master_ip, master_name) if we have a good
136       configuration, otherwise (None, None, None)
137
138   """
139   try:
140     cfg = _GetConfig()
141     master_netdev = cfg.GetMasterNetdev()
142     master_ip = cfg.GetMasterIP()
143     master_node = cfg.GetMasterNode()
144   except errors.ConfigurationError, err:
145     logging.exception("Cluster configuration incomplete")
146     return (None, None, None)
147   return (master_netdev, master_ip, master_node)
148
149
150 def StartMaster(start_daemons):
151   """Activate local node as master node.
152
153   The function will always try activate the IP address of the master
154   (unless someone else has it). It will also start the master daemons,
155   based on the start_daemons parameter.
156
157   @type start_daemons: boolean
158   @param start_daemons: whther to also start the master
159       daemons (ganeti-masterd and ganeti-rapi)
160   @rtype: None
161
162   """
163   ok = True
164   master_netdev, master_ip, _ = GetMasterInfo()
165   if not master_netdev:
166     return False
167
168   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
169     if utils.OwnIpAddress(master_ip):
170       # we already have the ip:
171       logging.debug("Already started")
172     else:
173       logging.error("Someone else has the master ip, not activating")
174       ok = False
175   else:
176     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
177                            "dev", master_netdev, "label",
178                            "%s:0" % master_netdev])
179     if result.failed:
180       logging.error("Can't activate master IP: %s", result.output)
181       ok = False
182
183     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
184                            "-s", master_ip, master_ip])
185     # we'll ignore the exit code of arping
186
187   # and now start the master and rapi daemons
188   if start_daemons:
189     for daemon in 'ganeti-masterd', 'ganeti-rapi':
190       result = utils.RunCmd([daemon])
191       if result.failed:
192         logging.error("Can't start daemon %s: %s", daemon, result.output)
193         ok = False
194   return ok
195
196
197 def StopMaster(stop_daemons):
198   """Deactivate this node as master.
199
200   The function will always try to deactivate the IP address of the
201   master. It will also stop the master daemons depending on the
202   stop_daemons parameter.
203
204   @type stop_daemons: boolean
205   @param stop_daemons: whether to also stop the master daemons
206       (ganeti-masterd and ganeti-rapi)
207   @rtype: None
208
209   """
210   master_netdev, master_ip, _ = GetMasterInfo()
211   if not master_netdev:
212     return False
213
214   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
215                          "dev", master_netdev])
216   if result.failed:
217     logging.error("Can't remove the master IP, error: %s", result.output)
218     # but otherwise ignore the failure
219
220   if stop_daemons:
221     # stop/kill the rapi and the master daemon
222     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
223       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
224
225   return True
226
227
228 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
229   """Joins this node to the cluster.
230
231   This does the following:
232       - updates the hostkeys of the machine (rsa and dsa)
233       - adds the ssh private key to the user
234       - adds the ssh public key to the users' authorized_keys file
235
236   @type dsa: str
237   @param dsa: the DSA private key to write
238   @type dsapub: str
239   @param dsapub: the DSA public key to write
240   @type rsa: str
241   @param rsa: the RSA private key to write
242   @type rsapub: str
243   @param rsapub: the RSA public key to write
244   @type sshkey: str
245   @param sshkey: the SSH private key to write
246   @type sshpub: str
247   @param sshpub: the SSH public key to write
248   @rtype: boolean
249   @return: the success of the operation
250
251   """
252   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
253                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
254                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
255                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
256   for name, content, mode in sshd_keys:
257     utils.WriteFile(name, data=content, mode=mode)
258
259   try:
260     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
261                                                     mkdir=True)
262   except errors.OpExecError, err:
263     msg = "Error while processing user ssh files"
264     logging.exception(msg)
265     return (False, "%s: %s" % (msg, err))
266
267   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
268     utils.WriteFile(name, data=content, mode=0600)
269
270   utils.AddAuthorizedKey(auth_keys, sshpub)
271
272   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
273
274   return (True, "Node added successfully")
275
276
277 def LeaveCluster():
278   """Cleans up and remove the current node.
279
280   This function cleans up and prepares the current node to be removed
281   from the cluster.
282
283   If processing is successful, then it raises an
284   L{errors.QuitGanetiException} which is used as a special case to
285   shutdown the node daemon.
286
287   """
288   _CleanDirectory(constants.DATA_DIR)
289   JobQueuePurge()
290
291   try:
292     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
293   except errors.OpExecError:
294     logging.exception("Error while processing ssh files")
295     return
296
297   f = open(pub_key, 'r')
298   try:
299     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
300   finally:
301     f.close()
302
303   utils.RemoveFile(priv_key)
304   utils.RemoveFile(pub_key)
305
306   # Return a reassuring string to the caller, and quit
307   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
308
309
310 def GetNodeInfo(vgname, hypervisor_type):
311   """Gives back a hash with different informations about the node.
312
313   @type vgname: C{string}
314   @param vgname: the name of the volume group to ask for disk space information
315   @type hypervisor_type: C{str}
316   @param hypervisor_type: the name of the hypervisor to ask for
317       memory information
318   @rtype: C{dict}
319   @return: dictionary with the following keys:
320       - vg_size is the size of the configured volume group in MiB
321       - vg_free is the free size of the volume group in MiB
322       - memory_dom0 is the memory allocated for domain0 in MiB
323       - memory_free is the currently available (free) ram in MiB
324       - memory_total is the total number of ram in MiB
325
326   """
327   outputarray = {}
328   vginfo = _GetVGInfo(vgname)
329   outputarray['vg_size'] = vginfo['vg_size']
330   outputarray['vg_free'] = vginfo['vg_free']
331
332   hyper = hypervisor.GetHypervisor(hypervisor_type)
333   hyp_info = hyper.GetNodeInfo()
334   if hyp_info is not None:
335     outputarray.update(hyp_info)
336
337   f = open("/proc/sys/kernel/random/boot_id", 'r')
338   try:
339     outputarray["bootid"] = f.read(128).rstrip("\n")
340   finally:
341     f.close()
342
343   return outputarray
344
345
346 def VerifyNode(what, cluster_name):
347   """Verify the status of the local node.
348
349   Based on the input L{what} parameter, various checks are done on the
350   local node.
351
352   If the I{filelist} key is present, this list of
353   files is checksummed and the file/checksum pairs are returned.
354
355   If the I{nodelist} key is present, we check that we have
356   connectivity via ssh with the target nodes (and check the hostname
357   report).
358
359   If the I{node-net-test} key is present, we check that we have
360   connectivity to the given nodes via both primary IP and, if
361   applicable, secondary IPs.
362
363   @type what: C{dict}
364   @param what: a dictionary of things to check:
365       - filelist: list of files for which to compute checksums
366       - nodelist: list of nodes we should check ssh communication with
367       - node-net-test: list of nodes we should check node daemon port
368         connectivity with
369       - hypervisor: list with hypervisors to run the verify for
370   @rtype: dict
371   @return: a dictionary with the same keys as the input dict, and
372       values representing the result of the checks
373
374   """
375   result = {}
376
377   if constants.NV_HYPERVISOR in what:
378     result[constants.NV_HYPERVISOR] = tmp = {}
379     for hv_name in what[constants.NV_HYPERVISOR]:
380       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
381
382   if constants.NV_FILELIST in what:
383     result[constants.NV_FILELIST] = utils.FingerprintFiles(
384       what[constants.NV_FILELIST])
385
386   if constants.NV_NODELIST in what:
387     result[constants.NV_NODELIST] = tmp = {}
388     random.shuffle(what[constants.NV_NODELIST])
389     for node in what[constants.NV_NODELIST]:
390       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
391       if not success:
392         tmp[node] = message
393
394   if constants.NV_NODENETTEST in what:
395     result[constants.NV_NODENETTEST] = tmp = {}
396     my_name = utils.HostInfo().name
397     my_pip = my_sip = None
398     for name, pip, sip in what[constants.NV_NODENETTEST]:
399       if name == my_name:
400         my_pip = pip
401         my_sip = sip
402         break
403     if not my_pip:
404       tmp[my_name] = ("Can't find my own primary/secondary IP"
405                       " in the node list")
406     else:
407       port = utils.GetNodeDaemonPort()
408       for name, pip, sip in what[constants.NV_NODENETTEST]:
409         fail = []
410         if not utils.TcpPing(pip, port, source=my_pip):
411           fail.append("primary")
412         if sip != pip:
413           if not utils.TcpPing(sip, port, source=my_sip):
414             fail.append("secondary")
415         if fail:
416           tmp[name] = ("failure using the %s interface(s)" %
417                        " and ".join(fail))
418
419   if constants.NV_LVLIST in what:
420     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
421
422   if constants.NV_INSTANCELIST in what:
423     result[constants.NV_INSTANCELIST] = GetInstanceList(
424       what[constants.NV_INSTANCELIST])
425
426   if constants.NV_VGLIST in what:
427     result[constants.NV_VGLIST] = ListVolumeGroups()
428
429   if constants.NV_VERSION in what:
430     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
431                                     constants.RELEASE_VERSION)
432
433   if constants.NV_HVINFO in what:
434     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
435     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
436
437   if constants.NV_DRBDLIST in what:
438     try:
439       used_minors = bdev.DRBD8.GetUsedDevs().keys()
440     except errors.BlockDeviceError:
441       logging.warning("Can't get used minors list", exc_info=True)
442       used_minors = []
443     result[constants.NV_DRBDLIST] = used_minors
444
445   return result
446
447
448 def GetVolumeList(vg_name):
449   """Compute list of logical volumes and their size.
450
451   @type vg_name: str
452   @param vg_name: the volume group whose LVs we should list
453   @rtype: dict
454   @return:
455       dictionary of all partions (key) with value being a tuple of
456       their size (in MiB), inactive and online status::
457
458         {'test1': ('20.06', True, True)}
459
460       in case of errors, a string is returned with the error
461       details.
462
463   """
464   lvs = {}
465   sep = '|'
466   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
467                          "--separator=%s" % sep,
468                          "-olv_name,lv_size,lv_attr", vg_name])
469   if result.failed:
470     logging.error("Failed to list logical volumes, lvs output: %s",
471                   result.output)
472     return result.output
473
474   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
475   for line in result.stdout.splitlines():
476     line = line.strip()
477     match = valid_line_re.match(line)
478     if not match:
479       logging.error("Invalid line returned from lvs output: '%s'", line)
480       continue
481     name, size, attr = match.groups()
482     inactive = attr[4] == '-'
483     online = attr[5] == 'o'
484     lvs[name] = (size, inactive, online)
485
486   return lvs
487
488
489 def ListVolumeGroups():
490   """List the volume groups and their size.
491
492   @rtype: dict
493   @return: dictionary with keys volume name and values the
494       size of the volume
495
496   """
497   return utils.ListVolumeGroups()
498
499
500 def NodeVolumes():
501   """List all volumes on this node.
502
503   @rtype: list
504   @return:
505     A list of dictionaries, each having four keys:
506       - name: the logical volume name,
507       - size: the size of the logical volume
508       - dev: the physical device on which the LV lives
509       - vg: the volume group to which it belongs
510
511     In case of errors, we return an empty list and log the
512     error.
513
514     Note that since a logical volume can live on multiple physical
515     volumes, the resulting list might include a logical volume
516     multiple times.
517
518   """
519   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
520                          "--separator=|",
521                          "--options=lv_name,lv_size,devices,vg_name"])
522   if result.failed:
523     logging.error("Failed to list logical volumes, lvs output: %s",
524                   result.output)
525     return []
526
527   def parse_dev(dev):
528     if '(' in dev:
529       return dev.split('(')[0]
530     else:
531       return dev
532
533   def map_line(line):
534     return {
535       'name': line[0].strip(),
536       'size': line[1].strip(),
537       'dev': parse_dev(line[2].strip()),
538       'vg': line[3].strip(),
539     }
540
541   return [map_line(line.split('|')) for line in result.stdout.splitlines()
542           if line.count('|') >= 3]
543
544
545 def BridgesExist(bridges_list):
546   """Check if a list of bridges exist on the current node.
547
548   @rtype: boolean
549   @return: C{True} if all of them exist, C{False} otherwise
550
551   """
552   for bridge in bridges_list:
553     if not utils.BridgeExists(bridge):
554       return False
555
556   return True
557
558
559 def GetInstanceList(hypervisor_list):
560   """Provides a list of instances.
561
562   @type hypervisor_list: list
563   @param hypervisor_list: the list of hypervisors to query information
564
565   @rtype: list
566   @return: a list of all running instances on the current node
567     - instance1.example.com
568     - instance2.example.com
569
570   """
571   results = []
572   for hname in hypervisor_list:
573     try:
574       names = hypervisor.GetHypervisor(hname).ListInstances()
575       results.extend(names)
576     except errors.HypervisorError, err:
577       logging.exception("Error enumerating instances for hypevisor %s", hname)
578       # FIXME: should we somehow not propagate this to the master?
579       raise
580
581   return results
582
583
584 def GetInstanceInfo(instance, hname):
585   """Gives back the informations about an instance as a dictionary.
586
587   @type instance: string
588   @param instance: the instance name
589   @type hname: string
590   @param hname: the hypervisor type of the instance
591
592   @rtype: dict
593   @return: dictionary with the following keys:
594       - memory: memory size of instance (int)
595       - state: xen state of instance (string)
596       - time: cpu time of instance (float)
597
598   """
599   output = {}
600
601   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
602   if iinfo is not None:
603     output['memory'] = iinfo[2]
604     output['state'] = iinfo[4]
605     output['time'] = iinfo[5]
606
607   return output
608
609
610 def GetInstanceMigratable(instance):
611   """Gives whether an instance can be migrated.
612
613   @type instance: L{objects.Instance}
614   @param instance: object representing the instance to be checked.
615
616   @rtype: tuple
617   @return: tuple of (result, description) where:
618       - result: whether the instance can be migrated or not
619       - description: a description of the issue, if relevant
620
621   """
622   hyper = hypervisor.GetHypervisor(instance.hypervisor)
623   if instance.name not in hyper.ListInstances():
624     return (False, 'not running')
625
626   for idx in range(len(instance.disks)):
627     link_name = _GetBlockDevSymlinkPath(instance.name, idx)
628     if not os.path.islink(link_name):
629       return (False, 'not restarted since ganeti 1.2.5')
630
631   return (True, '')
632
633
634 def GetAllInstancesInfo(hypervisor_list):
635   """Gather data about all instances.
636
637   This is the equivalent of L{GetInstanceInfo}, except that it
638   computes data for all instances at once, thus being faster if one
639   needs data about more than one instance.
640
641   @type hypervisor_list: list
642   @param hypervisor_list: list of hypervisors to query for instance data
643
644   @rtype: dict
645   @return: dictionary of instance: data, with data having the following keys:
646       - memory: memory size of instance (int)
647       - state: xen state of instance (string)
648       - time: cpu time of instance (float)
649       - vcpus: the number of vcpus
650
651   """
652   output = {}
653
654   for hname in hypervisor_list:
655     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
656     if iinfo:
657       for name, inst_id, memory, vcpus, state, times in iinfo:
658         value = {
659           'memory': memory,
660           'vcpus': vcpus,
661           'state': state,
662           'time': times,
663           }
664         if name in output and output[name] != value:
665           raise errors.HypervisorError("Instance %s running duplicate"
666                                        " with different parameters" % name)
667         output[name] = value
668
669   return output
670
671
672 def AddOSToInstance(instance):
673   """Add an OS to an instance.
674
675   @type instance: L{objects.Instance}
676   @param instance: Instance whose OS is to be installed
677   @rtype: boolean
678   @return: the success of the operation
679
680   """
681   inst_os = OSFromDisk(instance.os)
682
683   create_env = OSEnvironment(instance)
684
685   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
686                                      instance.name, int(time.time()))
687
688   result = utils.RunCmd([inst_os.create_script], env=create_env,
689                         cwd=inst_os.path, output=logfile,)
690   if result.failed:
691     logging.error("os create command '%s' returned error: %s, logfile: %s,"
692                   " output: %s", result.cmd, result.fail_reason, logfile,
693                   result.output)
694     lines = [utils.SafeEncode(val)
695              for val in utils.TailFile(logfile, lines=20)]
696     return (False, "OS create script failed (%s), last lines in the"
697             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
698
699   return (True, "Successfully installed")
700
701
702 def RunRenameInstance(instance, old_name):
703   """Run the OS rename script for an instance.
704
705   @type instance: L{objects.Instance}
706   @param instance: Instance whose OS is to be installed
707   @type old_name: string
708   @param old_name: previous instance name
709   @rtype: boolean
710   @return: the success of the operation
711
712   """
713   inst_os = OSFromDisk(instance.os)
714
715   rename_env = OSEnvironment(instance)
716   rename_env['OLD_INSTANCE_NAME'] = old_name
717
718   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
719                                            old_name,
720                                            instance.name, int(time.time()))
721
722   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
723                         cwd=inst_os.path, output=logfile)
724
725   if result.failed:
726     logging.error("os create command '%s' returned error: %s output: %s",
727                   result.cmd, result.fail_reason, result.output)
728     lines = [utils.SafeEncode(val)
729              for val in utils.TailFile(logfile, lines=20)]
730     return (False, "OS rename script failed (%s), last lines in the"
731             " log file:\n%s" % (result.fail_reason, "\n".join(lines)))
732
733   return (True, "Rename successful")
734
735
736 def _GetVGInfo(vg_name):
737   """Get informations about the volume group.
738
739   @type vg_name: str
740   @param vg_name: the volume group which we query
741   @rtype: dict
742   @return:
743     A dictionary with the following keys:
744       - C{vg_size} is the total size of the volume group in MiB
745       - C{vg_free} is the free size of the volume group in MiB
746       - C{pv_count} are the number of physical disks in that VG
747
748     If an error occurs during gathering of data, we return the same dict
749     with keys all set to None.
750
751   """
752   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
753
754   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
755                          "--nosuffix", "--units=m", "--separator=:", vg_name])
756
757   if retval.failed:
758     logging.error("volume group %s not present", vg_name)
759     return retdic
760   valarr = retval.stdout.strip().rstrip(':').split(':')
761   if len(valarr) == 3:
762     try:
763       retdic = {
764         "vg_size": int(round(float(valarr[0]), 0)),
765         "vg_free": int(round(float(valarr[1]), 0)),
766         "pv_count": int(valarr[2]),
767         }
768     except ValueError, err:
769       logging.exception("Fail to parse vgs output")
770   else:
771     logging.error("vgs output has the wrong number of fields (expected"
772                   " three): %s", str(valarr))
773   return retdic
774
775
776 def _GetBlockDevSymlinkPath(instance_name, idx):
777   return os.path.join(constants.DISK_LINKS_DIR,
778                       "%s:%d" % (instance_name, idx))
779
780
781 def _SymlinkBlockDev(instance_name, device_path, idx):
782   """Set up symlinks to a instance's block device.
783
784   This is an auxiliary function run when an instance is start (on the primary
785   node) or when an instance is migrated (on the target node).
786
787
788   @param instance_name: the name of the target instance
789   @param device_path: path of the physical block device, on the node
790   @param idx: the disk index
791   @return: absolute path to the disk's symlink
792
793   """
794   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
795   try:
796     os.symlink(device_path, link_name)
797   except OSError, err:
798     if err.errno == errno.EEXIST:
799       if (not os.path.islink(link_name) or
800           os.readlink(link_name) != device_path):
801         os.remove(link_name)
802         os.symlink(device_path, link_name)
803     else:
804       raise
805
806   return link_name
807
808
809 def _RemoveBlockDevLinks(instance_name, disks):
810   """Remove the block device symlinks belonging to the given instance.
811
812   """
813   for idx, disk in enumerate(disks):
814     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
815     if os.path.islink(link_name):
816       try:
817         os.remove(link_name)
818       except OSError:
819         logging.exception("Can't remove symlink '%s'", link_name)
820
821
822 def _GatherAndLinkBlockDevs(instance):
823   """Set up an instance's block device(s).
824
825   This is run on the primary node at instance startup. The block
826   devices must be already assembled.
827
828   @type instance: L{objects.Instance}
829   @param instance: the instance whose disks we shoul assemble
830   @rtype: list
831   @return: list of (disk_object, device_path)
832
833   """
834   block_devices = []
835   for idx, disk in enumerate(instance.disks):
836     device = _RecursiveFindBD(disk)
837     if device is None:
838       raise errors.BlockDeviceError("Block device '%s' is not set up." %
839                                     str(disk))
840     device.Open()
841     try:
842       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
843     except OSError, e:
844       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
845                                     e.strerror)
846
847     block_devices.append((disk, link_name))
848
849   return block_devices
850
851
852 def StartInstance(instance, extra_args):
853   """Start an instance.
854
855   @type instance: L{objects.Instance}
856   @param instance: the instance object
857   @rtype: boolean
858   @return: whether the startup was successful or not
859
860   """
861   running_instances = GetInstanceList([instance.hypervisor])
862
863   if instance.name in running_instances:
864     return (True, "Already running")
865
866   try:
867     block_devices = _GatherAndLinkBlockDevs(instance)
868     hyper = hypervisor.GetHypervisor(instance.hypervisor)
869     hyper.StartInstance(instance, block_devices, extra_args)
870   except errors.BlockDeviceError, err:
871     logging.exception("Failed to start instance")
872     return (False, "Block device error: %s" % str(err))
873   except errors.HypervisorError, err:
874     logging.exception("Failed to start instance")
875     _RemoveBlockDevLinks(instance.name, instance.disks)
876     return (False, "Hypervisor error: %s" % str(err))
877
878   return (True, "Instance started successfully")
879
880
881 def ShutdownInstance(instance):
882   """Shut an instance down.
883
884   @note: this functions uses polling with a hardcoded timeout.
885
886   @type instance: L{objects.Instance}
887   @param instance: the instance object
888   @rtype: boolean
889   @return: whether the startup was successful or not
890
891   """
892   hv_name = instance.hypervisor
893   running_instances = GetInstanceList([hv_name])
894
895   if instance.name not in running_instances:
896     return True
897
898   hyper = hypervisor.GetHypervisor(hv_name)
899   try:
900     hyper.StopInstance(instance)
901   except errors.HypervisorError, err:
902     logging.error("Failed to stop instance: %s" % err)
903     return False
904
905   # test every 10secs for 2min
906
907   time.sleep(1)
908   for dummy in range(11):
909     if instance.name not in GetInstanceList([hv_name]):
910       break
911     time.sleep(10)
912   else:
913     # the shutdown did not succeed
914     logging.error("Shutdown of '%s' unsuccessful, using destroy",
915                   instance.name)
916
917     try:
918       hyper.StopInstance(instance, force=True)
919     except errors.HypervisorError, err:
920       logging.exception("Failed to stop instance: %s" % err)
921       return False
922
923     time.sleep(1)
924     if instance.name in GetInstanceList([hv_name]):
925       logging.error("Could not shutdown instance '%s' even by destroy",
926                     instance.name)
927       return False
928
929   _RemoveBlockDevLinks(instance.name, instance.disks)
930
931   return True
932
933
934 def RebootInstance(instance, reboot_type, extra_args):
935   """Reboot an instance.
936
937   @type instance: L{objects.Instance}
938   @param instance: the instance object to reboot
939   @type reboot_type: str
940   @param reboot_type: the type of reboot, one the following
941     constants:
942       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
943         instance OS, do not recreate the VM
944       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
945         restart the VM (at the hypervisor level)
946       - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
947         is not accepted here, since that mode is handled
948         differently
949   @rtype: boolean
950   @return: the success of the operation
951
952   """
953   running_instances = GetInstanceList([instance.hypervisor])
954
955   if instance.name not in running_instances:
956     logging.error("Cannot reboot instance that is not running")
957     return False
958
959   hyper = hypervisor.GetHypervisor(instance.hypervisor)
960   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
961     try:
962       hyper.RebootInstance(instance)
963     except errors.HypervisorError, err:
964       logging.exception("Failed to soft reboot instance")
965       return False
966   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
967     try:
968       ShutdownInstance(instance)
969       StartInstance(instance, extra_args)
970     except errors.HypervisorError, err:
971       logging.exception("Failed to hard reboot instance")
972       return False
973   else:
974     raise errors.ParameterError("reboot_type invalid")
975
976   return True
977
978
979 def MigrationInfo(instance):
980   """Gather information about an instance to be migrated.
981
982   @type instance: L{objects.Instance}
983   @param instance: the instance definition
984
985   """
986   hyper = hypervisor.GetHypervisor(instance.hypervisor)
987   try:
988     info = hyper.MigrationInfo(instance)
989   except errors.HypervisorError, err:
990     msg = "Failed to fetch migration information"
991     logging.exception(msg)
992     return (False, '%s: %s' % (msg, err))
993   return (True, info)
994
995
996 def AcceptInstance(instance, info, target):
997   """Prepare the node to accept an instance.
998
999   @type instance: L{objects.Instance}
1000   @param instance: the instance definition
1001   @type info: string/data (opaque)
1002   @param info: migration information, from the source node
1003   @type target: string
1004   @param target: target host (usually ip), on this node
1005
1006   """
1007   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1008   try:
1009     hyper.AcceptInstance(instance, info, target)
1010   except errors.HypervisorError, err:
1011     msg = "Failed to accept instance"
1012     logging.exception(msg)
1013     return (False, '%s: %s' % (msg, err))
1014   return (True, "Accept successfull")
1015
1016
1017 def FinalizeMigration(instance, info, success):
1018   """Finalize any preparation to accept an instance.
1019
1020   @type instance: L{objects.Instance}
1021   @param instance: the instance definition
1022   @type info: string/data (opaque)
1023   @param info: migration information, from the source node
1024   @type success: boolean
1025   @param success: whether the migration was a success or a failure
1026
1027   """
1028   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1029   try:
1030     hyper.FinalizeMigration(instance, info, success)
1031   except errors.HypervisorError, err:
1032     msg = "Failed to finalize migration"
1033     logging.exception(msg)
1034     return (False, '%s: %s' % (msg, err))
1035   return (True, "Migration Finalized")
1036
1037
1038 def MigrateInstance(instance, target, live):
1039   """Migrates an instance to another node.
1040
1041   @type instance: L{objects.Instance}
1042   @param instance: the instance definition
1043   @type target: string
1044   @param target: the target node name
1045   @type live: boolean
1046   @param live: whether the migration should be done live or not (the
1047       interpretation of this parameter is left to the hypervisor)
1048   @rtype: tuple
1049   @return: a tuple of (success, msg) where:
1050       - succes is a boolean denoting the success/failure of the operation
1051       - msg is a string with details in case of failure
1052
1053   """
1054   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1055
1056   try:
1057     hyper.MigrateInstance(instance.name, target, live)
1058   except errors.HypervisorError, err:
1059     msg = "Failed to migrate instance"
1060     logging.exception(msg)
1061     return (False, "%s: %s" % (msg, err))
1062   return (True, "Migration successfull")
1063
1064
1065 def CreateBlockDevice(disk, size, owner, on_primary, info):
1066   """Creates a block device for an instance.
1067
1068   @type disk: L{objects.Disk}
1069   @param disk: the object describing the disk we should create
1070   @type size: int
1071   @param size: the size of the physical underlying device, in MiB
1072   @type owner: str
1073   @param owner: the name of the instance for which disk is created,
1074       used for device cache data
1075   @type on_primary: boolean
1076   @param on_primary:  indicates if it is the primary node or not
1077   @type info: string
1078   @param info: string that will be sent to the physical device
1079       creation, used for example to set (LVM) tags on LVs
1080
1081   @return: the new unique_id of the device (this can sometime be
1082       computed only after creation), or None. On secondary nodes,
1083       it's not required to return anything.
1084
1085   """
1086   clist = []
1087   if disk.children:
1088     for child in disk.children:
1089       crdev = _RecursiveAssembleBD(child, owner, on_primary)
1090       if on_primary or disk.AssembleOnSecondary():
1091         # we need the children open in case the device itself has to
1092         # be assembled
1093         crdev.Open()
1094       clist.append(crdev)
1095
1096   try:
1097     device = bdev.Create(disk.dev_type, disk.physical_id, clist, size)
1098   except errors.GenericError, err:
1099     return False, "Can't create block device: %s" % str(err)
1100
1101   if on_primary or disk.AssembleOnSecondary():
1102     if not device.Assemble():
1103       errorstring = "Can't assemble device after creation, very unusual event"
1104       logging.error(errorstring)
1105       return False, errorstring
1106     device.SetSyncSpeed(constants.SYNC_SPEED)
1107     if on_primary or disk.OpenOnSecondary():
1108       device.Open(force=True)
1109     DevCacheManager.UpdateCache(device.dev_path, owner,
1110                                 on_primary, disk.iv_name)
1111
1112   device.SetInfo(info)
1113
1114   physical_id = device.unique_id
1115   return True, physical_id
1116
1117
1118 def RemoveBlockDevice(disk):
1119   """Remove a block device.
1120
1121   @note: This is intended to be called recursively.
1122
1123   @type disk: L{objects.Disk}
1124   @param disk: the disk object we should remove
1125   @rtype: boolean
1126   @return: the success of the operation
1127
1128   """
1129   try:
1130     rdev = _RecursiveFindBD(disk)
1131   except errors.BlockDeviceError, err:
1132     # probably can't attach
1133     logging.info("Can't attach to device %s in remove", disk)
1134     rdev = None
1135   if rdev is not None:
1136     r_path = rdev.dev_path
1137     result = rdev.Remove()
1138     if result:
1139       DevCacheManager.RemoveCache(r_path)
1140   else:
1141     result = True
1142   if disk.children:
1143     for child in disk.children:
1144       result = result and RemoveBlockDevice(child)
1145   return result
1146
1147
1148 def _RecursiveAssembleBD(disk, owner, as_primary):
1149   """Activate a block device for an instance.
1150
1151   This is run on the primary and secondary nodes for an instance.
1152
1153   @note: this function is called recursively.
1154
1155   @type disk: L{objects.Disk}
1156   @param disk: the disk we try to assemble
1157   @type owner: str
1158   @param owner: the name of the instance which owns the disk
1159   @type as_primary: boolean
1160   @param as_primary: if we should make the block device
1161       read/write
1162
1163   @return: the assembled device or None (in case no device
1164       was assembled)
1165   @raise errors.BlockDeviceError: in case there is an error
1166       during the activation of the children or the device
1167       itself
1168
1169   """
1170   children = []
1171   if disk.children:
1172     mcn = disk.ChildrenNeeded()
1173     if mcn == -1:
1174       mcn = 0 # max number of Nones allowed
1175     else:
1176       mcn = len(disk.children) - mcn # max number of Nones
1177     for chld_disk in disk.children:
1178       try:
1179         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1180       except errors.BlockDeviceError, err:
1181         if children.count(None) >= mcn:
1182           raise
1183         cdev = None
1184         logging.debug("Error in child activation: %s", str(err))
1185       children.append(cdev)
1186
1187   if as_primary or disk.AssembleOnSecondary():
1188     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children)
1189     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1190     result = r_dev
1191     if as_primary or disk.OpenOnSecondary():
1192       r_dev.Open()
1193     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1194                                 as_primary, disk.iv_name)
1195
1196   else:
1197     result = True
1198   return result
1199
1200
1201 def AssembleBlockDevice(disk, owner, as_primary):
1202   """Activate a block device for an instance.
1203
1204   This is a wrapper over _RecursiveAssembleBD.
1205
1206   @rtype: str or boolean
1207   @return: a C{/dev/...} path for primary nodes, and
1208       C{True} for secondary nodes
1209
1210   """
1211   result = _RecursiveAssembleBD(disk, owner, as_primary)
1212   if isinstance(result, bdev.BlockDev):
1213     result = result.dev_path
1214   return result
1215
1216
1217 def ShutdownBlockDevice(disk):
1218   """Shut down a block device.
1219
1220   First, if the device is assembled (Attach() is successfull), then
1221   the device is shutdown. Then the children of the device are
1222   shutdown.
1223
1224   This function is called recursively. Note that we don't cache the
1225   children or such, as oppossed to assemble, shutdown of different
1226   devices doesn't require that the upper device was active.
1227
1228   @type disk: L{objects.Disk}
1229   @param disk: the description of the disk we should
1230       shutdown
1231   @rtype: boolean
1232   @return: the success of the operation
1233
1234   """
1235   r_dev = _RecursiveFindBD(disk)
1236   if r_dev is not None:
1237     r_path = r_dev.dev_path
1238     result = r_dev.Shutdown()
1239     if result:
1240       DevCacheManager.RemoveCache(r_path)
1241   else:
1242     result = True
1243   if disk.children:
1244     for child in disk.children:
1245       result = result and ShutdownBlockDevice(child)
1246   return result
1247
1248
1249 def MirrorAddChildren(parent_cdev, new_cdevs):
1250   """Extend a mirrored block device.
1251
1252   @type parent_cdev: L{objects.Disk}
1253   @param parent_cdev: the disk to which we should add children
1254   @type new_cdevs: list of L{objects.Disk}
1255   @param new_cdevs: the list of children which we should add
1256   @rtype: boolean
1257   @return: the success of the operation
1258
1259   """
1260   parent_bdev = _RecursiveFindBD(parent_cdev)
1261   if parent_bdev is None:
1262     logging.error("Can't find parent device")
1263     return False
1264   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1265   if new_bdevs.count(None) > 0:
1266     logging.error("Can't find new device(s) to add: %s:%s",
1267                   new_bdevs, new_cdevs)
1268     return False
1269   parent_bdev.AddChildren(new_bdevs)
1270   return True
1271
1272
1273 def MirrorRemoveChildren(parent_cdev, new_cdevs):
1274   """Shrink a mirrored block device.
1275
1276   @type parent_cdev: L{objects.Disk}
1277   @param parent_cdev: the disk from which we should remove children
1278   @type new_cdevs: list of L{objects.Disk}
1279   @param new_cdevs: the list of children which we should remove
1280   @rtype: boolean
1281   @return: the success of the operation
1282
1283   """
1284   parent_bdev = _RecursiveFindBD(parent_cdev)
1285   if parent_bdev is None:
1286     logging.error("Can't find parent in remove children: %s", parent_cdev)
1287     return False
1288   devs = []
1289   for disk in new_cdevs:
1290     rpath = disk.StaticDevPath()
1291     if rpath is None:
1292       bd = _RecursiveFindBD(disk)
1293       if bd is None:
1294         logging.error("Can't find dynamic device %s while removing children",
1295                       disk)
1296         return False
1297       else:
1298         devs.append(bd.dev_path)
1299     else:
1300       devs.append(rpath)
1301   parent_bdev.RemoveChildren(devs)
1302   return True
1303
1304
1305 def GetMirrorStatus(disks):
1306   """Get the mirroring status of a list of devices.
1307
1308   @type disks: list of L{objects.Disk}
1309   @param disks: the list of disks which we should query
1310   @rtype: disk
1311   @return:
1312       a list of (mirror_done, estimated_time) tuples, which
1313       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1314   @raise errors.BlockDeviceError: if any of the disks cannot be
1315       found
1316
1317   """
1318   stats = []
1319   for dsk in disks:
1320     rbd = _RecursiveFindBD(dsk)
1321     if rbd is None:
1322       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1323     stats.append(rbd.CombinedSyncStatus())
1324   return stats
1325
1326
1327 def _RecursiveFindBD(disk):
1328   """Check if a device is activated.
1329
1330   If so, return informations about the real device.
1331
1332   @type disk: L{objects.Disk}
1333   @param disk: the disk object we need to find
1334
1335   @return: None if the device can't be found,
1336       otherwise the device instance
1337
1338   """
1339   children = []
1340   if disk.children:
1341     for chdisk in disk.children:
1342       children.append(_RecursiveFindBD(chdisk))
1343
1344   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1345
1346
1347 def FindBlockDevice(disk):
1348   """Check if a device is activated.
1349
1350   If it is, return informations about the real device.
1351
1352   @type disk: L{objects.Disk}
1353   @param disk: the disk to find
1354   @rtype: None or tuple
1355   @return: None if the disk cannot be found, otherwise a
1356       tuple (device_path, major, minor, sync_percent,
1357       estimated_time, is_degraded)
1358
1359   """
1360   rbd = _RecursiveFindBD(disk)
1361   if rbd is None:
1362     return rbd
1363   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1364
1365
1366 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1367   """Write a file to the filesystem.
1368
1369   This allows the master to overwrite(!) a file. It will only perform
1370   the operation if the file belongs to a list of configuration files.
1371
1372   @type file_name: str
1373   @param file_name: the target file name
1374   @type data: str
1375   @param data: the new contents of the file
1376   @type mode: int
1377   @param mode: the mode to give the file (can be None)
1378   @type uid: int
1379   @param uid: the owner of the file (can be -1 for default)
1380   @type gid: int
1381   @param gid: the group of the file (can be -1 for default)
1382   @type atime: float
1383   @param atime: the atime to set on the file (can be None)
1384   @type mtime: float
1385   @param mtime: the mtime to set on the file (can be None)
1386   @rtype: boolean
1387   @return: the success of the operation; errors are logged
1388       in the node daemon log
1389
1390   """
1391   if not os.path.isabs(file_name):
1392     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1393                   file_name)
1394     return False
1395
1396   allowed_files = [
1397     constants.CLUSTER_CONF_FILE,
1398     constants.ETC_HOSTS,
1399     constants.SSH_KNOWN_HOSTS_FILE,
1400     constants.VNC_PASSWORD_FILE,
1401     ]
1402
1403   if file_name not in allowed_files:
1404     logging.error("Filename passed to UploadFile not in allowed"
1405                  " upload targets: '%s'", file_name)
1406     return False
1407
1408   raw_data = _Decompress(data)
1409
1410   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1411                   atime=atime, mtime=mtime)
1412   return True
1413
1414
1415 def WriteSsconfFiles(values):
1416   """Update all ssconf files.
1417
1418   Wrapper around the SimpleStore.WriteFiles.
1419
1420   """
1421   ssconf.SimpleStore().WriteFiles(values)
1422
1423
1424 def _ErrnoOrStr(err):
1425   """Format an EnvironmentError exception.
1426
1427   If the L{err} argument has an errno attribute, it will be looked up
1428   and converted into a textual C{E...} description. Otherwise the
1429   string representation of the error will be returned.
1430
1431   @type err: L{EnvironmentError}
1432   @param err: the exception to format
1433
1434   """
1435   if hasattr(err, 'errno'):
1436     detail = errno.errorcode[err.errno]
1437   else:
1438     detail = str(err)
1439   return detail
1440
1441
1442 def _OSOndiskVersion(name, os_dir):
1443   """Compute and return the API version of a given OS.
1444
1445   This function will try to read the API version of the OS given by
1446   the 'name' parameter and residing in the 'os_dir' directory.
1447
1448   @type name: str
1449   @param name: the OS name we should look for
1450   @type os_dir: str
1451   @param os_dir: the directory inwhich we should look for the OS
1452   @rtype: int or None
1453   @return:
1454       Either an integer denoting the version or None in the
1455       case when this is not a valid OS name.
1456   @raise errors.InvalidOS: if the OS cannot be found
1457
1458   """
1459   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1460
1461   try:
1462     st = os.stat(api_file)
1463   except EnvironmentError, err:
1464     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1465                            " found (%s)" % _ErrnoOrStr(err))
1466
1467   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1468     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1469                            " a regular file")
1470
1471   try:
1472     f = open(api_file)
1473     try:
1474       api_versions = f.readlines()
1475     finally:
1476       f.close()
1477   except EnvironmentError, err:
1478     raise errors.InvalidOS(name, os_dir, "error while reading the"
1479                            " API version (%s)" % _ErrnoOrStr(err))
1480
1481   api_versions = [version.strip() for version in api_versions]
1482   try:
1483     api_versions = [int(version) for version in api_versions]
1484   except (TypeError, ValueError), err:
1485     raise errors.InvalidOS(name, os_dir,
1486                            "API version is not integer (%s)" % str(err))
1487
1488   return api_versions
1489
1490
1491 def DiagnoseOS(top_dirs=None):
1492   """Compute the validity for all OSes.
1493
1494   @type top_dirs: list
1495   @param top_dirs: the list of directories in which to
1496       search (if not given defaults to
1497       L{constants.OS_SEARCH_PATH})
1498   @rtype: list of L{objects.OS}
1499   @return: an OS object for each name in all the given
1500       directories
1501
1502   """
1503   if top_dirs is None:
1504     top_dirs = constants.OS_SEARCH_PATH
1505
1506   result = []
1507   for dir_name in top_dirs:
1508     if os.path.isdir(dir_name):
1509       try:
1510         f_names = utils.ListVisibleFiles(dir_name)
1511       except EnvironmentError, err:
1512         logging.exception("Can't list the OS directory %s", dir_name)
1513         break
1514       for name in f_names:
1515         try:
1516           os_inst = OSFromDisk(name, base_dir=dir_name)
1517           result.append(os_inst)
1518         except errors.InvalidOS, err:
1519           result.append(objects.OS.FromInvalidOS(err))
1520
1521   return result
1522
1523
1524 def OSFromDisk(name, base_dir=None):
1525   """Create an OS instance from disk.
1526
1527   This function will return an OS instance if the given name is a
1528   valid OS name. Otherwise, it will raise an appropriate
1529   L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1530
1531   @type base_dir: string
1532   @keyword base_dir: Base directory containing OS installations.
1533                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1534   @rtype: L{objects.OS}
1535   @return: the OS instance if we find a valid one
1536   @raise errors.InvalidOS: if we don't find a valid OS
1537
1538   """
1539   if base_dir is None:
1540     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1541     if os_dir is None:
1542       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1543   else:
1544     os_dir = os.path.sep.join([base_dir, name])
1545
1546   api_versions = _OSOndiskVersion(name, os_dir)
1547
1548   if constants.OS_API_VERSION not in api_versions:
1549     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1550                            " (found %s want %s)"
1551                            % (api_versions, constants.OS_API_VERSION))
1552
1553   # OS Scripts dictionary, we will populate it with the actual script names
1554   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1555
1556   for script in os_scripts:
1557     os_scripts[script] = os.path.sep.join([os_dir, script])
1558
1559     try:
1560       st = os.stat(os_scripts[script])
1561     except EnvironmentError, err:
1562       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1563                              (script, _ErrnoOrStr(err)))
1564
1565     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1566       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1567                              script)
1568
1569     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1570       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1571                              script)
1572
1573
1574   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1575                     create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1576                     export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1577                     import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1578                     rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1579                     api_versions=api_versions)
1580
1581 def OSEnvironment(instance, debug=0):
1582   """Calculate the environment for an os script.
1583
1584   @type instance: L{objects.Instance}
1585   @param instance: target instance for the os script run
1586   @type debug: integer
1587   @param debug: debug level (0 or 1, for OS Api 10)
1588   @rtype: dict
1589   @return: dict of environment variables
1590   @raise errors.BlockDeviceError: if the block device
1591       cannot be found
1592
1593   """
1594   result = {}
1595   result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1596   result['INSTANCE_NAME'] = instance.name
1597   result['INSTANCE_OS'] = instance.os
1598   result['HYPERVISOR'] = instance.hypervisor
1599   result['DISK_COUNT'] = '%d' % len(instance.disks)
1600   result['NIC_COUNT'] = '%d' % len(instance.nics)
1601   result['DEBUG_LEVEL'] = '%d' % debug
1602   for idx, disk in enumerate(instance.disks):
1603     real_disk = _RecursiveFindBD(disk)
1604     if real_disk is None:
1605       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1606                                     str(disk))
1607     real_disk.Open()
1608     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1609     # FIXME: When disks will have read-only mode, populate this
1610     result['DISK_%d_ACCESS' % idx] = disk.mode
1611     if constants.HV_DISK_TYPE in instance.hvparams:
1612       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1613         instance.hvparams[constants.HV_DISK_TYPE]
1614     if disk.dev_type in constants.LDS_BLOCK:
1615       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1616     elif disk.dev_type == constants.LD_FILE:
1617       result['DISK_%d_BACKEND_TYPE' % idx] = \
1618         'file:%s' % disk.physical_id[0]
1619   for idx, nic in enumerate(instance.nics):
1620     result['NIC_%d_MAC' % idx] = nic.mac
1621     if nic.ip:
1622       result['NIC_%d_IP' % idx] = nic.ip
1623     result['NIC_%d_BRIDGE' % idx] = nic.bridge
1624     if constants.HV_NIC_TYPE in instance.hvparams:
1625       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1626         instance.hvparams[constants.HV_NIC_TYPE]
1627
1628   return result
1629
1630 def GrowBlockDevice(disk, amount):
1631   """Grow a stack of block devices.
1632
1633   This function is called recursively, with the childrens being the
1634   first ones to resize.
1635
1636   @type disk: L{objects.Disk}
1637   @param disk: the disk to be grown
1638   @rtype: (status, result)
1639   @return: a tuple with the status of the operation
1640       (True/False), and the errors message if status
1641       is False
1642
1643   """
1644   r_dev = _RecursiveFindBD(disk)
1645   if r_dev is None:
1646     return False, "Cannot find block device %s" % (disk,)
1647
1648   try:
1649     r_dev.Grow(amount)
1650   except errors.BlockDeviceError, err:
1651     return False, str(err)
1652
1653   return True, None
1654
1655
1656 def SnapshotBlockDevice(disk):
1657   """Create a snapshot copy of a block device.
1658
1659   This function is called recursively, and the snapshot is actually created
1660   just for the leaf lvm backend device.
1661
1662   @type disk: L{objects.Disk}
1663   @param disk: the disk to be snapshotted
1664   @rtype: string
1665   @return: snapshot disk path
1666
1667   """
1668   if disk.children:
1669     if len(disk.children) == 1:
1670       # only one child, let's recurse on it
1671       return SnapshotBlockDevice(disk.children[0])
1672     else:
1673       # more than one child, choose one that matches
1674       for child in disk.children:
1675         if child.size == disk.size:
1676           # return implies breaking the loop
1677           return SnapshotBlockDevice(child)
1678   elif disk.dev_type == constants.LD_LV:
1679     r_dev = _RecursiveFindBD(disk)
1680     if r_dev is not None:
1681       # let's stay on the safe side and ask for the full size, for now
1682       return r_dev.Snapshot(disk.size)
1683     else:
1684       return None
1685   else:
1686     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1687                                  " '%s' of type '%s'" %
1688                                  (disk.unique_id, disk.dev_type))
1689
1690
1691 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1692   """Export a block device snapshot to a remote node.
1693
1694   @type disk: L{objects.Disk}
1695   @param disk: the description of the disk to export
1696   @type dest_node: str
1697   @param dest_node: the destination node to export to
1698   @type instance: L{objects.Instance}
1699   @param instance: the instance object to whom the disk belongs
1700   @type cluster_name: str
1701   @param cluster_name: the cluster name, needed for SSH hostalias
1702   @type idx: int
1703   @param idx: the index of the disk in the instance's disk list,
1704       used to export to the OS scripts environment
1705   @rtype: boolean
1706   @return: the success of the operation
1707
1708   """
1709   export_env = OSEnvironment(instance)
1710
1711   inst_os = OSFromDisk(instance.os)
1712   export_script = inst_os.export_script
1713
1714   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1715                                      instance.name, int(time.time()))
1716   if not os.path.exists(constants.LOG_OS_DIR):
1717     os.mkdir(constants.LOG_OS_DIR, 0750)
1718   real_disk = _RecursiveFindBD(disk)
1719   if real_disk is None:
1720     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1721                                   str(disk))
1722   real_disk.Open()
1723
1724   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1725   export_env['EXPORT_INDEX'] = str(idx)
1726
1727   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1728   destfile = disk.physical_id[1]
1729
1730   # the target command is built out of three individual commands,
1731   # which are joined by pipes; we check each individual command for
1732   # valid parameters
1733   expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1734                                export_script, logfile)
1735
1736   comprcmd = "gzip"
1737
1738   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1739                                 destdir, destdir, destfile)
1740   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1741                                                    constants.GANETI_RUNAS,
1742                                                    destcmd)
1743
1744   # all commands have been checked, so we're safe to combine them
1745   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1746
1747   result = utils.RunCmd(command, env=export_env)
1748
1749   if result.failed:
1750     logging.error("os snapshot export command '%s' returned error: %s"
1751                   " output: %s", command, result.fail_reason, result.output)
1752     return False
1753
1754   return True
1755
1756
1757 def FinalizeExport(instance, snap_disks):
1758   """Write out the export configuration information.
1759
1760   @type instance: L{objects.Instance}
1761   @param instance: the instance which we export, used for
1762       saving configuration
1763   @type snap_disks: list of L{objects.Disk}
1764   @param snap_disks: list of snapshot block devices, which
1765       will be used to get the actual name of the dump file
1766
1767   @rtype: boolean
1768   @return: the success of the operation
1769
1770   """
1771   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1772   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1773
1774   config = objects.SerializableConfigParser()
1775
1776   config.add_section(constants.INISECT_EXP)
1777   config.set(constants.INISECT_EXP, 'version', '0')
1778   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1779   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1780   config.set(constants.INISECT_EXP, 'os', instance.os)
1781   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1782
1783   config.add_section(constants.INISECT_INS)
1784   config.set(constants.INISECT_INS, 'name', instance.name)
1785   config.set(constants.INISECT_INS, 'memory', '%d' %
1786              instance.beparams[constants.BE_MEMORY])
1787   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1788              instance.beparams[constants.BE_VCPUS])
1789   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1790
1791   nic_total = 0
1792   for nic_count, nic in enumerate(instance.nics):
1793     nic_total += 1
1794     config.set(constants.INISECT_INS, 'nic%d_mac' %
1795                nic_count, '%s' % nic.mac)
1796     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1797     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1798                '%s' % nic.bridge)
1799   # TODO: redundant: on load can read nics until it doesn't exist
1800   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1801
1802   disk_total = 0
1803   for disk_count, disk in enumerate(snap_disks):
1804     if disk:
1805       disk_total += 1
1806       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1807                  ('%s' % disk.iv_name))
1808       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1809                  ('%s' % disk.physical_id[1]))
1810       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1811                  ('%d' % disk.size))
1812
1813   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1814
1815   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1816                   data=config.Dumps())
1817   shutil.rmtree(finaldestdir, True)
1818   shutil.move(destdir, finaldestdir)
1819
1820   return True
1821
1822
1823 def ExportInfo(dest):
1824   """Get export configuration information.
1825
1826   @type dest: str
1827   @param dest: directory containing the export
1828
1829   @rtype: L{objects.SerializableConfigParser}
1830   @return: a serializable config file containing the
1831       export info
1832
1833   """
1834   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1835
1836   config = objects.SerializableConfigParser()
1837   config.read(cff)
1838
1839   if (not config.has_section(constants.INISECT_EXP) or
1840       not config.has_section(constants.INISECT_INS)):
1841     return None
1842
1843   return config
1844
1845
1846 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1847   """Import an os image into an instance.
1848
1849   @type instance: L{objects.Instance}
1850   @param instance: instance to import the disks into
1851   @type src_node: string
1852   @param src_node: source node for the disk images
1853   @type src_images: list of string
1854   @param src_images: absolute paths of the disk images
1855   @rtype: list of boolean
1856   @return: each boolean represent the success of importing the n-th disk
1857
1858   """
1859   import_env = OSEnvironment(instance)
1860   inst_os = OSFromDisk(instance.os)
1861   import_script = inst_os.import_script
1862
1863   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1864                                         instance.name, int(time.time()))
1865   if not os.path.exists(constants.LOG_OS_DIR):
1866     os.mkdir(constants.LOG_OS_DIR, 0750)
1867
1868   comprcmd = "gunzip"
1869   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
1870                                import_script, logfile)
1871
1872   final_result = []
1873   for idx, image in enumerate(src_images):
1874     if image:
1875       destcmd = utils.BuildShellCmd('cat %s', image)
1876       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1877                                                        constants.GANETI_RUNAS,
1878                                                        destcmd)
1879       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1880       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1881       import_env['IMPORT_INDEX'] = str(idx)
1882       result = utils.RunCmd(command, env=import_env)
1883       if result.failed:
1884         logging.error("Disk import command '%s' returned error: %s"
1885                       " output: %s", command, result.fail_reason,
1886                       result.output)
1887         final_result.append(False)
1888       else:
1889         final_result.append(True)
1890     else:
1891       final_result.append(True)
1892
1893   return final_result
1894
1895
1896 def ListExports():
1897   """Return a list of exports currently available on this machine.
1898
1899   @rtype: list
1900   @return: list of the exports
1901
1902   """
1903   if os.path.isdir(constants.EXPORT_DIR):
1904     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1905   else:
1906     return []
1907
1908
1909 def RemoveExport(export):
1910   """Remove an existing export from the node.
1911
1912   @type export: str
1913   @param export: the name of the export to remove
1914   @rtype: boolean
1915   @return: the success of the operation
1916
1917   """
1918   target = os.path.join(constants.EXPORT_DIR, export)
1919
1920   shutil.rmtree(target)
1921   # TODO: catch some of the relevant exceptions and provide a pretty
1922   # error message if rmtree fails.
1923
1924   return True
1925
1926
1927 def RenameBlockDevices(devlist):
1928   """Rename a list of block devices.
1929
1930   @type devlist: list of tuples
1931   @param devlist: list of tuples of the form  (disk,
1932       new_logical_id, new_physical_id); disk is an
1933       L{objects.Disk} object describing the current disk,
1934       and new logical_id/physical_id is the name we
1935       rename it to
1936   @rtype: boolean
1937   @return: True if all renames succeeded, False otherwise
1938
1939   """
1940   result = True
1941   for disk, unique_id in devlist:
1942     dev = _RecursiveFindBD(disk)
1943     if dev is None:
1944       result = False
1945       continue
1946     try:
1947       old_rpath = dev.dev_path
1948       dev.Rename(unique_id)
1949       new_rpath = dev.dev_path
1950       if old_rpath != new_rpath:
1951         DevCacheManager.RemoveCache(old_rpath)
1952         # FIXME: we should add the new cache information here, like:
1953         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1954         # but we don't have the owner here - maybe parse from existing
1955         # cache? for now, we only lose lvm data when we rename, which
1956         # is less critical than DRBD or MD
1957     except errors.BlockDeviceError, err:
1958       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1959       result = False
1960   return result
1961
1962
1963 def _TransformFileStorageDir(file_storage_dir):
1964   """Checks whether given file_storage_dir is valid.
1965
1966   Checks wheter the given file_storage_dir is within the cluster-wide
1967   default file_storage_dir stored in SimpleStore. Only paths under that
1968   directory are allowed.
1969
1970   @type file_storage_dir: str
1971   @param file_storage_dir: the path to check
1972
1973   @return: the normalized path if valid, None otherwise
1974
1975   """
1976   cfg = _GetConfig()
1977   file_storage_dir = os.path.normpath(file_storage_dir)
1978   base_file_storage_dir = cfg.GetFileStorageDir()
1979   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1980       base_file_storage_dir):
1981     logging.error("file storage directory '%s' is not under base file"
1982                   " storage directory '%s'",
1983                   file_storage_dir, base_file_storage_dir)
1984     return None
1985   return file_storage_dir
1986
1987
1988 def CreateFileStorageDir(file_storage_dir):
1989   """Create file storage directory.
1990
1991   @type file_storage_dir: str
1992   @param file_storage_dir: directory to create
1993
1994   @rtype: tuple
1995   @return: tuple with first element a boolean indicating wheter dir
1996       creation was successful or not
1997
1998   """
1999   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2000   result = True,
2001   if not file_storage_dir:
2002     result = False,
2003   else:
2004     if os.path.exists(file_storage_dir):
2005       if not os.path.isdir(file_storage_dir):
2006         logging.error("'%s' is not a directory", file_storage_dir)
2007         result = False,
2008     else:
2009       try:
2010         os.makedirs(file_storage_dir, 0750)
2011       except OSError, err:
2012         logging.error("Cannot create file storage directory '%s': %s",
2013                       file_storage_dir, err)
2014         result = False,
2015   return result
2016
2017
2018 def RemoveFileStorageDir(file_storage_dir):
2019   """Remove file storage directory.
2020
2021   Remove it only if it's empty. If not log an error and return.
2022
2023   @type file_storage_dir: str
2024   @param file_storage_dir: the directory we should cleanup
2025   @rtype: tuple (success,)
2026   @return: tuple of one element, C{success}, denoting
2027       whether the operation was successfull
2028
2029   """
2030   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2031   result = True,
2032   if not file_storage_dir:
2033     result = False,
2034   else:
2035     if os.path.exists(file_storage_dir):
2036       if not os.path.isdir(file_storage_dir):
2037         logging.error("'%s' is not a directory", file_storage_dir)
2038         result = False,
2039       # deletes dir only if empty, otherwise we want to return False
2040       try:
2041         os.rmdir(file_storage_dir)
2042       except OSError, err:
2043         logging.exception("Cannot remove file storage directory '%s'",
2044                           file_storage_dir)
2045         result = False,
2046   return result
2047
2048
2049 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2050   """Rename the file storage directory.
2051
2052   @type old_file_storage_dir: str
2053   @param old_file_storage_dir: the current path
2054   @type new_file_storage_dir: str
2055   @param new_file_storage_dir: the name we should rename to
2056   @rtype: tuple (success,)
2057   @return: tuple of one element, C{success}, denoting
2058       whether the operation was successful
2059
2060   """
2061   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2062   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2063   result = True,
2064   if not old_file_storage_dir or not new_file_storage_dir:
2065     result = False,
2066   else:
2067     if not os.path.exists(new_file_storage_dir):
2068       if os.path.isdir(old_file_storage_dir):
2069         try:
2070           os.rename(old_file_storage_dir, new_file_storage_dir)
2071         except OSError, err:
2072           logging.exception("Cannot rename '%s' to '%s'",
2073                             old_file_storage_dir, new_file_storage_dir)
2074           result =  False,
2075       else:
2076         logging.error("'%s' is not a directory", old_file_storage_dir)
2077         result = False,
2078     else:
2079       if os.path.exists(old_file_storage_dir):
2080         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
2081                       old_file_storage_dir, new_file_storage_dir)
2082         result = False,
2083   return result
2084
2085
2086 def _IsJobQueueFile(file_name):
2087   """Checks whether the given filename is in the queue directory.
2088
2089   @type file_name: str
2090   @param file_name: the file name we should check
2091   @rtype: boolean
2092   @return: whether the file is under the queue directory
2093
2094   """
2095   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2096   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2097
2098   if not result:
2099     logging.error("'%s' is not a file in the queue directory",
2100                   file_name)
2101
2102   return result
2103
2104
2105 def JobQueueUpdate(file_name, content):
2106   """Updates a file in the queue directory.
2107
2108   This is just a wrapper over L{utils.WriteFile}, with proper
2109   checking.
2110
2111   @type file_name: str
2112   @param file_name: the job file name
2113   @type content: str
2114   @param content: the new job contents
2115   @rtype: boolean
2116   @return: the success of the operation
2117
2118   """
2119   if not _IsJobQueueFile(file_name):
2120     return False
2121
2122   # Write and replace the file atomically
2123   utils.WriteFile(file_name, data=_Decompress(content))
2124
2125   return True
2126
2127
2128 def JobQueueRename(old, new):
2129   """Renames a job queue file.
2130
2131   This is just a wrapper over os.rename with proper checking.
2132
2133   @type old: str
2134   @param old: the old (actual) file name
2135   @type new: str
2136   @param new: the desired file name
2137   @rtype: boolean
2138   @return: the success of the operation
2139
2140   """
2141   if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
2142     return False
2143
2144   utils.RenameFile(old, new, mkdir=True)
2145
2146   return True
2147
2148
2149 def JobQueueSetDrainFlag(drain_flag):
2150   """Set the drain flag for the queue.
2151
2152   This will set or unset the queue drain flag.
2153
2154   @type drain_flag: boolean
2155   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2156   @rtype: boolean
2157   @return: always True
2158   @warning: the function always returns True
2159
2160   """
2161   if drain_flag:
2162     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2163   else:
2164     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2165
2166   return True
2167
2168
2169 def CloseBlockDevices(instance_name, disks):
2170   """Closes the given block devices.
2171
2172   This means they will be switched to secondary mode (in case of
2173   DRBD).
2174
2175   @param instance_name: if the argument is not empty, the symlinks
2176       of this instance will be removed
2177   @type disks: list of L{objects.Disk}
2178   @param disks: the list of disks to be closed
2179   @rtype: tuple (success, message)
2180   @return: a tuple of success and message, where success
2181       indicates the succes of the operation, and message
2182       which will contain the error details in case we
2183       failed
2184
2185   """
2186   bdevs = []
2187   for cf in disks:
2188     rd = _RecursiveFindBD(cf)
2189     if rd is None:
2190       return (False, "Can't find device %s" % cf)
2191     bdevs.append(rd)
2192
2193   msg = []
2194   for rd in bdevs:
2195     try:
2196       rd.Close()
2197     except errors.BlockDeviceError, err:
2198       msg.append(str(err))
2199   if msg:
2200     return (False, "Can't make devices secondary: %s" % ",".join(msg))
2201   else:
2202     if instance_name:
2203       _RemoveBlockDevLinks(instance_name, disks)
2204     return (True, "All devices secondary")
2205
2206
2207 def ValidateHVParams(hvname, hvparams):
2208   """Validates the given hypervisor parameters.
2209
2210   @type hvname: string
2211   @param hvname: the hypervisor name
2212   @type hvparams: dict
2213   @param hvparams: the hypervisor parameters to be validated
2214   @rtype: tuple (success, message)
2215   @return: a tuple of success and message, where success
2216       indicates the succes of the operation, and message
2217       which will contain the error details in case we
2218       failed
2219
2220   """
2221   try:
2222     hv_type = hypervisor.GetHypervisor(hvname)
2223     hv_type.ValidateParameters(hvparams)
2224     return (True, "Validation passed")
2225   except errors.HypervisorError, err:
2226     return (False, str(err))
2227
2228
2229 def DemoteFromMC():
2230   """Demotes the current node from master candidate role.
2231
2232   """
2233   # try to ensure we're not the master by mistake
2234   master, myself = ssconf.GetMasterAndMyself()
2235   if master == myself:
2236     return (False, "ssconf status shows I'm the master node, will not demote")
2237   pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
2238   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2239     return (False, "The master daemon is running, will not demote")
2240   try:
2241     utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2242   except EnvironmentError, err:
2243     if err.errno != errno.ENOENT:
2244       return (False, "Error while backing up cluster file: %s" % str(err))
2245   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2246   return (True, "Done")
2247
2248
2249 def _FindDisks(nodes_ip, disks):
2250   """Sets the physical ID on disks and returns the block devices.
2251
2252   """
2253   # set the correct physical ID
2254   my_name = utils.HostInfo().name
2255   for cf in disks:
2256     cf.SetPhysicalID(my_name, nodes_ip)
2257
2258   bdevs = []
2259
2260   for cf in disks:
2261     rd = _RecursiveFindBD(cf)
2262     if rd is None:
2263       return (False, "Can't find device %s" % cf)
2264     bdevs.append(rd)
2265   return (True, bdevs)
2266
2267
2268 def DrbdDisconnectNet(nodes_ip, disks):
2269   """Disconnects the network on a list of drbd devices.
2270
2271   """
2272   status, bdevs = _FindDisks(nodes_ip, disks)
2273   if not status:
2274     return status, bdevs
2275
2276   # disconnect disks
2277   for rd in bdevs:
2278     try:
2279       rd.DisconnectNet()
2280     except errors.BlockDeviceError, err:
2281       logging.exception("Failed to go into standalone mode")
2282       return (False, "Can't change network configuration: %s" % str(err))
2283   return (True, "All disks are now disconnected")
2284
2285
2286 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2287   """Attaches the network on a list of drbd devices.
2288
2289   """
2290   status, bdevs = _FindDisks(nodes_ip, disks)
2291   if not status:
2292     return status, bdevs
2293
2294   if multimaster:
2295     for idx, rd in enumerate(bdevs):
2296       try:
2297         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2298       except EnvironmentError, err:
2299         return (False, "Can't create symlink: %s" % str(err))
2300   # reconnect disks, switch to new master configuration and if
2301   # needed primary mode
2302   for rd in bdevs:
2303     try:
2304       rd.AttachNet(multimaster)
2305     except errors.BlockDeviceError, err:
2306       return (False, "Can't change network configuration: %s" % str(err))
2307   # wait until the disks are connected; we need to retry the re-attach
2308   # if the device becomes standalone, as this might happen if the one
2309   # node disconnects and reconnects in a different mode before the
2310   # other node reconnects; in this case, one or both of the nodes will
2311   # decide it has wrong configuration and switch to standalone
2312   RECONNECT_TIMEOUT = 2 * 60
2313   sleep_time = 0.100 # start with 100 miliseconds
2314   timeout_limit = time.time() + RECONNECT_TIMEOUT
2315   while time.time() < timeout_limit:
2316     all_connected = True
2317     for rd in bdevs:
2318       stats = rd.GetProcStatus()
2319       if not (stats.is_connected or stats.is_in_resync):
2320         all_connected = False
2321       if stats.is_standalone:
2322         # peer had different config info and this node became
2323         # standalone, even though this should not happen with the
2324         # new staged way of changing disk configs
2325         try:
2326           rd.ReAttachNet(multimaster)
2327         except errors.BlockDeviceError, err:
2328           return (False, "Can't change network configuration: %s" % str(err))
2329     if all_connected:
2330       break
2331     time.sleep(sleep_time)
2332     sleep_time = min(5, sleep_time * 1.5)
2333   if not all_connected:
2334     return (False, "Timeout in disk reconnecting")
2335   if multimaster:
2336     # change to primary mode
2337     for rd in bdevs:
2338       rd.Open()
2339   if multimaster:
2340     msg = "multi-master and primary"
2341   else:
2342     msg = "single-master"
2343   return (True, "Disks are now configured as %s" % msg)
2344
2345
2346 def DrbdWaitSync(nodes_ip, disks):
2347   """Wait until DRBDs have synchronized.
2348
2349   """
2350   status, bdevs = _FindDisks(nodes_ip, disks)
2351   if not status:
2352     return status, bdevs
2353
2354   min_resync = 100
2355   alldone = True
2356   failure = False
2357   for rd in bdevs:
2358     stats = rd.GetProcStatus()
2359     if not (stats.is_connected or stats.is_in_resync):
2360       failure = True
2361       break
2362     alldone = alldone and (not stats.is_in_resync)
2363     if stats.sync_percent is not None:
2364       min_resync = min(min_resync, stats.sync_percent)
2365   return (not failure, (alldone, min_resync))
2366
2367
2368 class HooksRunner(object):
2369   """Hook runner.
2370
2371   This class is instantiated on the node side (ganeti-noded) and not
2372   on the master side.
2373
2374   """
2375   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2376
2377   def __init__(self, hooks_base_dir=None):
2378     """Constructor for hooks runner.
2379
2380     @type hooks_base_dir: str or None
2381     @param hooks_base_dir: if not None, this overrides the
2382         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2383
2384     """
2385     if hooks_base_dir is None:
2386       hooks_base_dir = constants.HOOKS_BASE_DIR
2387     self._BASE_DIR = hooks_base_dir
2388
2389   @staticmethod
2390   def ExecHook(script, env):
2391     """Exec one hook script.
2392
2393     @type script: str
2394     @param script: the full path to the script
2395     @type env: dict
2396     @param env: the environment with which to exec the script
2397     @rtype: tuple (success, message)
2398     @return: a tuple of success and message, where success
2399         indicates the succes of the operation, and message
2400         which will contain the error details in case we
2401         failed
2402
2403     """
2404     # exec the process using subprocess and log the output
2405     fdstdin = None
2406     try:
2407       fdstdin = open("/dev/null", "r")
2408       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2409                                stderr=subprocess.STDOUT, close_fds=True,
2410                                shell=False, cwd="/", env=env)
2411       output = ""
2412       try:
2413         output = child.stdout.read(4096)
2414         child.stdout.close()
2415       except EnvironmentError, err:
2416         output += "Hook script error: %s" % str(err)
2417
2418       while True:
2419         try:
2420           result = child.wait()
2421           break
2422         except EnvironmentError, err:
2423           if err.errno == errno.EINTR:
2424             continue
2425           raise
2426     finally:
2427       # try not to leak fds
2428       for fd in (fdstdin, ):
2429         if fd is not None:
2430           try:
2431             fd.close()
2432           except EnvironmentError, err:
2433             # just log the error
2434             #logging.exception("Error while closing fd %s", fd)
2435             pass
2436
2437     return result == 0, utils.SafeEncode(output.strip())
2438
2439   def RunHooks(self, hpath, phase, env):
2440     """Run the scripts in the hooks directory.
2441
2442     @type hpath: str
2443     @param hpath: the path to the hooks directory which
2444         holds the scripts
2445     @type phase: str
2446     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2447         L{constants.HOOKS_PHASE_POST}
2448     @type env: dict
2449     @param env: dictionary with the environment for the hook
2450     @rtype: list
2451     @return: list of 3-element tuples:
2452       - script path
2453       - script result, either L{constants.HKR_SUCCESS} or
2454         L{constants.HKR_FAIL}
2455       - output of the script
2456
2457     @raise errors.ProgrammerError: for invalid input
2458         parameters
2459
2460     """
2461     if phase == constants.HOOKS_PHASE_PRE:
2462       suffix = "pre"
2463     elif phase == constants.HOOKS_PHASE_POST:
2464       suffix = "post"
2465     else:
2466       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
2467     rr = []
2468
2469     subdir = "%s-%s.d" % (hpath, suffix)
2470     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2471     try:
2472       dir_contents = utils.ListVisibleFiles(dir_name)
2473     except OSError, err:
2474       # FIXME: must log output in case of failures
2475       return rr
2476
2477     # we use the standard python sort order,
2478     # so 00name is the recommended naming scheme
2479     dir_contents.sort()
2480     for relname in dir_contents:
2481       fname = os.path.join(dir_name, relname)
2482       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2483           self.RE_MASK.match(relname) is not None):
2484         rrval = constants.HKR_SKIP
2485         output = ""
2486       else:
2487         result, output = self.ExecHook(fname, env)
2488         if not result:
2489           rrval = constants.HKR_FAIL
2490         else:
2491           rrval = constants.HKR_SUCCESS
2492       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2493
2494     return rr
2495
2496
2497 class IAllocatorRunner(object):
2498   """IAllocator runner.
2499
2500   This class is instantiated on the node side (ganeti-noded) and not on
2501   the master side.
2502
2503   """
2504   def Run(self, name, idata):
2505     """Run an iallocator script.
2506
2507     @type name: str
2508     @param name: the iallocator script name
2509     @type idata: str
2510     @param idata: the allocator input data
2511
2512     @rtype: tuple
2513     @return: four element tuple of:
2514        - run status (one of the IARUN_ constants)
2515        - stdout
2516        - stderr
2517        - fail reason (as from L{utils.RunResult})
2518
2519     """
2520     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2521                                   os.path.isfile)
2522     if alloc_script is None:
2523       return (constants.IARUN_NOTFOUND, None, None, None)
2524
2525     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2526     try:
2527       os.write(fd, idata)
2528       os.close(fd)
2529       result = utils.RunCmd([alloc_script, fin_name])
2530       if result.failed:
2531         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
2532                 result.fail_reason)
2533     finally:
2534       os.unlink(fin_name)
2535
2536     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
2537
2538
2539 class DevCacheManager(object):
2540   """Simple class for managing a cache of block device information.
2541
2542   """
2543   _DEV_PREFIX = "/dev/"
2544   _ROOT_DIR = constants.BDEV_CACHE_DIR
2545
2546   @classmethod
2547   def _ConvertPath(cls, dev_path):
2548     """Converts a /dev/name path to the cache file name.
2549
2550     This replaces slashes with underscores and strips the /dev
2551     prefix. It then returns the full path to the cache file.
2552
2553     @type dev_path: str
2554     @param dev_path: the C{/dev/} path name
2555     @rtype: str
2556     @return: the converted path name
2557
2558     """
2559     if dev_path.startswith(cls._DEV_PREFIX):
2560       dev_path = dev_path[len(cls._DEV_PREFIX):]
2561     dev_path = dev_path.replace("/", "_")
2562     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2563     return fpath
2564
2565   @classmethod
2566   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2567     """Updates the cache information for a given device.
2568
2569     @type dev_path: str
2570     @param dev_path: the pathname of the device
2571     @type owner: str
2572     @param owner: the owner (instance name) of the device
2573     @type on_primary: bool
2574     @param on_primary: whether this is the primary
2575         node nor not
2576     @type iv_name: str
2577     @param iv_name: the instance-visible name of the
2578         device, as in objects.Disk.iv_name
2579
2580     @rtype: None
2581
2582     """
2583     if dev_path is None:
2584       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2585       return
2586     fpath = cls._ConvertPath(dev_path)
2587     if on_primary:
2588       state = "primary"
2589     else:
2590       state = "secondary"
2591     if iv_name is None:
2592       iv_name = "not_visible"
2593     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2594     try:
2595       utils.WriteFile(fpath, data=fdata)
2596     except EnvironmentError, err:
2597       logging.exception("Can't update bdev cache for %s", dev_path)
2598
2599   @classmethod
2600   def RemoveCache(cls, dev_path):
2601     """Remove data for a dev_path.
2602
2603     This is just a wrapper over L{utils.RemoveFile} with a converted
2604     path name and logging.
2605
2606     @type dev_path: str
2607     @param dev_path: the pathname of the device
2608
2609     @rtype: None
2610
2611     """
2612     if dev_path is None:
2613       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2614       return
2615     fpath = cls._ConvertPath(dev_path)
2616     try:
2617       utils.RemoveFile(fpath)
2618     except EnvironmentError, err:
2619       logging.exception("Can't update bdev cache for %s", dev_path)