Merge branch 'devel-2.1'
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50
51 from ganeti import errors
52 from ganeti import utils
53 from ganeti import ssh
54 from ganeti import hypervisor
55 from ganeti import constants
56 from ganeti import bdev
57 from ganeti import objects
58 from ganeti import ssconf
59
60
61 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
62 _ALLOWED_CLEAN_DIRS = frozenset([
63   constants.DATA_DIR,
64   constants.JOB_QUEUE_ARCHIVE_DIR,
65   constants.QUEUE_DIR,
66   constants.CRYPTO_KEYS_DIR,
67   ])
68 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
69 _X509_KEY_FILE = "key"
70 _X509_CERT_FILE = "cert"
71
72
73 class RPCFail(Exception):
74   """Class denoting RPC failure.
75
76   Its argument is the error message.
77
78   """
79
80
81 def _Fail(msg, *args, **kwargs):
82   """Log an error and the raise an RPCFail exception.
83
84   This exception is then handled specially in the ganeti daemon and
85   turned into a 'failed' return type. As such, this function is a
86   useful shortcut for logging the error and returning it to the master
87   daemon.
88
89   @type msg: string
90   @param msg: the text of the exception
91   @raise RPCFail
92
93   """
94   if args:
95     msg = msg % args
96   if "log" not in kwargs or kwargs["log"]: # if we should log this error
97     if "exc" in kwargs and kwargs["exc"]:
98       logging.exception(msg)
99     else:
100       logging.error(msg)
101   raise RPCFail(msg)
102
103
104 def _GetConfig():
105   """Simple wrapper to return a SimpleStore.
106
107   @rtype: L{ssconf.SimpleStore}
108   @return: a SimpleStore instance
109
110   """
111   return ssconf.SimpleStore()
112
113
114 def _GetSshRunner(cluster_name):
115   """Simple wrapper to return an SshRunner.
116
117   @type cluster_name: str
118   @param cluster_name: the cluster name, which is needed
119       by the SshRunner constructor
120   @rtype: L{ssh.SshRunner}
121   @return: an SshRunner instance
122
123   """
124   return ssh.SshRunner(cluster_name)
125
126
127 def _Decompress(data):
128   """Unpacks data compressed by the RPC client.
129
130   @type data: list or tuple
131   @param data: Data sent by RPC client
132   @rtype: str
133   @return: Decompressed data
134
135   """
136   assert isinstance(data, (list, tuple))
137   assert len(data) == 2
138   (encoding, content) = data
139   if encoding == constants.RPC_ENCODING_NONE:
140     return content
141   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
142     return zlib.decompress(base64.b64decode(content))
143   else:
144     raise AssertionError("Unknown data encoding")
145
146
147 def _CleanDirectory(path, exclude=None):
148   """Removes all regular files in a directory.
149
150   @type path: str
151   @param path: the directory to clean
152   @type exclude: list
153   @param exclude: list of files to be excluded, defaults
154       to the empty list
155
156   """
157   if path not in _ALLOWED_CLEAN_DIRS:
158     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
159           path)
160
161   if not os.path.isdir(path):
162     return
163   if exclude is None:
164     exclude = []
165   else:
166     # Normalize excluded paths
167     exclude = [os.path.normpath(i) for i in exclude]
168
169   for rel_name in utils.ListVisibleFiles(path):
170     full_name = utils.PathJoin(path, rel_name)
171     if full_name in exclude:
172       continue
173     if os.path.isfile(full_name) and not os.path.islink(full_name):
174       utils.RemoveFile(full_name)
175
176
177 def _BuildUploadFileList():
178   """Build the list of allowed upload files.
179
180   This is abstracted so that it's built only once at module import time.
181
182   """
183   allowed_files = set([
184     constants.CLUSTER_CONF_FILE,
185     constants.ETC_HOSTS,
186     constants.SSH_KNOWN_HOSTS_FILE,
187     constants.VNC_PASSWORD_FILE,
188     constants.RAPI_CERT_FILE,
189     constants.RAPI_USERS_FILE,
190     constants.CONFD_HMAC_KEY,
191     ])
192
193   for hv_name in constants.HYPER_TYPES:
194     hv_class = hypervisor.GetHypervisorClass(hv_name)
195     allowed_files.update(hv_class.GetAncillaryFiles())
196
197   return frozenset(allowed_files)
198
199
200 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
201
202
203 def JobQueuePurge():
204   """Removes job queue files and archived jobs.
205
206   @rtype: tuple
207   @return: True, None
208
209   """
210   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
211   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
212
213
214 def GetMasterInfo():
215   """Returns master information.
216
217   This is an utility function to compute master information, either
218   for consumption here or from the node daemon.
219
220   @rtype: tuple
221   @return: master_netdev, master_ip, master_name
222   @raise RPCFail: in case of errors
223
224   """
225   try:
226     cfg = _GetConfig()
227     master_netdev = cfg.GetMasterNetdev()
228     master_ip = cfg.GetMasterIP()
229     master_node = cfg.GetMasterNode()
230   except errors.ConfigurationError, err:
231     _Fail("Cluster configuration incomplete: %s", err, exc=True)
232   return (master_netdev, master_ip, master_node)
233
234
235 def StartMaster(start_daemons, no_voting):
236   """Activate local node as master node.
237
238   The function will always try activate the IP address of the master
239   (unless someone else has it). It will also start the master daemons,
240   based on the start_daemons parameter.
241
242   @type start_daemons: boolean
243   @param start_daemons: whether to also start the master
244       daemons (ganeti-masterd and ganeti-rapi)
245   @type no_voting: boolean
246   @param no_voting: whether to start ganeti-masterd without a node vote
247       (if start_daemons is True), but still non-interactively
248   @rtype: None
249
250   """
251   # GetMasterInfo will raise an exception if not able to return data
252   master_netdev, master_ip, _ = GetMasterInfo()
253
254   err_msgs = []
255   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
256     if utils.OwnIpAddress(master_ip):
257       # we already have the ip:
258       logging.debug("Master IP already configured, doing nothing")
259     else:
260       msg = "Someone else has the master ip, not activating"
261       logging.error(msg)
262       err_msgs.append(msg)
263   else:
264     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
265                            "dev", master_netdev, "label",
266                            "%s:0" % master_netdev])
267     if result.failed:
268       msg = "Can't activate master IP: %s" % result.output
269       logging.error(msg)
270       err_msgs.append(msg)
271
272     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
273                            "-s", master_ip, master_ip])
274     # we'll ignore the exit code of arping
275
276   # and now start the master and rapi daemons
277   if start_daemons:
278     if no_voting:
279       masterd_args = "--no-voting --yes-do-it"
280     else:
281       masterd_args = ""
282
283     env = {
284       "EXTRA_MASTERD_ARGS": masterd_args,
285       }
286
287     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
288     if result.failed:
289       msg = "Can't start Ganeti master: %s" % result.output
290       logging.error(msg)
291       err_msgs.append(msg)
292
293   if err_msgs:
294     _Fail("; ".join(err_msgs))
295
296
297 def StopMaster(stop_daemons):
298   """Deactivate this node as master.
299
300   The function will always try to deactivate the IP address of the
301   master. It will also stop the master daemons depending on the
302   stop_daemons parameter.
303
304   @type stop_daemons: boolean
305   @param stop_daemons: whether to also stop the master daemons
306       (ganeti-masterd and ganeti-rapi)
307   @rtype: None
308
309   """
310   # TODO: log and report back to the caller the error failures; we
311   # need to decide in which case we fail the RPC for this
312
313   # GetMasterInfo will raise an exception if not able to return data
314   master_netdev, master_ip, _ = GetMasterInfo()
315
316   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
317                          "dev", master_netdev])
318   if result.failed:
319     logging.error("Can't remove the master IP, error: %s", result.output)
320     # but otherwise ignore the failure
321
322   if stop_daemons:
323     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
324     if result.failed:
325       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
326                     " and error %s",
327                     result.cmd, result.exit_code, result.output)
328
329
330 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
331   """Joins this node to the cluster.
332
333   This does the following:
334       - updates the hostkeys of the machine (rsa and dsa)
335       - adds the ssh private key to the user
336       - adds the ssh public key to the users' authorized_keys file
337
338   @type dsa: str
339   @param dsa: the DSA private key to write
340   @type dsapub: str
341   @param dsapub: the DSA public key to write
342   @type rsa: str
343   @param rsa: the RSA private key to write
344   @type rsapub: str
345   @param rsapub: the RSA public key to write
346   @type sshkey: str
347   @param sshkey: the SSH private key to write
348   @type sshpub: str
349   @param sshpub: the SSH public key to write
350   @rtype: boolean
351   @return: the success of the operation
352
353   """
354   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
355                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
356                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
357                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
358   for name, content, mode in sshd_keys:
359     utils.WriteFile(name, data=content, mode=mode)
360
361   try:
362     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
363                                                     mkdir=True)
364   except errors.OpExecError, err:
365     _Fail("Error while processing user ssh files: %s", err, exc=True)
366
367   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
368     utils.WriteFile(name, data=content, mode=0600)
369
370   utils.AddAuthorizedKey(auth_keys, sshpub)
371
372   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
373   if result.failed:
374     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
375           result.cmd, result.exit_code, result.output)
376
377
378 def LeaveCluster(modify_ssh_setup):
379   """Cleans up and remove the current node.
380
381   This function cleans up and prepares the current node to be removed
382   from the cluster.
383
384   If processing is successful, then it raises an
385   L{errors.QuitGanetiException} which is used as a special case to
386   shutdown the node daemon.
387
388   @param modify_ssh_setup: boolean
389
390   """
391   _CleanDirectory(constants.DATA_DIR)
392   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
393   JobQueuePurge()
394
395   if modify_ssh_setup:
396     try:
397       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
398
399       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
400
401       utils.RemoveFile(priv_key)
402       utils.RemoveFile(pub_key)
403     except errors.OpExecError:
404       logging.exception("Error while processing ssh files")
405
406   try:
407     utils.RemoveFile(constants.CONFD_HMAC_KEY)
408     utils.RemoveFile(constants.RAPI_CERT_FILE)
409     utils.RemoveFile(constants.NODED_CERT_FILE)
410   except: # pylint: disable-msg=W0702
411     logging.exception("Error while removing cluster secrets")
412
413   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
414   if result.failed:
415     logging.error("Command %s failed with exitcode %s and error %s",
416                   result.cmd, result.exit_code, result.output)
417
418   # Raise a custom exception (handled in ganeti-noded)
419   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
420
421
422 def GetNodeInfo(vgname, hypervisor_type):
423   """Gives back a hash with different information about the node.
424
425   @type vgname: C{string}
426   @param vgname: the name of the volume group to ask for disk space information
427   @type hypervisor_type: C{str}
428   @param hypervisor_type: the name of the hypervisor to ask for
429       memory information
430   @rtype: C{dict}
431   @return: dictionary with the following keys:
432       - vg_size is the size of the configured volume group in MiB
433       - vg_free is the free size of the volume group in MiB
434       - memory_dom0 is the memory allocated for domain0 in MiB
435       - memory_free is the currently available (free) ram in MiB
436       - memory_total is the total number of ram in MiB
437
438   """
439   outputarray = {}
440   vginfo = _GetVGInfo(vgname)
441   outputarray['vg_size'] = vginfo['vg_size']
442   outputarray['vg_free'] = vginfo['vg_free']
443
444   hyper = hypervisor.GetHypervisor(hypervisor_type)
445   hyp_info = hyper.GetNodeInfo()
446   if hyp_info is not None:
447     outputarray.update(hyp_info)
448
449   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
450
451   return outputarray
452
453
454 def VerifyNode(what, cluster_name):
455   """Verify the status of the local node.
456
457   Based on the input L{what} parameter, various checks are done on the
458   local node.
459
460   If the I{filelist} key is present, this list of
461   files is checksummed and the file/checksum pairs are returned.
462
463   If the I{nodelist} key is present, we check that we have
464   connectivity via ssh with the target nodes (and check the hostname
465   report).
466
467   If the I{node-net-test} key is present, we check that we have
468   connectivity to the given nodes via both primary IP and, if
469   applicable, secondary IPs.
470
471   @type what: C{dict}
472   @param what: a dictionary of things to check:
473       - filelist: list of files for which to compute checksums
474       - nodelist: list of nodes we should check ssh communication with
475       - node-net-test: list of nodes we should check node daemon port
476         connectivity with
477       - hypervisor: list with hypervisors to run the verify for
478   @rtype: dict
479   @return: a dictionary with the same keys as the input dict, and
480       values representing the result of the checks
481
482   """
483   result = {}
484
485   if constants.NV_HYPERVISOR in what:
486     result[constants.NV_HYPERVISOR] = tmp = {}
487     for hv_name in what[constants.NV_HYPERVISOR]:
488       try:
489         val = hypervisor.GetHypervisor(hv_name).Verify()
490       except errors.HypervisorError, err:
491         val = "Error while checking hypervisor: %s" % str(err)
492       tmp[hv_name] = val
493
494   if constants.NV_FILELIST in what:
495     result[constants.NV_FILELIST] = utils.FingerprintFiles(
496       what[constants.NV_FILELIST])
497
498   if constants.NV_NODELIST in what:
499     result[constants.NV_NODELIST] = tmp = {}
500     random.shuffle(what[constants.NV_NODELIST])
501     for node in what[constants.NV_NODELIST]:
502       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
503       if not success:
504         tmp[node] = message
505
506   if constants.NV_NODENETTEST in what:
507     result[constants.NV_NODENETTEST] = tmp = {}
508     my_name = utils.HostInfo().name
509     my_pip = my_sip = None
510     for name, pip, sip in what[constants.NV_NODENETTEST]:
511       if name == my_name:
512         my_pip = pip
513         my_sip = sip
514         break
515     if not my_pip:
516       tmp[my_name] = ("Can't find my own primary/secondary IP"
517                       " in the node list")
518     else:
519       port = utils.GetDaemonPort(constants.NODED)
520       for name, pip, sip in what[constants.NV_NODENETTEST]:
521         fail = []
522         if not utils.TcpPing(pip, port, source=my_pip):
523           fail.append("primary")
524         if sip != pip:
525           if not utils.TcpPing(sip, port, source=my_sip):
526             fail.append("secondary")
527         if fail:
528           tmp[name] = ("failure using the %s interface(s)" %
529                        " and ".join(fail))
530
531   if constants.NV_LVLIST in what:
532     try:
533       val = GetVolumeList(what[constants.NV_LVLIST])
534     except RPCFail, err:
535       val = str(err)
536     result[constants.NV_LVLIST] = val
537
538   if constants.NV_INSTANCELIST in what:
539     # GetInstanceList can fail
540     try:
541       val = GetInstanceList(what[constants.NV_INSTANCELIST])
542     except RPCFail, err:
543       val = str(err)
544     result[constants.NV_INSTANCELIST] = val
545
546   if constants.NV_VGLIST in what:
547     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
548
549   if constants.NV_PVLIST in what:
550     result[constants.NV_PVLIST] = \
551       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
552                                    filter_allocatable=False)
553
554   if constants.NV_VERSION in what:
555     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
556                                     constants.RELEASE_VERSION)
557
558   if constants.NV_HVINFO in what:
559     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
560     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
561
562   if constants.NV_DRBDLIST in what:
563     try:
564       used_minors = bdev.DRBD8.GetUsedDevs().keys()
565     except errors.BlockDeviceError, err:
566       logging.warning("Can't get used minors list", exc_info=True)
567       used_minors = str(err)
568     result[constants.NV_DRBDLIST] = used_minors
569
570   if constants.NV_NODESETUP in what:
571     result[constants.NV_NODESETUP] = tmpr = []
572     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
573       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
574                   " under /sys, missing required directories /sys/block"
575                   " and /sys/class/net")
576     if (not os.path.isdir("/proc/sys") or
577         not os.path.isfile("/proc/sysrq-trigger")):
578       tmpr.append("The procfs filesystem doesn't seem to be mounted"
579                   " under /proc, missing required directory /proc/sys and"
580                   " the file /proc/sysrq-trigger")
581
582   if constants.NV_TIME in what:
583     result[constants.NV_TIME] = utils.SplitTime(time.time())
584
585   return result
586
587
588 def GetVolumeList(vg_name):
589   """Compute list of logical volumes and their size.
590
591   @type vg_name: str
592   @param vg_name: the volume group whose LVs we should list
593   @rtype: dict
594   @return:
595       dictionary of all partions (key) with value being a tuple of
596       their size (in MiB), inactive and online status::
597
598         {'test1': ('20.06', True, True)}
599
600       in case of errors, a string is returned with the error
601       details.
602
603   """
604   lvs = {}
605   sep = '|'
606   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
607                          "--separator=%s" % sep,
608                          "-olv_name,lv_size,lv_attr", vg_name])
609   if result.failed:
610     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
611
612   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
613   for line in result.stdout.splitlines():
614     line = line.strip()
615     match = valid_line_re.match(line)
616     if not match:
617       logging.error("Invalid line returned from lvs output: '%s'", line)
618       continue
619     name, size, attr = match.groups()
620     inactive = attr[4] == '-'
621     online = attr[5] == 'o'
622     virtual = attr[0] == 'v'
623     if virtual:
624       # we don't want to report such volumes as existing, since they
625       # don't really hold data
626       continue
627     lvs[name] = (size, inactive, online)
628
629   return lvs
630
631
632 def ListVolumeGroups():
633   """List the volume groups and their size.
634
635   @rtype: dict
636   @return: dictionary with keys volume name and values the
637       size of the volume
638
639   """
640   return utils.ListVolumeGroups()
641
642
643 def NodeVolumes():
644   """List all volumes on this node.
645
646   @rtype: list
647   @return:
648     A list of dictionaries, each having four keys:
649       - name: the logical volume name,
650       - size: the size of the logical volume
651       - dev: the physical device on which the LV lives
652       - vg: the volume group to which it belongs
653
654     In case of errors, we return an empty list and log the
655     error.
656
657     Note that since a logical volume can live on multiple physical
658     volumes, the resulting list might include a logical volume
659     multiple times.
660
661   """
662   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
663                          "--separator=|",
664                          "--options=lv_name,lv_size,devices,vg_name"])
665   if result.failed:
666     _Fail("Failed to list logical volumes, lvs output: %s",
667           result.output)
668
669   def parse_dev(dev):
670     return dev.split('(')[0]
671
672   def handle_dev(dev):
673     return [parse_dev(x) for x in dev.split(",")]
674
675   def map_line(line):
676     line = [v.strip() for v in line]
677     return [{'name': line[0], 'size': line[1],
678              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
679
680   all_devs = []
681   for line in result.stdout.splitlines():
682     if line.count('|') >= 3:
683       all_devs.extend(map_line(line.split('|')))
684     else:
685       logging.warning("Strange line in the output from lvs: '%s'", line)
686   return all_devs
687
688
689 def BridgesExist(bridges_list):
690   """Check if a list of bridges exist on the current node.
691
692   @rtype: boolean
693   @return: C{True} if all of them exist, C{False} otherwise
694
695   """
696   missing = []
697   for bridge in bridges_list:
698     if not utils.BridgeExists(bridge):
699       missing.append(bridge)
700
701   if missing:
702     _Fail("Missing bridges %s", utils.CommaJoin(missing))
703
704
705 def GetInstanceList(hypervisor_list):
706   """Provides a list of instances.
707
708   @type hypervisor_list: list
709   @param hypervisor_list: the list of hypervisors to query information
710
711   @rtype: list
712   @return: a list of all running instances on the current node
713     - instance1.example.com
714     - instance2.example.com
715
716   """
717   results = []
718   for hname in hypervisor_list:
719     try:
720       names = hypervisor.GetHypervisor(hname).ListInstances()
721       results.extend(names)
722     except errors.HypervisorError, err:
723       _Fail("Error enumerating instances (hypervisor %s): %s",
724             hname, err, exc=True)
725
726   return results
727
728
729 def GetInstanceInfo(instance, hname):
730   """Gives back the information about an instance as a dictionary.
731
732   @type instance: string
733   @param instance: the instance name
734   @type hname: string
735   @param hname: the hypervisor type of the instance
736
737   @rtype: dict
738   @return: dictionary with the following keys:
739       - memory: memory size of instance (int)
740       - state: xen state of instance (string)
741       - time: cpu time of instance (float)
742
743   """
744   output = {}
745
746   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
747   if iinfo is not None:
748     output['memory'] = iinfo[2]
749     output['state'] = iinfo[4]
750     output['time'] = iinfo[5]
751
752   return output
753
754
755 def GetInstanceMigratable(instance):
756   """Gives whether an instance can be migrated.
757
758   @type instance: L{objects.Instance}
759   @param instance: object representing the instance to be checked.
760
761   @rtype: tuple
762   @return: tuple of (result, description) where:
763       - result: whether the instance can be migrated or not
764       - description: a description of the issue, if relevant
765
766   """
767   hyper = hypervisor.GetHypervisor(instance.hypervisor)
768   iname = instance.name
769   if iname not in hyper.ListInstances():
770     _Fail("Instance %s is not running", iname)
771
772   for idx in range(len(instance.disks)):
773     link_name = _GetBlockDevSymlinkPath(iname, idx)
774     if not os.path.islink(link_name):
775       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
776
777
778 def GetAllInstancesInfo(hypervisor_list):
779   """Gather data about all instances.
780
781   This is the equivalent of L{GetInstanceInfo}, except that it
782   computes data for all instances at once, thus being faster if one
783   needs data about more than one instance.
784
785   @type hypervisor_list: list
786   @param hypervisor_list: list of hypervisors to query for instance data
787
788   @rtype: dict
789   @return: dictionary of instance: data, with data having the following keys:
790       - memory: memory size of instance (int)
791       - state: xen state of instance (string)
792       - time: cpu time of instance (float)
793       - vcpus: the number of vcpus
794
795   """
796   output = {}
797
798   for hname in hypervisor_list:
799     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
800     if iinfo:
801       for name, _, memory, vcpus, state, times in iinfo:
802         value = {
803           'memory': memory,
804           'vcpus': vcpus,
805           'state': state,
806           'time': times,
807           }
808         if name in output:
809           # we only check static parameters, like memory and vcpus,
810           # and not state and time which can change between the
811           # invocations of the different hypervisors
812           for key in 'memory', 'vcpus':
813             if value[key] != output[name][key]:
814               _Fail("Instance %s is running twice"
815                     " with different parameters", name)
816         output[name] = value
817
818   return output
819
820
821 def _InstanceLogName(kind, os_name, instance):
822   """Compute the OS log filename for a given instance and operation.
823
824   The instance name and os name are passed in as strings since not all
825   operations have these as part of an instance object.
826
827   @type kind: string
828   @param kind: the operation type (e.g. add, import, etc.)
829   @type os_name: string
830   @param os_name: the os name
831   @type instance: string
832   @param instance: the name of the instance being imported/added/etc.
833
834   """
835   base = ("%s-%s-%s-%s.log" %
836           (kind, os_name, instance, utils.TimestampForFilename()))
837   return utils.PathJoin(constants.LOG_OS_DIR, base)
838
839
840 def InstanceOsAdd(instance, reinstall, debug):
841   """Add an OS to an instance.
842
843   @type instance: L{objects.Instance}
844   @param instance: Instance whose OS is to be installed
845   @type reinstall: boolean
846   @param reinstall: whether this is an instance reinstall
847   @type debug: integer
848   @param debug: debug level, passed to the OS scripts
849   @rtype: None
850
851   """
852   inst_os = OSFromDisk(instance.os)
853
854   create_env = OSEnvironment(instance, inst_os, debug)
855   if reinstall:
856     create_env['INSTANCE_REINSTALL'] = "1"
857
858   logfile = _InstanceLogName("add", instance.os, instance.name)
859
860   result = utils.RunCmd([inst_os.create_script], env=create_env,
861                         cwd=inst_os.path, output=logfile,)
862   if result.failed:
863     logging.error("os create command '%s' returned error: %s, logfile: %s,"
864                   " output: %s", result.cmd, result.fail_reason, logfile,
865                   result.output)
866     lines = [utils.SafeEncode(val)
867              for val in utils.TailFile(logfile, lines=20)]
868     _Fail("OS create script failed (%s), last lines in the"
869           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
870
871
872 def RunRenameInstance(instance, old_name, debug):
873   """Run the OS rename script for an instance.
874
875   @type instance: L{objects.Instance}
876   @param instance: Instance whose OS is to be installed
877   @type old_name: string
878   @param old_name: previous instance name
879   @type debug: integer
880   @param debug: debug level, passed to the OS scripts
881   @rtype: boolean
882   @return: the success of the operation
883
884   """
885   inst_os = OSFromDisk(instance.os)
886
887   rename_env = OSEnvironment(instance, inst_os, debug)
888   rename_env['OLD_INSTANCE_NAME'] = old_name
889
890   logfile = _InstanceLogName("rename", instance.os,
891                              "%s-%s" % (old_name, instance.name))
892
893   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
894                         cwd=inst_os.path, output=logfile)
895
896   if result.failed:
897     logging.error("os create command '%s' returned error: %s output: %s",
898                   result.cmd, result.fail_reason, result.output)
899     lines = [utils.SafeEncode(val)
900              for val in utils.TailFile(logfile, lines=20)]
901     _Fail("OS rename script failed (%s), last lines in the"
902           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
903
904
905 def _GetVGInfo(vg_name):
906   """Get information about the volume group.
907
908   @type vg_name: str
909   @param vg_name: the volume group which we query
910   @rtype: dict
911   @return:
912     A dictionary with the following keys:
913       - C{vg_size} is the total size of the volume group in MiB
914       - C{vg_free} is the free size of the volume group in MiB
915       - C{pv_count} are the number of physical disks in that VG
916
917     If an error occurs during gathering of data, we return the same dict
918     with keys all set to None.
919
920   """
921   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
922
923   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
924                          "--nosuffix", "--units=m", "--separator=:", vg_name])
925
926   if retval.failed:
927     logging.error("volume group %s not present", vg_name)
928     return retdic
929   valarr = retval.stdout.strip().rstrip(':').split(':')
930   if len(valarr) == 3:
931     try:
932       retdic = {
933         "vg_size": int(round(float(valarr[0]), 0)),
934         "vg_free": int(round(float(valarr[1]), 0)),
935         "pv_count": int(valarr[2]),
936         }
937     except (TypeError, ValueError), err:
938       logging.exception("Fail to parse vgs output: %s", err)
939   else:
940     logging.error("vgs output has the wrong number of fields (expected"
941                   " three): %s", str(valarr))
942   return retdic
943
944
945 def _GetBlockDevSymlinkPath(instance_name, idx):
946   return utils.PathJoin(constants.DISK_LINKS_DIR,
947                         "%s:%d" % (instance_name, idx))
948
949
950 def _SymlinkBlockDev(instance_name, device_path, idx):
951   """Set up symlinks to a instance's block device.
952
953   This is an auxiliary function run when an instance is start (on the primary
954   node) or when an instance is migrated (on the target node).
955
956
957   @param instance_name: the name of the target instance
958   @param device_path: path of the physical block device, on the node
959   @param idx: the disk index
960   @return: absolute path to the disk's symlink
961
962   """
963   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
964   try:
965     os.symlink(device_path, link_name)
966   except OSError, err:
967     if err.errno == errno.EEXIST:
968       if (not os.path.islink(link_name) or
969           os.readlink(link_name) != device_path):
970         os.remove(link_name)
971         os.symlink(device_path, link_name)
972     else:
973       raise
974
975   return link_name
976
977
978 def _RemoveBlockDevLinks(instance_name, disks):
979   """Remove the block device symlinks belonging to the given instance.
980
981   """
982   for idx, _ in enumerate(disks):
983     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
984     if os.path.islink(link_name):
985       try:
986         os.remove(link_name)
987       except OSError:
988         logging.exception("Can't remove symlink '%s'", link_name)
989
990
991 def _GatherAndLinkBlockDevs(instance):
992   """Set up an instance's block device(s).
993
994   This is run on the primary node at instance startup. The block
995   devices must be already assembled.
996
997   @type instance: L{objects.Instance}
998   @param instance: the instance whose disks we shoul assemble
999   @rtype: list
1000   @return: list of (disk_object, device_path)
1001
1002   """
1003   block_devices = []
1004   for idx, disk in enumerate(instance.disks):
1005     device = _RecursiveFindBD(disk)
1006     if device is None:
1007       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1008                                     str(disk))
1009     device.Open()
1010     try:
1011       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1012     except OSError, e:
1013       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1014                                     e.strerror)
1015
1016     block_devices.append((disk, link_name))
1017
1018   return block_devices
1019
1020
1021 def StartInstance(instance):
1022   """Start an instance.
1023
1024   @type instance: L{objects.Instance}
1025   @param instance: the instance object
1026   @rtype: None
1027
1028   """
1029   running_instances = GetInstanceList([instance.hypervisor])
1030
1031   if instance.name in running_instances:
1032     logging.info("Instance %s already running, not starting", instance.name)
1033     return
1034
1035   try:
1036     block_devices = _GatherAndLinkBlockDevs(instance)
1037     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1038     hyper.StartInstance(instance, block_devices)
1039   except errors.BlockDeviceError, err:
1040     _Fail("Block device error: %s", err, exc=True)
1041   except errors.HypervisorError, err:
1042     _RemoveBlockDevLinks(instance.name, instance.disks)
1043     _Fail("Hypervisor error: %s", err, exc=True)
1044
1045
1046 def InstanceShutdown(instance, timeout):
1047   """Shut an instance down.
1048
1049   @note: this functions uses polling with a hardcoded timeout.
1050
1051   @type instance: L{objects.Instance}
1052   @param instance: the instance object
1053   @type timeout: integer
1054   @param timeout: maximum timeout for soft shutdown
1055   @rtype: None
1056
1057   """
1058   hv_name = instance.hypervisor
1059   hyper = hypervisor.GetHypervisor(hv_name)
1060   iname = instance.name
1061
1062   if instance.name not in hyper.ListInstances():
1063     logging.info("Instance %s not running, doing nothing", iname)
1064     return
1065
1066   class _TryShutdown:
1067     def __init__(self):
1068       self.tried_once = False
1069
1070     def __call__(self):
1071       if iname not in hyper.ListInstances():
1072         return
1073
1074       try:
1075         hyper.StopInstance(instance, retry=self.tried_once)
1076       except errors.HypervisorError, err:
1077         if iname not in hyper.ListInstances():
1078           # if the instance is no longer existing, consider this a
1079           # success and go to cleanup
1080           return
1081
1082         _Fail("Failed to stop instance %s: %s", iname, err)
1083
1084       self.tried_once = True
1085
1086       raise utils.RetryAgain()
1087
1088   try:
1089     utils.Retry(_TryShutdown(), 5, timeout)
1090   except utils.RetryTimeout:
1091     # the shutdown did not succeed
1092     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1093
1094     try:
1095       hyper.StopInstance(instance, force=True)
1096     except errors.HypervisorError, err:
1097       if iname in hyper.ListInstances():
1098         # only raise an error if the instance still exists, otherwise
1099         # the error could simply be "instance ... unknown"!
1100         _Fail("Failed to force stop instance %s: %s", iname, err)
1101
1102     time.sleep(1)
1103
1104     if iname in hyper.ListInstances():
1105       _Fail("Could not shutdown instance %s even by destroy", iname)
1106
1107   _RemoveBlockDevLinks(iname, instance.disks)
1108
1109
1110 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1111   """Reboot an instance.
1112
1113   @type instance: L{objects.Instance}
1114   @param instance: the instance object to reboot
1115   @type reboot_type: str
1116   @param reboot_type: the type of reboot, one the following
1117     constants:
1118       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1119         instance OS, do not recreate the VM
1120       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1121         restart the VM (at the hypervisor level)
1122       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1123         not accepted here, since that mode is handled differently, in
1124         cmdlib, and translates into full stop and start of the
1125         instance (instead of a call_instance_reboot RPC)
1126   @type shutdown_timeout: integer
1127   @param shutdown_timeout: maximum timeout for soft shutdown
1128   @rtype: None
1129
1130   """
1131   running_instances = GetInstanceList([instance.hypervisor])
1132
1133   if instance.name not in running_instances:
1134     _Fail("Cannot reboot instance %s that is not running", instance.name)
1135
1136   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1137   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138     try:
1139       hyper.RebootInstance(instance)
1140     except errors.HypervisorError, err:
1141       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1142   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143     try:
1144       InstanceShutdown(instance, shutdown_timeout)
1145       return StartInstance(instance)
1146     except errors.HypervisorError, err:
1147       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148   else:
1149     _Fail("Invalid reboot_type received: %s", reboot_type)
1150
1151
1152 def MigrationInfo(instance):
1153   """Gather information about an instance to be migrated.
1154
1155   @type instance: L{objects.Instance}
1156   @param instance: the instance definition
1157
1158   """
1159   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160   try:
1161     info = hyper.MigrationInfo(instance)
1162   except errors.HypervisorError, err:
1163     _Fail("Failed to fetch migration information: %s", err, exc=True)
1164   return info
1165
1166
1167 def AcceptInstance(instance, info, target):
1168   """Prepare the node to accept an instance.
1169
1170   @type instance: L{objects.Instance}
1171   @param instance: the instance definition
1172   @type info: string/data (opaque)
1173   @param info: migration information, from the source node
1174   @type target: string
1175   @param target: target host (usually ip), on this node
1176
1177   """
1178   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179   try:
1180     hyper.AcceptInstance(instance, info, target)
1181   except errors.HypervisorError, err:
1182     _Fail("Failed to accept instance: %s", err, exc=True)
1183
1184
1185 def FinalizeMigration(instance, info, success):
1186   """Finalize any preparation to accept an instance.
1187
1188   @type instance: L{objects.Instance}
1189   @param instance: the instance definition
1190   @type info: string/data (opaque)
1191   @param info: migration information, from the source node
1192   @type success: boolean
1193   @param success: whether the migration was a success or a failure
1194
1195   """
1196   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197   try:
1198     hyper.FinalizeMigration(instance, info, success)
1199   except errors.HypervisorError, err:
1200     _Fail("Failed to finalize migration: %s", err, exc=True)
1201
1202
1203 def MigrateInstance(instance, target, live):
1204   """Migrates an instance to another node.
1205
1206   @type instance: L{objects.Instance}
1207   @param instance: the instance definition
1208   @type target: string
1209   @param target: the target node name
1210   @type live: boolean
1211   @param live: whether the migration should be done live or not (the
1212       interpretation of this parameter is left to the hypervisor)
1213   @rtype: tuple
1214   @return: a tuple of (success, msg) where:
1215       - succes is a boolean denoting the success/failure of the operation
1216       - msg is a string with details in case of failure
1217
1218   """
1219   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1220
1221   try:
1222     hyper.MigrateInstance(instance, target, live)
1223   except errors.HypervisorError, err:
1224     _Fail("Failed to migrate instance: %s", err, exc=True)
1225
1226
1227 def BlockdevCreate(disk, size, owner, on_primary, info):
1228   """Creates a block device for an instance.
1229
1230   @type disk: L{objects.Disk}
1231   @param disk: the object describing the disk we should create
1232   @type size: int
1233   @param size: the size of the physical underlying device, in MiB
1234   @type owner: str
1235   @param owner: the name of the instance for which disk is created,
1236       used for device cache data
1237   @type on_primary: boolean
1238   @param on_primary:  indicates if it is the primary node or not
1239   @type info: string
1240   @param info: string that will be sent to the physical device
1241       creation, used for example to set (LVM) tags on LVs
1242
1243   @return: the new unique_id of the device (this can sometime be
1244       computed only after creation), or None. On secondary nodes,
1245       it's not required to return anything.
1246
1247   """
1248   # TODO: remove the obsolete 'size' argument
1249   # pylint: disable-msg=W0613
1250   clist = []
1251   if disk.children:
1252     for child in disk.children:
1253       try:
1254         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1255       except errors.BlockDeviceError, err:
1256         _Fail("Can't assemble device %s: %s", child, err)
1257       if on_primary or disk.AssembleOnSecondary():
1258         # we need the children open in case the device itself has to
1259         # be assembled
1260         try:
1261           # pylint: disable-msg=E1103
1262           crdev.Open()
1263         except errors.BlockDeviceError, err:
1264           _Fail("Can't make child '%s' read-write: %s", child, err)
1265       clist.append(crdev)
1266
1267   try:
1268     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1269   except errors.BlockDeviceError, err:
1270     _Fail("Can't create block device: %s", err)
1271
1272   if on_primary or disk.AssembleOnSecondary():
1273     try:
1274       device.Assemble()
1275     except errors.BlockDeviceError, err:
1276       _Fail("Can't assemble device after creation, unusual event: %s", err)
1277     device.SetSyncSpeed(constants.SYNC_SPEED)
1278     if on_primary or disk.OpenOnSecondary():
1279       try:
1280         device.Open(force=True)
1281       except errors.BlockDeviceError, err:
1282         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1283     DevCacheManager.UpdateCache(device.dev_path, owner,
1284                                 on_primary, disk.iv_name)
1285
1286   device.SetInfo(info)
1287
1288   return device.unique_id
1289
1290
1291 def BlockdevRemove(disk):
1292   """Remove a block device.
1293
1294   @note: This is intended to be called recursively.
1295
1296   @type disk: L{objects.Disk}
1297   @param disk: the disk object we should remove
1298   @rtype: boolean
1299   @return: the success of the operation
1300
1301   """
1302   msgs = []
1303   try:
1304     rdev = _RecursiveFindBD(disk)
1305   except errors.BlockDeviceError, err:
1306     # probably can't attach
1307     logging.info("Can't attach to device %s in remove", disk)
1308     rdev = None
1309   if rdev is not None:
1310     r_path = rdev.dev_path
1311     try:
1312       rdev.Remove()
1313     except errors.BlockDeviceError, err:
1314       msgs.append(str(err))
1315     if not msgs:
1316       DevCacheManager.RemoveCache(r_path)
1317
1318   if disk.children:
1319     for child in disk.children:
1320       try:
1321         BlockdevRemove(child)
1322       except RPCFail, err:
1323         msgs.append(str(err))
1324
1325   if msgs:
1326     _Fail("; ".join(msgs))
1327
1328
1329 def _RecursiveAssembleBD(disk, owner, as_primary):
1330   """Activate a block device for an instance.
1331
1332   This is run on the primary and secondary nodes for an instance.
1333
1334   @note: this function is called recursively.
1335
1336   @type disk: L{objects.Disk}
1337   @param disk: the disk we try to assemble
1338   @type owner: str
1339   @param owner: the name of the instance which owns the disk
1340   @type as_primary: boolean
1341   @param as_primary: if we should make the block device
1342       read/write
1343
1344   @return: the assembled device or None (in case no device
1345       was assembled)
1346   @raise errors.BlockDeviceError: in case there is an error
1347       during the activation of the children or the device
1348       itself
1349
1350   """
1351   children = []
1352   if disk.children:
1353     mcn = disk.ChildrenNeeded()
1354     if mcn == -1:
1355       mcn = 0 # max number of Nones allowed
1356     else:
1357       mcn = len(disk.children) - mcn # max number of Nones
1358     for chld_disk in disk.children:
1359       try:
1360         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1361       except errors.BlockDeviceError, err:
1362         if children.count(None) >= mcn:
1363           raise
1364         cdev = None
1365         logging.error("Error in child activation (but continuing): %s",
1366                       str(err))
1367       children.append(cdev)
1368
1369   if as_primary or disk.AssembleOnSecondary():
1370     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1371     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1372     result = r_dev
1373     if as_primary or disk.OpenOnSecondary():
1374       r_dev.Open()
1375     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1376                                 as_primary, disk.iv_name)
1377
1378   else:
1379     result = True
1380   return result
1381
1382
1383 def BlockdevAssemble(disk, owner, as_primary):
1384   """Activate a block device for an instance.
1385
1386   This is a wrapper over _RecursiveAssembleBD.
1387
1388   @rtype: str or boolean
1389   @return: a C{/dev/...} path for primary nodes, and
1390       C{True} for secondary nodes
1391
1392   """
1393   try:
1394     result = _RecursiveAssembleBD(disk, owner, as_primary)
1395     if isinstance(result, bdev.BlockDev):
1396       # pylint: disable-msg=E1103
1397       result = result.dev_path
1398   except errors.BlockDeviceError, err:
1399     _Fail("Error while assembling disk: %s", err, exc=True)
1400
1401   return result
1402
1403
1404 def BlockdevShutdown(disk):
1405   """Shut down a block device.
1406
1407   First, if the device is assembled (Attach() is successful), then
1408   the device is shutdown. Then the children of the device are
1409   shutdown.
1410
1411   This function is called recursively. Note that we don't cache the
1412   children or such, as oppossed to assemble, shutdown of different
1413   devices doesn't require that the upper device was active.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the description of the disk we should
1417       shutdown
1418   @rtype: None
1419
1420   """
1421   msgs = []
1422   r_dev = _RecursiveFindBD(disk)
1423   if r_dev is not None:
1424     r_path = r_dev.dev_path
1425     try:
1426       r_dev.Shutdown()
1427       DevCacheManager.RemoveCache(r_path)
1428     except errors.BlockDeviceError, err:
1429       msgs.append(str(err))
1430
1431   if disk.children:
1432     for child in disk.children:
1433       try:
1434         BlockdevShutdown(child)
1435       except RPCFail, err:
1436         msgs.append(str(err))
1437
1438   if msgs:
1439     _Fail("; ".join(msgs))
1440
1441
1442 def BlockdevAddchildren(parent_cdev, new_cdevs):
1443   """Extend a mirrored block device.
1444
1445   @type parent_cdev: L{objects.Disk}
1446   @param parent_cdev: the disk to which we should add children
1447   @type new_cdevs: list of L{objects.Disk}
1448   @param new_cdevs: the list of children which we should add
1449   @rtype: None
1450
1451   """
1452   parent_bdev = _RecursiveFindBD(parent_cdev)
1453   if parent_bdev is None:
1454     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1455   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1456   if new_bdevs.count(None) > 0:
1457     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1458   parent_bdev.AddChildren(new_bdevs)
1459
1460
1461 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1462   """Shrink a mirrored block device.
1463
1464   @type parent_cdev: L{objects.Disk}
1465   @param parent_cdev: the disk from which we should remove children
1466   @type new_cdevs: list of L{objects.Disk}
1467   @param new_cdevs: the list of children which we should remove
1468   @rtype: None
1469
1470   """
1471   parent_bdev = _RecursiveFindBD(parent_cdev)
1472   if parent_bdev is None:
1473     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1474   devs = []
1475   for disk in new_cdevs:
1476     rpath = disk.StaticDevPath()
1477     if rpath is None:
1478       bd = _RecursiveFindBD(disk)
1479       if bd is None:
1480         _Fail("Can't find device %s while removing children", disk)
1481       else:
1482         devs.append(bd.dev_path)
1483     else:
1484       if not utils.IsNormAbsPath(rpath):
1485         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1486       devs.append(rpath)
1487   parent_bdev.RemoveChildren(devs)
1488
1489
1490 def BlockdevGetmirrorstatus(disks):
1491   """Get the mirroring status of a list of devices.
1492
1493   @type disks: list of L{objects.Disk}
1494   @param disks: the list of disks which we should query
1495   @rtype: disk
1496   @return:
1497       a list of (mirror_done, estimated_time) tuples, which
1498       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1499   @raise errors.BlockDeviceError: if any of the disks cannot be
1500       found
1501
1502   """
1503   stats = []
1504   for dsk in disks:
1505     rbd = _RecursiveFindBD(dsk)
1506     if rbd is None:
1507       _Fail("Can't find device %s", dsk)
1508
1509     stats.append(rbd.CombinedSyncStatus())
1510
1511   return stats
1512
1513
1514 def _RecursiveFindBD(disk):
1515   """Check if a device is activated.
1516
1517   If so, return information about the real device.
1518
1519   @type disk: L{objects.Disk}
1520   @param disk: the disk object we need to find
1521
1522   @return: None if the device can't be found,
1523       otherwise the device instance
1524
1525   """
1526   children = []
1527   if disk.children:
1528     for chdisk in disk.children:
1529       children.append(_RecursiveFindBD(chdisk))
1530
1531   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1532
1533
1534 def BlockdevFind(disk):
1535   """Check if a device is activated.
1536
1537   If it is, return information about the real device.
1538
1539   @type disk: L{objects.Disk}
1540   @param disk: the disk to find
1541   @rtype: None or objects.BlockDevStatus
1542   @return: None if the disk cannot be found, otherwise a the current
1543            information
1544
1545   """
1546   try:
1547     rbd = _RecursiveFindBD(disk)
1548   except errors.BlockDeviceError, err:
1549     _Fail("Failed to find device: %s", err, exc=True)
1550
1551   if rbd is None:
1552     return None
1553
1554   return rbd.GetSyncStatus()
1555
1556
1557 def BlockdevGetsize(disks):
1558   """Computes the size of the given disks.
1559
1560   If a disk is not found, returns None instead.
1561
1562   @type disks: list of L{objects.Disk}
1563   @param disks: the list of disk to compute the size for
1564   @rtype: list
1565   @return: list with elements None if the disk cannot be found,
1566       otherwise the size
1567
1568   """
1569   result = []
1570   for cf in disks:
1571     try:
1572       rbd = _RecursiveFindBD(cf)
1573     except errors.BlockDeviceError:
1574       result.append(None)
1575       continue
1576     if rbd is None:
1577       result.append(None)
1578     else:
1579       result.append(rbd.GetActualSize())
1580   return result
1581
1582
1583 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1584   """Export a block device to a remote node.
1585
1586   @type disk: L{objects.Disk}
1587   @param disk: the description of the disk to export
1588   @type dest_node: str
1589   @param dest_node: the destination node to export to
1590   @type dest_path: str
1591   @param dest_path: the destination path on the target node
1592   @type cluster_name: str
1593   @param cluster_name: the cluster name, needed for SSH hostalias
1594   @rtype: None
1595
1596   """
1597   real_disk = _RecursiveFindBD(disk)
1598   if real_disk is None:
1599     _Fail("Block device '%s' is not set up", disk)
1600
1601   real_disk.Open()
1602
1603   # the block size on the read dd is 1MiB to match our units
1604   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1605                                "dd if=%s bs=1048576 count=%s",
1606                                real_disk.dev_path, str(disk.size))
1607
1608   # we set here a smaller block size as, due to ssh buffering, more
1609   # than 64-128k will mostly ignored; we use nocreat to fail if the
1610   # device is not already there or we pass a wrong path; we use
1611   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1612   # to not buffer too much memory; this means that at best, we flush
1613   # every 64k, which will not be very fast
1614   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1615                                 " oflag=dsync", dest_path)
1616
1617   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1618                                                    constants.GANETI_RUNAS,
1619                                                    destcmd)
1620
1621   # all commands have been checked, so we're safe to combine them
1622   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1623
1624   result = utils.RunCmd(["bash", "-c", command])
1625
1626   if result.failed:
1627     _Fail("Disk copy command '%s' returned error: %s"
1628           " output: %s", command, result.fail_reason, result.output)
1629
1630
1631 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1632   """Write a file to the filesystem.
1633
1634   This allows the master to overwrite(!) a file. It will only perform
1635   the operation if the file belongs to a list of configuration files.
1636
1637   @type file_name: str
1638   @param file_name: the target file name
1639   @type data: str
1640   @param data: the new contents of the file
1641   @type mode: int
1642   @param mode: the mode to give the file (can be None)
1643   @type uid: int
1644   @param uid: the owner of the file (can be -1 for default)
1645   @type gid: int
1646   @param gid: the group of the file (can be -1 for default)
1647   @type atime: float
1648   @param atime: the atime to set on the file (can be None)
1649   @type mtime: float
1650   @param mtime: the mtime to set on the file (can be None)
1651   @rtype: None
1652
1653   """
1654   if not os.path.isabs(file_name):
1655     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1656
1657   if file_name not in _ALLOWED_UPLOAD_FILES:
1658     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1659           file_name)
1660
1661   raw_data = _Decompress(data)
1662
1663   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1664                   atime=atime, mtime=mtime)
1665
1666
1667 def WriteSsconfFiles(values):
1668   """Update all ssconf files.
1669
1670   Wrapper around the SimpleStore.WriteFiles.
1671
1672   """
1673   ssconf.SimpleStore().WriteFiles(values)
1674
1675
1676 def _ErrnoOrStr(err):
1677   """Format an EnvironmentError exception.
1678
1679   If the L{err} argument has an errno attribute, it will be looked up
1680   and converted into a textual C{E...} description. Otherwise the
1681   string representation of the error will be returned.
1682
1683   @type err: L{EnvironmentError}
1684   @param err: the exception to format
1685
1686   """
1687   if hasattr(err, 'errno'):
1688     detail = errno.errorcode[err.errno]
1689   else:
1690     detail = str(err)
1691   return detail
1692
1693
1694 def _OSOndiskAPIVersion(os_dir):
1695   """Compute and return the API version of a given OS.
1696
1697   This function will try to read the API version of the OS residing in
1698   the 'os_dir' directory.
1699
1700   @type os_dir: str
1701   @param os_dir: the directory in which we should look for the OS
1702   @rtype: tuple
1703   @return: tuple (status, data) with status denoting the validity and
1704       data holding either the vaid versions or an error message
1705
1706   """
1707   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1708
1709   try:
1710     st = os.stat(api_file)
1711   except EnvironmentError, err:
1712     return False, ("Required file '%s' not found under path %s: %s" %
1713                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1714
1715   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1716     return False, ("File '%s' in %s is not a regular file" %
1717                    (constants.OS_API_FILE, os_dir))
1718
1719   try:
1720     api_versions = utils.ReadFile(api_file).splitlines()
1721   except EnvironmentError, err:
1722     return False, ("Error while reading the API version file at %s: %s" %
1723                    (api_file, _ErrnoOrStr(err)))
1724
1725   try:
1726     api_versions = [int(version.strip()) for version in api_versions]
1727   except (TypeError, ValueError), err:
1728     return False, ("API version(s) can't be converted to integer: %s" %
1729                    str(err))
1730
1731   return True, api_versions
1732
1733
1734 def DiagnoseOS(top_dirs=None):
1735   """Compute the validity for all OSes.
1736
1737   @type top_dirs: list
1738   @param top_dirs: the list of directories in which to
1739       search (if not given defaults to
1740       L{constants.OS_SEARCH_PATH})
1741   @rtype: list of L{objects.OS}
1742   @return: a list of tuples (name, path, status, diagnose, variants)
1743       for all (potential) OSes under all search paths, where:
1744           - name is the (potential) OS name
1745           - path is the full path to the OS
1746           - status True/False is the validity of the OS
1747           - diagnose is the error message for an invalid OS, otherwise empty
1748           - variants is a list of supported OS variants, if any
1749
1750   """
1751   if top_dirs is None:
1752     top_dirs = constants.OS_SEARCH_PATH
1753
1754   result = []
1755   for dir_name in top_dirs:
1756     if os.path.isdir(dir_name):
1757       try:
1758         f_names = utils.ListVisibleFiles(dir_name)
1759       except EnvironmentError, err:
1760         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1761         break
1762       for name in f_names:
1763         os_path = utils.PathJoin(dir_name, name)
1764         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1765         if status:
1766           diagnose = ""
1767           variants = os_inst.supported_variants
1768         else:
1769           diagnose = os_inst
1770           variants = []
1771         result.append((name, os_path, status, diagnose, variants))
1772
1773   return result
1774
1775
1776 def _TryOSFromDisk(name, base_dir=None):
1777   """Create an OS instance from disk.
1778
1779   This function will return an OS instance if the given name is a
1780   valid OS name.
1781
1782   @type base_dir: string
1783   @keyword base_dir: Base directory containing OS installations.
1784                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1785   @rtype: tuple
1786   @return: success and either the OS instance if we find a valid one,
1787       or error message
1788
1789   """
1790   if base_dir is None:
1791     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1792   else:
1793     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1794
1795   if os_dir is None:
1796     return False, "Directory for OS %s not found in search path" % name
1797
1798   status, api_versions = _OSOndiskAPIVersion(os_dir)
1799   if not status:
1800     # push the error up
1801     return status, api_versions
1802
1803   if not constants.OS_API_VERSIONS.intersection(api_versions):
1804     return False, ("API version mismatch for path '%s': found %s, want %s." %
1805                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1806
1807   # OS Files dictionary, we will populate it with the absolute path names
1808   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1809
1810   if max(api_versions) >= constants.OS_API_V15:
1811     os_files[constants.OS_VARIANTS_FILE] = ''
1812
1813   for filename in os_files:
1814     os_files[filename] = utils.PathJoin(os_dir, filename)
1815
1816     try:
1817       st = os.stat(os_files[filename])
1818     except EnvironmentError, err:
1819       return False, ("File '%s' under path '%s' is missing (%s)" %
1820                      (filename, os_dir, _ErrnoOrStr(err)))
1821
1822     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1823       return False, ("File '%s' under path '%s' is not a regular file" %
1824                      (filename, os_dir))
1825
1826     if filename in constants.OS_SCRIPTS:
1827       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1828         return False, ("File '%s' under path '%s' is not executable" %
1829                        (filename, os_dir))
1830
1831   variants = None
1832   if constants.OS_VARIANTS_FILE in os_files:
1833     variants_file = os_files[constants.OS_VARIANTS_FILE]
1834     try:
1835       variants = utils.ReadFile(variants_file).splitlines()
1836     except EnvironmentError, err:
1837       return False, ("Error while reading the OS variants file at %s: %s" %
1838                      (variants_file, _ErrnoOrStr(err)))
1839     if not variants:
1840       return False, ("No supported os variant found")
1841
1842   os_obj = objects.OS(name=name, path=os_dir,
1843                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1844                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1845                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1846                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1847                       supported_variants=variants,
1848                       api_versions=api_versions)
1849   return True, os_obj
1850
1851
1852 def OSFromDisk(name, base_dir=None):
1853   """Create an OS instance from disk.
1854
1855   This function will return an OS instance if the given name is a
1856   valid OS name. Otherwise, it will raise an appropriate
1857   L{RPCFail} exception, detailing why this is not a valid OS.
1858
1859   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1860   an exception but returns true/false status data.
1861
1862   @type base_dir: string
1863   @keyword base_dir: Base directory containing OS installations.
1864                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1865   @rtype: L{objects.OS}
1866   @return: the OS instance if we find a valid one
1867   @raise RPCFail: if we don't find a valid OS
1868
1869   """
1870   name_only = name.split("+", 1)[0]
1871   status, payload = _TryOSFromDisk(name_only, base_dir)
1872
1873   if not status:
1874     _Fail(payload)
1875
1876   return payload
1877
1878
1879 def OSEnvironment(instance, inst_os, debug=0):
1880   """Calculate the environment for an os script.
1881
1882   @type instance: L{objects.Instance}
1883   @param instance: target instance for the os script run
1884   @type inst_os: L{objects.OS}
1885   @param inst_os: operating system for which the environment is being built
1886   @type debug: integer
1887   @param debug: debug level (0 or 1, for OS Api 10)
1888   @rtype: dict
1889   @return: dict of environment variables
1890   @raise errors.BlockDeviceError: if the block device
1891       cannot be found
1892
1893   """
1894   result = {}
1895   api_version = \
1896     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1897   result['OS_API_VERSION'] = '%d' % api_version
1898   result['INSTANCE_NAME'] = instance.name
1899   result['INSTANCE_OS'] = instance.os
1900   result['HYPERVISOR'] = instance.hypervisor
1901   result['DISK_COUNT'] = '%d' % len(instance.disks)
1902   result['NIC_COUNT'] = '%d' % len(instance.nics)
1903   result['DEBUG_LEVEL'] = '%d' % debug
1904   if api_version >= constants.OS_API_V15:
1905     try:
1906       variant = instance.os.split('+', 1)[1]
1907     except IndexError:
1908       variant = inst_os.supported_variants[0]
1909     result['OS_VARIANT'] = variant
1910   for idx, disk in enumerate(instance.disks):
1911     real_disk = _RecursiveFindBD(disk)
1912     if real_disk is None:
1913       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1914                                     str(disk))
1915     real_disk.Open()
1916     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1917     result['DISK_%d_ACCESS' % idx] = disk.mode
1918     if constants.HV_DISK_TYPE in instance.hvparams:
1919       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1920         instance.hvparams[constants.HV_DISK_TYPE]
1921     if disk.dev_type in constants.LDS_BLOCK:
1922       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1923     elif disk.dev_type == constants.LD_FILE:
1924       result['DISK_%d_BACKEND_TYPE' % idx] = \
1925         'file:%s' % disk.physical_id[0]
1926   for idx, nic in enumerate(instance.nics):
1927     result['NIC_%d_MAC' % idx] = nic.mac
1928     if nic.ip:
1929       result['NIC_%d_IP' % idx] = nic.ip
1930     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1931     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1932       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1933     if nic.nicparams[constants.NIC_LINK]:
1934       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1935     if constants.HV_NIC_TYPE in instance.hvparams:
1936       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1937         instance.hvparams[constants.HV_NIC_TYPE]
1938
1939   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1940     for key, value in source.items():
1941       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1942
1943   return result
1944
1945 def BlockdevGrow(disk, amount):
1946   """Grow a stack of block devices.
1947
1948   This function is called recursively, with the childrens being the
1949   first ones to resize.
1950
1951   @type disk: L{objects.Disk}
1952   @param disk: the disk to be grown
1953   @rtype: (status, result)
1954   @return: a tuple with the status of the operation
1955       (True/False), and the errors message if status
1956       is False
1957
1958   """
1959   r_dev = _RecursiveFindBD(disk)
1960   if r_dev is None:
1961     _Fail("Cannot find block device %s", disk)
1962
1963   try:
1964     r_dev.Grow(amount)
1965   except errors.BlockDeviceError, err:
1966     _Fail("Failed to grow block device: %s", err, exc=True)
1967
1968
1969 def BlockdevSnapshot(disk):
1970   """Create a snapshot copy of a block device.
1971
1972   This function is called recursively, and the snapshot is actually created
1973   just for the leaf lvm backend device.
1974
1975   @type disk: L{objects.Disk}
1976   @param disk: the disk to be snapshotted
1977   @rtype: string
1978   @return: snapshot disk path
1979
1980   """
1981   if disk.dev_type == constants.LD_DRBD8:
1982     if not disk.children:
1983       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
1984             disk.unique_id)
1985     return BlockdevSnapshot(disk.children[0])
1986   elif disk.dev_type == constants.LD_LV:
1987     r_dev = _RecursiveFindBD(disk)
1988     if r_dev is not None:
1989       # FIXME: choose a saner value for the snapshot size
1990       # let's stay on the safe side and ask for the full size, for now
1991       return r_dev.Snapshot(disk.size)
1992     else:
1993       _Fail("Cannot find block device %s", disk)
1994   else:
1995     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1996           disk.unique_id, disk.dev_type)
1997
1998
1999 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
2000   """Export a block device snapshot to a remote node.
2001
2002   @type disk: L{objects.Disk}
2003   @param disk: the description of the disk to export
2004   @type dest_node: str
2005   @param dest_node: the destination node to export to
2006   @type instance: L{objects.Instance}
2007   @param instance: the instance object to whom the disk belongs
2008   @type cluster_name: str
2009   @param cluster_name: the cluster name, needed for SSH hostalias
2010   @type idx: int
2011   @param idx: the index of the disk in the instance's disk list,
2012       used to export to the OS scripts environment
2013   @type debug: integer
2014   @param debug: debug level, passed to the OS scripts
2015   @rtype: None
2016
2017   """
2018   inst_os = OSFromDisk(instance.os)
2019   export_env = OSEnvironment(instance, inst_os, debug)
2020
2021   export_script = inst_os.export_script
2022
2023   logfile = _InstanceLogName("export", inst_os.name, instance.name)
2024   if not os.path.exists(constants.LOG_OS_DIR):
2025     os.mkdir(constants.LOG_OS_DIR, 0750)
2026   real_disk = _RecursiveFindBD(disk)
2027   if real_disk is None:
2028     _Fail("Block device '%s' is not set up", disk)
2029
2030   real_disk.Open()
2031
2032   export_env['EXPORT_DEVICE'] = real_disk.dev_path
2033   export_env['EXPORT_INDEX'] = str(idx)
2034
2035   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2036   destfile = disk.physical_id[1]
2037
2038   # the target command is built out of three individual commands,
2039   # which are joined by pipes; we check each individual command for
2040   # valid parameters
2041   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
2042                                inst_os.path, export_script, logfile)
2043
2044   comprcmd = "gzip"
2045
2046   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
2047                                 destdir, utils.PathJoin(destdir, destfile))
2048   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2049                                                    constants.GANETI_RUNAS,
2050                                                    destcmd)
2051
2052   # all commands have been checked, so we're safe to combine them
2053   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
2054
2055   result = utils.RunCmd(["bash", "-c", command], env=export_env)
2056
2057   if result.failed:
2058     _Fail("OS snapshot export command '%s' returned error: %s"
2059           " output: %s", command, result.fail_reason, result.output)
2060
2061
2062 def FinalizeExport(instance, snap_disks):
2063   """Write out the export configuration information.
2064
2065   @type instance: L{objects.Instance}
2066   @param instance: the instance which we export, used for
2067       saving configuration
2068   @type snap_disks: list of L{objects.Disk}
2069   @param snap_disks: list of snapshot block devices, which
2070       will be used to get the actual name of the dump file
2071
2072   @rtype: None
2073
2074   """
2075   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2076   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2077
2078   config = objects.SerializableConfigParser()
2079
2080   config.add_section(constants.INISECT_EXP)
2081   config.set(constants.INISECT_EXP, 'version', '0')
2082   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2083   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2084   config.set(constants.INISECT_EXP, 'os', instance.os)
2085   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2086
2087   config.add_section(constants.INISECT_INS)
2088   config.set(constants.INISECT_INS, 'name', instance.name)
2089   config.set(constants.INISECT_INS, 'memory', '%d' %
2090              instance.beparams[constants.BE_MEMORY])
2091   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2092              instance.beparams[constants.BE_VCPUS])
2093   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2094
2095   nic_total = 0
2096   for nic_count, nic in enumerate(instance.nics):
2097     nic_total += 1
2098     config.set(constants.INISECT_INS, 'nic%d_mac' %
2099                nic_count, '%s' % nic.mac)
2100     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2101     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2102                '%s' % nic.bridge)
2103   # TODO: redundant: on load can read nics until it doesn't exist
2104   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2105
2106   disk_total = 0
2107   for disk_count, disk in enumerate(snap_disks):
2108     if disk:
2109       disk_total += 1
2110       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2111                  ('%s' % disk.iv_name))
2112       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2113                  ('%s' % disk.physical_id[1]))
2114       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2115                  ('%d' % disk.size))
2116
2117   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2118
2119   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2120                   data=config.Dumps())
2121   shutil.rmtree(finaldestdir, ignore_errors=True)
2122   shutil.move(destdir, finaldestdir)
2123
2124
2125 def ExportInfo(dest):
2126   """Get export configuration information.
2127
2128   @type dest: str
2129   @param dest: directory containing the export
2130
2131   @rtype: L{objects.SerializableConfigParser}
2132   @return: a serializable config file containing the
2133       export info
2134
2135   """
2136   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2137
2138   config = objects.SerializableConfigParser()
2139   config.read(cff)
2140
2141   if (not config.has_section(constants.INISECT_EXP) or
2142       not config.has_section(constants.INISECT_INS)):
2143     _Fail("Export info file doesn't have the required fields")
2144
2145   return config.Dumps()
2146
2147
2148 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2149   """Import an os image into an instance.
2150
2151   @type instance: L{objects.Instance}
2152   @param instance: instance to import the disks into
2153   @type src_node: string
2154   @param src_node: source node for the disk images
2155   @type src_images: list of string
2156   @param src_images: absolute paths of the disk images
2157   @type debug: integer
2158   @param debug: debug level, passed to the OS scripts
2159   @rtype: list of boolean
2160   @return: each boolean represent the success of importing the n-th disk
2161
2162   """
2163   inst_os = OSFromDisk(instance.os)
2164   import_env = OSEnvironment(instance, inst_os, debug)
2165   import_script = inst_os.import_script
2166
2167   logfile = _InstanceLogName("import", instance.os, instance.name)
2168   if not os.path.exists(constants.LOG_OS_DIR):
2169     os.mkdir(constants.LOG_OS_DIR, 0750)
2170
2171   comprcmd = "gunzip"
2172   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2173                                import_script, logfile)
2174
2175   final_result = []
2176   for idx, image in enumerate(src_images):
2177     if image:
2178       destcmd = utils.BuildShellCmd('cat %s', image)
2179       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2180                                                        constants.GANETI_RUNAS,
2181                                                        destcmd)
2182       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2183       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2184       import_env['IMPORT_INDEX'] = str(idx)
2185       result = utils.RunCmd(command, env=import_env)
2186       if result.failed:
2187         logging.error("Disk import command '%s' returned error: %s"
2188                       " output: %s", command, result.fail_reason,
2189                       result.output)
2190         final_result.append("error importing disk %d: %s, %s" %
2191                             (idx, result.fail_reason, result.output[-100]))
2192
2193   if final_result:
2194     _Fail("; ".join(final_result), log=False)
2195
2196
2197 def ListExports():
2198   """Return a list of exports currently available on this machine.
2199
2200   @rtype: list
2201   @return: list of the exports
2202
2203   """
2204   if os.path.isdir(constants.EXPORT_DIR):
2205     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2206   else:
2207     _Fail("No exports directory")
2208
2209
2210 def RemoveExport(export):
2211   """Remove an existing export from the node.
2212
2213   @type export: str
2214   @param export: the name of the export to remove
2215   @rtype: None
2216
2217   """
2218   target = utils.PathJoin(constants.EXPORT_DIR, export)
2219
2220   try:
2221     shutil.rmtree(target)
2222   except EnvironmentError, err:
2223     _Fail("Error while removing the export: %s", err, exc=True)
2224
2225
2226 def BlockdevRename(devlist):
2227   """Rename a list of block devices.
2228
2229   @type devlist: list of tuples
2230   @param devlist: list of tuples of the form  (disk,
2231       new_logical_id, new_physical_id); disk is an
2232       L{objects.Disk} object describing the current disk,
2233       and new logical_id/physical_id is the name we
2234       rename it to
2235   @rtype: boolean
2236   @return: True if all renames succeeded, False otherwise
2237
2238   """
2239   msgs = []
2240   result = True
2241   for disk, unique_id in devlist:
2242     dev = _RecursiveFindBD(disk)
2243     if dev is None:
2244       msgs.append("Can't find device %s in rename" % str(disk))
2245       result = False
2246       continue
2247     try:
2248       old_rpath = dev.dev_path
2249       dev.Rename(unique_id)
2250       new_rpath = dev.dev_path
2251       if old_rpath != new_rpath:
2252         DevCacheManager.RemoveCache(old_rpath)
2253         # FIXME: we should add the new cache information here, like:
2254         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2255         # but we don't have the owner here - maybe parse from existing
2256         # cache? for now, we only lose lvm data when we rename, which
2257         # is less critical than DRBD or MD
2258     except errors.BlockDeviceError, err:
2259       msgs.append("Can't rename device '%s' to '%s': %s" %
2260                   (dev, unique_id, err))
2261       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2262       result = False
2263   if not result:
2264     _Fail("; ".join(msgs))
2265
2266
2267 def _TransformFileStorageDir(file_storage_dir):
2268   """Checks whether given file_storage_dir is valid.
2269
2270   Checks wheter the given file_storage_dir is within the cluster-wide
2271   default file_storage_dir stored in SimpleStore. Only paths under that
2272   directory are allowed.
2273
2274   @type file_storage_dir: str
2275   @param file_storage_dir: the path to check
2276
2277   @return: the normalized path if valid, None otherwise
2278
2279   """
2280   if not constants.ENABLE_FILE_STORAGE:
2281     _Fail("File storage disabled at configure time")
2282   cfg = _GetConfig()
2283   file_storage_dir = os.path.normpath(file_storage_dir)
2284   base_file_storage_dir = cfg.GetFileStorageDir()
2285   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2286       base_file_storage_dir):
2287     _Fail("File storage directory '%s' is not under base file"
2288           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2289   return file_storage_dir
2290
2291
2292 def CreateFileStorageDir(file_storage_dir):
2293   """Create file storage directory.
2294
2295   @type file_storage_dir: str
2296   @param file_storage_dir: directory to create
2297
2298   @rtype: tuple
2299   @return: tuple with first element a boolean indicating wheter dir
2300       creation was successful or not
2301
2302   """
2303   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2304   if os.path.exists(file_storage_dir):
2305     if not os.path.isdir(file_storage_dir):
2306       _Fail("Specified storage dir '%s' is not a directory",
2307             file_storage_dir)
2308   else:
2309     try:
2310       os.makedirs(file_storage_dir, 0750)
2311     except OSError, err:
2312       _Fail("Cannot create file storage directory '%s': %s",
2313             file_storage_dir, err, exc=True)
2314
2315
2316 def RemoveFileStorageDir(file_storage_dir):
2317   """Remove file storage directory.
2318
2319   Remove it only if it's empty. If not log an error and return.
2320
2321   @type file_storage_dir: str
2322   @param file_storage_dir: the directory we should cleanup
2323   @rtype: tuple (success,)
2324   @return: tuple of one element, C{success}, denoting
2325       whether the operation was successful
2326
2327   """
2328   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2329   if os.path.exists(file_storage_dir):
2330     if not os.path.isdir(file_storage_dir):
2331       _Fail("Specified Storage directory '%s' is not a directory",
2332             file_storage_dir)
2333     # deletes dir only if empty, otherwise we want to fail the rpc call
2334     try:
2335       os.rmdir(file_storage_dir)
2336     except OSError, err:
2337       _Fail("Cannot remove file storage directory '%s': %s",
2338             file_storage_dir, err)
2339
2340
2341 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2342   """Rename the file storage directory.
2343
2344   @type old_file_storage_dir: str
2345   @param old_file_storage_dir: the current path
2346   @type new_file_storage_dir: str
2347   @param new_file_storage_dir: the name we should rename to
2348   @rtype: tuple (success,)
2349   @return: tuple of one element, C{success}, denoting
2350       whether the operation was successful
2351
2352   """
2353   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2354   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2355   if not os.path.exists(new_file_storage_dir):
2356     if os.path.isdir(old_file_storage_dir):
2357       try:
2358         os.rename(old_file_storage_dir, new_file_storage_dir)
2359       except OSError, err:
2360         _Fail("Cannot rename '%s' to '%s': %s",
2361               old_file_storage_dir, new_file_storage_dir, err)
2362     else:
2363       _Fail("Specified storage dir '%s' is not a directory",
2364             old_file_storage_dir)
2365   else:
2366     if os.path.exists(old_file_storage_dir):
2367       _Fail("Cannot rename '%s' to '%s': both locations exist",
2368             old_file_storage_dir, new_file_storage_dir)
2369
2370
2371 def _EnsureJobQueueFile(file_name):
2372   """Checks whether the given filename is in the queue directory.
2373
2374   @type file_name: str
2375   @param file_name: the file name we should check
2376   @rtype: None
2377   @raises RPCFail: if the file is not valid
2378
2379   """
2380   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2381   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2382
2383   if not result:
2384     _Fail("Passed job queue file '%s' does not belong to"
2385           " the queue directory '%s'", file_name, queue_dir)
2386
2387
2388 def JobQueueUpdate(file_name, content):
2389   """Updates a file in the queue directory.
2390
2391   This is just a wrapper over L{utils.WriteFile}, with proper
2392   checking.
2393
2394   @type file_name: str
2395   @param file_name: the job file name
2396   @type content: str
2397   @param content: the new job contents
2398   @rtype: boolean
2399   @return: the success of the operation
2400
2401   """
2402   _EnsureJobQueueFile(file_name)
2403
2404   # Write and replace the file atomically
2405   utils.WriteFile(file_name, data=_Decompress(content))
2406
2407
2408 def JobQueueRename(old, new):
2409   """Renames a job queue file.
2410
2411   This is just a wrapper over os.rename with proper checking.
2412
2413   @type old: str
2414   @param old: the old (actual) file name
2415   @type new: str
2416   @param new: the desired file name
2417   @rtype: tuple
2418   @return: the success of the operation and payload
2419
2420   """
2421   _EnsureJobQueueFile(old)
2422   _EnsureJobQueueFile(new)
2423
2424   utils.RenameFile(old, new, mkdir=True)
2425
2426
2427 def JobQueueSetDrainFlag(drain_flag):
2428   """Set the drain flag for the queue.
2429
2430   This will set or unset the queue drain flag.
2431
2432   @type drain_flag: boolean
2433   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2434   @rtype: truple
2435   @return: always True, None
2436   @warning: the function always returns True
2437
2438   """
2439   if drain_flag:
2440     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2441   else:
2442     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2443
2444
2445 def BlockdevClose(instance_name, disks):
2446   """Closes the given block devices.
2447
2448   This means they will be switched to secondary mode (in case of
2449   DRBD).
2450
2451   @param instance_name: if the argument is not empty, the symlinks
2452       of this instance will be removed
2453   @type disks: list of L{objects.Disk}
2454   @param disks: the list of disks to be closed
2455   @rtype: tuple (success, message)
2456   @return: a tuple of success and message, where success
2457       indicates the succes of the operation, and message
2458       which will contain the error details in case we
2459       failed
2460
2461   """
2462   bdevs = []
2463   for cf in disks:
2464     rd = _RecursiveFindBD(cf)
2465     if rd is None:
2466       _Fail("Can't find device %s", cf)
2467     bdevs.append(rd)
2468
2469   msg = []
2470   for rd in bdevs:
2471     try:
2472       rd.Close()
2473     except errors.BlockDeviceError, err:
2474       msg.append(str(err))
2475   if msg:
2476     _Fail("Can't make devices secondary: %s", ",".join(msg))
2477   else:
2478     if instance_name:
2479       _RemoveBlockDevLinks(instance_name, disks)
2480
2481
2482 def ValidateHVParams(hvname, hvparams):
2483   """Validates the given hypervisor parameters.
2484
2485   @type hvname: string
2486   @param hvname: the hypervisor name
2487   @type hvparams: dict
2488   @param hvparams: the hypervisor parameters to be validated
2489   @rtype: None
2490
2491   """
2492   try:
2493     hv_type = hypervisor.GetHypervisor(hvname)
2494     hv_type.ValidateParameters(hvparams)
2495   except errors.HypervisorError, err:
2496     _Fail(str(err), log=False)
2497
2498
2499 def DemoteFromMC():
2500   """Demotes the current node from master candidate role.
2501
2502   """
2503   # try to ensure we're not the master by mistake
2504   master, myself = ssconf.GetMasterAndMyself()
2505   if master == myself:
2506     _Fail("ssconf status shows I'm the master node, will not demote")
2507
2508   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2509   if not result.failed:
2510     _Fail("The master daemon is running, will not demote")
2511
2512   try:
2513     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2514       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2515   except EnvironmentError, err:
2516     if err.errno != errno.ENOENT:
2517       _Fail("Error while backing up cluster file: %s", err, exc=True)
2518
2519   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2520
2521
2522 def _GetX509Filenames(cryptodir, name):
2523   """Returns the full paths for the private key and certificate.
2524
2525   """
2526   return (utils.PathJoin(cryptodir, name),
2527           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2528           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2529
2530
2531 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2532   """Creates a new X509 certificate for SSL/TLS.
2533
2534   @type validity: int
2535   @param validity: Validity in seconds
2536   @rtype: tuple; (string, string)
2537   @return: Certificate name and public part
2538
2539   """
2540   (key_pem, cert_pem) = \
2541     utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2542                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2543
2544   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2545                               prefix="x509-%s-" % utils.TimestampForFilename())
2546   try:
2547     name = os.path.basename(cert_dir)
2548     assert len(name) > 5
2549
2550     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2551
2552     utils.WriteFile(key_file, mode=0400, data=key_pem)
2553     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2554
2555     # Never return private key as it shouldn't leave the node
2556     return (name, cert_pem)
2557   except Exception:
2558     shutil.rmtree(cert_dir, ignore_errors=True)
2559     raise
2560
2561
2562 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2563   """Removes a X509 certificate.
2564
2565   @type name: string
2566   @param name: Certificate name
2567
2568   """
2569   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2570
2571   utils.RemoveFile(key_file)
2572   utils.RemoveFile(cert_file)
2573
2574   try:
2575     os.rmdir(cert_dir)
2576   except EnvironmentError, err:
2577     _Fail("Cannot remove certificate directory '%s': %s",
2578           cert_dir, err)
2579
2580
2581 def _FindDisks(nodes_ip, disks):
2582   """Sets the physical ID on disks and returns the block devices.
2583
2584   """
2585   # set the correct physical ID
2586   my_name = utils.HostInfo().name
2587   for cf in disks:
2588     cf.SetPhysicalID(my_name, nodes_ip)
2589
2590   bdevs = []
2591
2592   for cf in disks:
2593     rd = _RecursiveFindBD(cf)
2594     if rd is None:
2595       _Fail("Can't find device %s", cf)
2596     bdevs.append(rd)
2597   return bdevs
2598
2599
2600 def DrbdDisconnectNet(nodes_ip, disks):
2601   """Disconnects the network on a list of drbd devices.
2602
2603   """
2604   bdevs = _FindDisks(nodes_ip, disks)
2605
2606   # disconnect disks
2607   for rd in bdevs:
2608     try:
2609       rd.DisconnectNet()
2610     except errors.BlockDeviceError, err:
2611       _Fail("Can't change network configuration to standalone mode: %s",
2612             err, exc=True)
2613
2614
2615 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2616   """Attaches the network on a list of drbd devices.
2617
2618   """
2619   bdevs = _FindDisks(nodes_ip, disks)
2620
2621   if multimaster:
2622     for idx, rd in enumerate(bdevs):
2623       try:
2624         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2625       except EnvironmentError, err:
2626         _Fail("Can't create symlink: %s", err)
2627   # reconnect disks, switch to new master configuration and if
2628   # needed primary mode
2629   for rd in bdevs:
2630     try:
2631       rd.AttachNet(multimaster)
2632     except errors.BlockDeviceError, err:
2633       _Fail("Can't change network configuration: %s", err)
2634
2635   # wait until the disks are connected; we need to retry the re-attach
2636   # if the device becomes standalone, as this might happen if the one
2637   # node disconnects and reconnects in a different mode before the
2638   # other node reconnects; in this case, one or both of the nodes will
2639   # decide it has wrong configuration and switch to standalone
2640
2641   def _Attach():
2642     all_connected = True
2643
2644     for rd in bdevs:
2645       stats = rd.GetProcStatus()
2646
2647       all_connected = (all_connected and
2648                        (stats.is_connected or stats.is_in_resync))
2649
2650       if stats.is_standalone:
2651         # peer had different config info and this node became
2652         # standalone, even though this should not happen with the
2653         # new staged way of changing disk configs
2654         try:
2655           rd.AttachNet(multimaster)
2656         except errors.BlockDeviceError, err:
2657           _Fail("Can't change network configuration: %s", err)
2658
2659     if not all_connected:
2660       raise utils.RetryAgain()
2661
2662   try:
2663     # Start with a delay of 100 miliseconds and go up to 5 seconds
2664     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2665   except utils.RetryTimeout:
2666     _Fail("Timeout in disk reconnecting")
2667
2668   if multimaster:
2669     # change to primary mode
2670     for rd in bdevs:
2671       try:
2672         rd.Open()
2673       except errors.BlockDeviceError, err:
2674         _Fail("Can't change to primary mode: %s", err)
2675
2676
2677 def DrbdWaitSync(nodes_ip, disks):
2678   """Wait until DRBDs have synchronized.
2679
2680   """
2681   def _helper(rd):
2682     stats = rd.GetProcStatus()
2683     if not (stats.is_connected or stats.is_in_resync):
2684       raise utils.RetryAgain()
2685     return stats
2686
2687   bdevs = _FindDisks(nodes_ip, disks)
2688
2689   min_resync = 100
2690   alldone = True
2691   for rd in bdevs:
2692     try:
2693       # poll each second for 15 seconds
2694       stats = utils.Retry(_helper, 1, 15, args=[rd])
2695     except utils.RetryTimeout:
2696       stats = rd.GetProcStatus()
2697       # last check
2698       if not (stats.is_connected or stats.is_in_resync):
2699         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2700     alldone = alldone and (not stats.is_in_resync)
2701     if stats.sync_percent is not None:
2702       min_resync = min(min_resync, stats.sync_percent)
2703
2704   return (alldone, min_resync)
2705
2706
2707 def PowercycleNode(hypervisor_type):
2708   """Hard-powercycle the node.
2709
2710   Because we need to return first, and schedule the powercycle in the
2711   background, we won't be able to report failures nicely.
2712
2713   """
2714   hyper = hypervisor.GetHypervisor(hypervisor_type)
2715   try:
2716     pid = os.fork()
2717   except OSError:
2718     # if we can't fork, we'll pretend that we're in the child process
2719     pid = 0
2720   if pid > 0:
2721     return "Reboot scheduled in 5 seconds"
2722   time.sleep(5)
2723   hyper.PowercycleNode()
2724
2725
2726 class HooksRunner(object):
2727   """Hook runner.
2728
2729   This class is instantiated on the node side (ganeti-noded) and not
2730   on the master side.
2731
2732   """
2733   def __init__(self, hooks_base_dir=None):
2734     """Constructor for hooks runner.
2735
2736     @type hooks_base_dir: str or None
2737     @param hooks_base_dir: if not None, this overrides the
2738         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2739
2740     """
2741     if hooks_base_dir is None:
2742       hooks_base_dir = constants.HOOKS_BASE_DIR
2743     # yeah, _BASE_DIR is not valid for attributes, we use it like a
2744     # constant
2745     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
2746
2747   def RunHooks(self, hpath, phase, env):
2748     """Run the scripts in the hooks directory.
2749
2750     @type hpath: str
2751     @param hpath: the path to the hooks directory which
2752         holds the scripts
2753     @type phase: str
2754     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2755         L{constants.HOOKS_PHASE_POST}
2756     @type env: dict
2757     @param env: dictionary with the environment for the hook
2758     @rtype: list
2759     @return: list of 3-element tuples:
2760       - script path
2761       - script result, either L{constants.HKR_SUCCESS} or
2762         L{constants.HKR_FAIL}
2763       - output of the script
2764
2765     @raise errors.ProgrammerError: for invalid input
2766         parameters
2767
2768     """
2769     if phase == constants.HOOKS_PHASE_PRE:
2770       suffix = "pre"
2771     elif phase == constants.HOOKS_PHASE_POST:
2772       suffix = "post"
2773     else:
2774       _Fail("Unknown hooks phase '%s'", phase)
2775
2776
2777     subdir = "%s-%s.d" % (hpath, suffix)
2778     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
2779
2780     results = []
2781
2782     if not os.path.isdir(dir_name):
2783       # for non-existing/non-dirs, we simply exit instead of logging a
2784       # warning at every operation
2785       return results
2786
2787     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
2788
2789     for (relname, relstatus, runresult)  in runparts_results:
2790       if relstatus == constants.RUNPARTS_SKIP:
2791         rrval = constants.HKR_SKIP
2792         output = ""
2793       elif relstatus == constants.RUNPARTS_ERR:
2794         rrval = constants.HKR_FAIL
2795         output = "Hook script execution error: %s" % runresult
2796       elif relstatus == constants.RUNPARTS_RUN:
2797         if runresult.failed:
2798           rrval = constants.HKR_FAIL
2799         else:
2800           rrval = constants.HKR_SUCCESS
2801         output = utils.SafeEncode(runresult.output.strip())
2802       results.append(("%s/%s" % (subdir, relname), rrval, output))
2803
2804     return results
2805
2806
2807 class IAllocatorRunner(object):
2808   """IAllocator runner.
2809
2810   This class is instantiated on the node side (ganeti-noded) and not on
2811   the master side.
2812
2813   """
2814   @staticmethod
2815   def Run(name, idata):
2816     """Run an iallocator script.
2817
2818     @type name: str
2819     @param name: the iallocator script name
2820     @type idata: str
2821     @param idata: the allocator input data
2822
2823     @rtype: tuple
2824     @return: two element tuple of:
2825        - status
2826        - either error message or stdout of allocator (for success)
2827
2828     """
2829     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2830                                   os.path.isfile)
2831     if alloc_script is None:
2832       _Fail("iallocator module '%s' not found in the search path", name)
2833
2834     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2835     try:
2836       os.write(fd, idata)
2837       os.close(fd)
2838       result = utils.RunCmd([alloc_script, fin_name])
2839       if result.failed:
2840         _Fail("iallocator module '%s' failed: %s, output '%s'",
2841               name, result.fail_reason, result.output)
2842     finally:
2843       os.unlink(fin_name)
2844
2845     return result.stdout
2846
2847
2848 class DevCacheManager(object):
2849   """Simple class for managing a cache of block device information.
2850
2851   """
2852   _DEV_PREFIX = "/dev/"
2853   _ROOT_DIR = constants.BDEV_CACHE_DIR
2854
2855   @classmethod
2856   def _ConvertPath(cls, dev_path):
2857     """Converts a /dev/name path to the cache file name.
2858
2859     This replaces slashes with underscores and strips the /dev
2860     prefix. It then returns the full path to the cache file.
2861
2862     @type dev_path: str
2863     @param dev_path: the C{/dev/} path name
2864     @rtype: str
2865     @return: the converted path name
2866
2867     """
2868     if dev_path.startswith(cls._DEV_PREFIX):
2869       dev_path = dev_path[len(cls._DEV_PREFIX):]
2870     dev_path = dev_path.replace("/", "_")
2871     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2872     return fpath
2873
2874   @classmethod
2875   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2876     """Updates the cache information for a given device.
2877
2878     @type dev_path: str
2879     @param dev_path: the pathname of the device
2880     @type owner: str
2881     @param owner: the owner (instance name) of the device
2882     @type on_primary: bool
2883     @param on_primary: whether this is the primary
2884         node nor not
2885     @type iv_name: str
2886     @param iv_name: the instance-visible name of the
2887         device, as in objects.Disk.iv_name
2888
2889     @rtype: None
2890
2891     """
2892     if dev_path is None:
2893       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2894       return
2895     fpath = cls._ConvertPath(dev_path)
2896     if on_primary:
2897       state = "primary"
2898     else:
2899       state = "secondary"
2900     if iv_name is None:
2901       iv_name = "not_visible"
2902     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2903     try:
2904       utils.WriteFile(fpath, data=fdata)
2905     except EnvironmentError, err:
2906       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2907
2908   @classmethod
2909   def RemoveCache(cls, dev_path):
2910     """Remove data for a dev_path.
2911
2912     This is just a wrapper over L{utils.RemoveFile} with a converted
2913     path name and logging.
2914
2915     @type dev_path: str
2916     @param dev_path: the pathname of the device
2917
2918     @rtype: None
2919
2920     """
2921     if dev_path is None:
2922       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2923       return
2924     fpath = cls._ConvertPath(dev_path)
2925     try:
2926       utils.RemoveFile(fpath)
2927     except EnvironmentError, err:
2928       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)