1271681aa20bf367c6a2bcb873b7bf0078c9c345
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50
51 from ganeti import errors
52 from ganeti import utils
53 from ganeti import ssh
54 from ganeti import hypervisor
55 from ganeti import constants
56 from ganeti import bdev
57 from ganeti import objects
58 from ganeti import ssconf
59
60
61 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
62 _ALLOWED_CLEAN_DIRS = frozenset([
63   constants.DATA_DIR,
64   constants.JOB_QUEUE_ARCHIVE_DIR,
65   constants.QUEUE_DIR,
66   constants.CRYPTO_KEYS_DIR,
67   ])
68 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
69 _X509_KEY_FILE = "key"
70 _X509_CERT_FILE = "cert"
71
72
73 class RPCFail(Exception):
74   """Class denoting RPC failure.
75
76   Its argument is the error message.
77
78   """
79
80
81 def _Fail(msg, *args, **kwargs):
82   """Log an error and the raise an RPCFail exception.
83
84   This exception is then handled specially in the ganeti daemon and
85   turned into a 'failed' return type. As such, this function is a
86   useful shortcut for logging the error and returning it to the master
87   daemon.
88
89   @type msg: string
90   @param msg: the text of the exception
91   @raise RPCFail
92
93   """
94   if args:
95     msg = msg % args
96   if "log" not in kwargs or kwargs["log"]: # if we should log this error
97     if "exc" in kwargs and kwargs["exc"]:
98       logging.exception(msg)
99     else:
100       logging.error(msg)
101   raise RPCFail(msg)
102
103
104 def _GetConfig():
105   """Simple wrapper to return a SimpleStore.
106
107   @rtype: L{ssconf.SimpleStore}
108   @return: a SimpleStore instance
109
110   """
111   return ssconf.SimpleStore()
112
113
114 def _GetSshRunner(cluster_name):
115   """Simple wrapper to return an SshRunner.
116
117   @type cluster_name: str
118   @param cluster_name: the cluster name, which is needed
119       by the SshRunner constructor
120   @rtype: L{ssh.SshRunner}
121   @return: an SshRunner instance
122
123   """
124   return ssh.SshRunner(cluster_name)
125
126
127 def _Decompress(data):
128   """Unpacks data compressed by the RPC client.
129
130   @type data: list or tuple
131   @param data: Data sent by RPC client
132   @rtype: str
133   @return: Decompressed data
134
135   """
136   assert isinstance(data, (list, tuple))
137   assert len(data) == 2
138   (encoding, content) = data
139   if encoding == constants.RPC_ENCODING_NONE:
140     return content
141   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
142     return zlib.decompress(base64.b64decode(content))
143   else:
144     raise AssertionError("Unknown data encoding")
145
146
147 def _CleanDirectory(path, exclude=None):
148   """Removes all regular files in a directory.
149
150   @type path: str
151   @param path: the directory to clean
152   @type exclude: list
153   @param exclude: list of files to be excluded, defaults
154       to the empty list
155
156   """
157   if path not in _ALLOWED_CLEAN_DIRS:
158     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
159           path)
160
161   if not os.path.isdir(path):
162     return
163   if exclude is None:
164     exclude = []
165   else:
166     # Normalize excluded paths
167     exclude = [os.path.normpath(i) for i in exclude]
168
169   for rel_name in utils.ListVisibleFiles(path):
170     full_name = utils.PathJoin(path, rel_name)
171     if full_name in exclude:
172       continue
173     if os.path.isfile(full_name) and not os.path.islink(full_name):
174       utils.RemoveFile(full_name)
175
176
177 def _BuildUploadFileList():
178   """Build the list of allowed upload files.
179
180   This is abstracted so that it's built only once at module import time.
181
182   """
183   allowed_files = set([
184     constants.CLUSTER_CONF_FILE,
185     constants.ETC_HOSTS,
186     constants.SSH_KNOWN_HOSTS_FILE,
187     constants.VNC_PASSWORD_FILE,
188     constants.RAPI_CERT_FILE,
189     constants.RAPI_USERS_FILE,
190     constants.CONFD_HMAC_KEY,
191     ])
192
193   for hv_name in constants.HYPER_TYPES:
194     hv_class = hypervisor.GetHypervisorClass(hv_name)
195     allowed_files.update(hv_class.GetAncillaryFiles())
196
197   return frozenset(allowed_files)
198
199
200 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
201
202
203 def JobQueuePurge():
204   """Removes job queue files and archived jobs.
205
206   @rtype: tuple
207   @return: True, None
208
209   """
210   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
211   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
212
213
214 def GetMasterInfo():
215   """Returns master information.
216
217   This is an utility function to compute master information, either
218   for consumption here or from the node daemon.
219
220   @rtype: tuple
221   @return: master_netdev, master_ip, master_name
222   @raise RPCFail: in case of errors
223
224   """
225   try:
226     cfg = _GetConfig()
227     master_netdev = cfg.GetMasterNetdev()
228     master_ip = cfg.GetMasterIP()
229     master_node = cfg.GetMasterNode()
230   except errors.ConfigurationError, err:
231     _Fail("Cluster configuration incomplete: %s", err, exc=True)
232   return (master_netdev, master_ip, master_node)
233
234
235 def StartMaster(start_daemons, no_voting):
236   """Activate local node as master node.
237
238   The function will always try activate the IP address of the master
239   (unless someone else has it). It will also start the master daemons,
240   based on the start_daemons parameter.
241
242   @type start_daemons: boolean
243   @param start_daemons: whether to also start the master
244       daemons (ganeti-masterd and ganeti-rapi)
245   @type no_voting: boolean
246   @param no_voting: whether to start ganeti-masterd without a node vote
247       (if start_daemons is True), but still non-interactively
248   @rtype: None
249
250   """
251   # GetMasterInfo will raise an exception if not able to return data
252   master_netdev, master_ip, _ = GetMasterInfo()
253
254   err_msgs = []
255   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
256     if utils.OwnIpAddress(master_ip):
257       # we already have the ip:
258       logging.debug("Master IP already configured, doing nothing")
259     else:
260       msg = "Someone else has the master ip, not activating"
261       logging.error(msg)
262       err_msgs.append(msg)
263   else:
264     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
265                            "dev", master_netdev, "label",
266                            "%s:0" % master_netdev])
267     if result.failed:
268       msg = "Can't activate master IP: %s" % result.output
269       logging.error(msg)
270       err_msgs.append(msg)
271
272     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
273                            "-s", master_ip, master_ip])
274     # we'll ignore the exit code of arping
275
276   # and now start the master and rapi daemons
277   if start_daemons:
278     if no_voting:
279       masterd_args = "--no-voting --yes-do-it"
280     else:
281       masterd_args = ""
282
283     env = {
284       "EXTRA_MASTERD_ARGS": masterd_args,
285       }
286
287     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
288     if result.failed:
289       msg = "Can't start Ganeti master: %s" % result.output
290       logging.error(msg)
291       err_msgs.append(msg)
292
293   if err_msgs:
294     _Fail("; ".join(err_msgs))
295
296
297 def StopMaster(stop_daemons):
298   """Deactivate this node as master.
299
300   The function will always try to deactivate the IP address of the
301   master. It will also stop the master daemons depending on the
302   stop_daemons parameter.
303
304   @type stop_daemons: boolean
305   @param stop_daemons: whether to also stop the master daemons
306       (ganeti-masterd and ganeti-rapi)
307   @rtype: None
308
309   """
310   # TODO: log and report back to the caller the error failures; we
311   # need to decide in which case we fail the RPC for this
312
313   # GetMasterInfo will raise an exception if not able to return data
314   master_netdev, master_ip, _ = GetMasterInfo()
315
316   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
317                          "dev", master_netdev])
318   if result.failed:
319     logging.error("Can't remove the master IP, error: %s", result.output)
320     # but otherwise ignore the failure
321
322   if stop_daemons:
323     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
324     if result.failed:
325       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
326                     " and error %s",
327                     result.cmd, result.exit_code, result.output)
328
329
330 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
331   """Joins this node to the cluster.
332
333   This does the following:
334       - updates the hostkeys of the machine (rsa and dsa)
335       - adds the ssh private key to the user
336       - adds the ssh public key to the users' authorized_keys file
337
338   @type dsa: str
339   @param dsa: the DSA private key to write
340   @type dsapub: str
341   @param dsapub: the DSA public key to write
342   @type rsa: str
343   @param rsa: the RSA private key to write
344   @type rsapub: str
345   @param rsapub: the RSA public key to write
346   @type sshkey: str
347   @param sshkey: the SSH private key to write
348   @type sshpub: str
349   @param sshpub: the SSH public key to write
350   @rtype: boolean
351   @return: the success of the operation
352
353   """
354   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
355                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
356                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
357                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
358   for name, content, mode in sshd_keys:
359     utils.WriteFile(name, data=content, mode=mode)
360
361   try:
362     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
363                                                     mkdir=True)
364   except errors.OpExecError, err:
365     _Fail("Error while processing user ssh files: %s", err, exc=True)
366
367   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
368     utils.WriteFile(name, data=content, mode=0600)
369
370   utils.AddAuthorizedKey(auth_keys, sshpub)
371
372   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
373   if result.failed:
374     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
375           result.cmd, result.exit_code, result.output)
376
377
378 def LeaveCluster(modify_ssh_setup):
379   """Cleans up and remove the current node.
380
381   This function cleans up and prepares the current node to be removed
382   from the cluster.
383
384   If processing is successful, then it raises an
385   L{errors.QuitGanetiException} which is used as a special case to
386   shutdown the node daemon.
387
388   @param modify_ssh_setup: boolean
389
390   """
391   _CleanDirectory(constants.DATA_DIR)
392   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
393   JobQueuePurge()
394
395   if modify_ssh_setup:
396     try:
397       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
398
399       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
400
401       utils.RemoveFile(priv_key)
402       utils.RemoveFile(pub_key)
403     except errors.OpExecError:
404       logging.exception("Error while processing ssh files")
405
406   try:
407     utils.RemoveFile(constants.CONFD_HMAC_KEY)
408     utils.RemoveFile(constants.RAPI_CERT_FILE)
409     utils.RemoveFile(constants.NODED_CERT_FILE)
410   except: # pylint: disable-msg=W0702
411     logging.exception("Error while removing cluster secrets")
412
413   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
414   if result.failed:
415     logging.error("Command %s failed with exitcode %s and error %s",
416                   result.cmd, result.exit_code, result.output)
417
418   # Raise a custom exception (handled in ganeti-noded)
419   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
420
421
422 def GetNodeInfo(vgname, hypervisor_type):
423   """Gives back a hash with different information about the node.
424
425   @type vgname: C{string}
426   @param vgname: the name of the volume group to ask for disk space information
427   @type hypervisor_type: C{str}
428   @param hypervisor_type: the name of the hypervisor to ask for
429       memory information
430   @rtype: C{dict}
431   @return: dictionary with the following keys:
432       - vg_size is the size of the configured volume group in MiB
433       - vg_free is the free size of the volume group in MiB
434       - memory_dom0 is the memory allocated for domain0 in MiB
435       - memory_free is the currently available (free) ram in MiB
436       - memory_total is the total number of ram in MiB
437
438   """
439   outputarray = {}
440   vginfo = _GetVGInfo(vgname)
441   outputarray['vg_size'] = vginfo['vg_size']
442   outputarray['vg_free'] = vginfo['vg_free']
443
444   hyper = hypervisor.GetHypervisor(hypervisor_type)
445   hyp_info = hyper.GetNodeInfo()
446   if hyp_info is not None:
447     outputarray.update(hyp_info)
448
449   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
450
451   return outputarray
452
453
454 def VerifyNode(what, cluster_name):
455   """Verify the status of the local node.
456
457   Based on the input L{what} parameter, various checks are done on the
458   local node.
459
460   If the I{filelist} key is present, this list of
461   files is checksummed and the file/checksum pairs are returned.
462
463   If the I{nodelist} key is present, we check that we have
464   connectivity via ssh with the target nodes (and check the hostname
465   report).
466
467   If the I{node-net-test} key is present, we check that we have
468   connectivity to the given nodes via both primary IP and, if
469   applicable, secondary IPs.
470
471   @type what: C{dict}
472   @param what: a dictionary of things to check:
473       - filelist: list of files for which to compute checksums
474       - nodelist: list of nodes we should check ssh communication with
475       - node-net-test: list of nodes we should check node daemon port
476         connectivity with
477       - hypervisor: list with hypervisors to run the verify for
478   @rtype: dict
479   @return: a dictionary with the same keys as the input dict, and
480       values representing the result of the checks
481
482   """
483   result = {}
484
485   if constants.NV_HYPERVISOR in what:
486     result[constants.NV_HYPERVISOR] = tmp = {}
487     for hv_name in what[constants.NV_HYPERVISOR]:
488       try:
489         val = hypervisor.GetHypervisor(hv_name).Verify()
490       except errors.HypervisorError, err:
491         val = "Error while checking hypervisor: %s" % str(err)
492       tmp[hv_name] = val
493
494   if constants.NV_FILELIST in what:
495     result[constants.NV_FILELIST] = utils.FingerprintFiles(
496       what[constants.NV_FILELIST])
497
498   if constants.NV_NODELIST in what:
499     result[constants.NV_NODELIST] = tmp = {}
500     random.shuffle(what[constants.NV_NODELIST])
501     for node in what[constants.NV_NODELIST]:
502       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
503       if not success:
504         tmp[node] = message
505
506   if constants.NV_NODENETTEST in what:
507     result[constants.NV_NODENETTEST] = tmp = {}
508     my_name = utils.HostInfo().name
509     my_pip = my_sip = None
510     for name, pip, sip in what[constants.NV_NODENETTEST]:
511       if name == my_name:
512         my_pip = pip
513         my_sip = sip
514         break
515     if not my_pip:
516       tmp[my_name] = ("Can't find my own primary/secondary IP"
517                       " in the node list")
518     else:
519       port = utils.GetDaemonPort(constants.NODED)
520       for name, pip, sip in what[constants.NV_NODENETTEST]:
521         fail = []
522         if not utils.TcpPing(pip, port, source=my_pip):
523           fail.append("primary")
524         if sip != pip:
525           if not utils.TcpPing(sip, port, source=my_sip):
526             fail.append("secondary")
527         if fail:
528           tmp[name] = ("failure using the %s interface(s)" %
529                        " and ".join(fail))
530
531   if constants.NV_LVLIST in what:
532     try:
533       val = GetVolumeList(what[constants.NV_LVLIST])
534     except RPCFail, err:
535       val = str(err)
536     result[constants.NV_LVLIST] = val
537
538   if constants.NV_INSTANCELIST in what:
539     # GetInstanceList can fail
540     try:
541       val = GetInstanceList(what[constants.NV_INSTANCELIST])
542     except RPCFail, err:
543       val = str(err)
544     result[constants.NV_INSTANCELIST] = val
545
546   if constants.NV_VGLIST in what:
547     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
548
549   if constants.NV_PVLIST in what:
550     result[constants.NV_PVLIST] = \
551       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
552                                    filter_allocatable=False)
553
554   if constants.NV_VERSION in what:
555     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
556                                     constants.RELEASE_VERSION)
557
558   if constants.NV_HVINFO in what:
559     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
560     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
561
562   if constants.NV_DRBDLIST in what:
563     try:
564       used_minors = bdev.DRBD8.GetUsedDevs().keys()
565     except errors.BlockDeviceError, err:
566       logging.warning("Can't get used minors list", exc_info=True)
567       used_minors = str(err)
568     result[constants.NV_DRBDLIST] = used_minors
569
570   if constants.NV_NODESETUP in what:
571     result[constants.NV_NODESETUP] = tmpr = []
572     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
573       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
574                   " under /sys, missing required directories /sys/block"
575                   " and /sys/class/net")
576     if (not os.path.isdir("/proc/sys") or
577         not os.path.isfile("/proc/sysrq-trigger")):
578       tmpr.append("The procfs filesystem doesn't seem to be mounted"
579                   " under /proc, missing required directory /proc/sys and"
580                   " the file /proc/sysrq-trigger")
581
582   if constants.NV_TIME in what:
583     result[constants.NV_TIME] = utils.SplitTime(time.time())
584
585   return result
586
587
588 def GetVolumeList(vg_name):
589   """Compute list of logical volumes and their size.
590
591   @type vg_name: str
592   @param vg_name: the volume group whose LVs we should list
593   @rtype: dict
594   @return:
595       dictionary of all partions (key) with value being a tuple of
596       their size (in MiB), inactive and online status::
597
598         {'test1': ('20.06', True, True)}
599
600       in case of errors, a string is returned with the error
601       details.
602
603   """
604   lvs = {}
605   sep = '|'
606   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
607                          "--separator=%s" % sep,
608                          "-olv_name,lv_size,lv_attr", vg_name])
609   if result.failed:
610     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
611
612   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
613   for line in result.stdout.splitlines():
614     line = line.strip()
615     match = valid_line_re.match(line)
616     if not match:
617       logging.error("Invalid line returned from lvs output: '%s'", line)
618       continue
619     name, size, attr = match.groups()
620     inactive = attr[4] == '-'
621     online = attr[5] == 'o'
622     virtual = attr[0] == 'v'
623     if virtual:
624       # we don't want to report such volumes as existing, since they
625       # don't really hold data
626       continue
627     lvs[name] = (size, inactive, online)
628
629   return lvs
630
631
632 def ListVolumeGroups():
633   """List the volume groups and their size.
634
635   @rtype: dict
636   @return: dictionary with keys volume name and values the
637       size of the volume
638
639   """
640   return utils.ListVolumeGroups()
641
642
643 def NodeVolumes():
644   """List all volumes on this node.
645
646   @rtype: list
647   @return:
648     A list of dictionaries, each having four keys:
649       - name: the logical volume name,
650       - size: the size of the logical volume
651       - dev: the physical device on which the LV lives
652       - vg: the volume group to which it belongs
653
654     In case of errors, we return an empty list and log the
655     error.
656
657     Note that since a logical volume can live on multiple physical
658     volumes, the resulting list might include a logical volume
659     multiple times.
660
661   """
662   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
663                          "--separator=|",
664                          "--options=lv_name,lv_size,devices,vg_name"])
665   if result.failed:
666     _Fail("Failed to list logical volumes, lvs output: %s",
667           result.output)
668
669   def parse_dev(dev):
670     return dev.split('(')[0]
671
672   def handle_dev(dev):
673     return [parse_dev(x) for x in dev.split(",")]
674
675   def map_line(line):
676     line = [v.strip() for v in line]
677     return [{'name': line[0], 'size': line[1],
678              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
679
680   all_devs = []
681   for line in result.stdout.splitlines():
682     if line.count('|') >= 3:
683       all_devs.extend(map_line(line.split('|')))
684     else:
685       logging.warning("Strange line in the output from lvs: '%s'", line)
686   return all_devs
687
688
689 def BridgesExist(bridges_list):
690   """Check if a list of bridges exist on the current node.
691
692   @rtype: boolean
693   @return: C{True} if all of them exist, C{False} otherwise
694
695   """
696   missing = []
697   for bridge in bridges_list:
698     if not utils.BridgeExists(bridge):
699       missing.append(bridge)
700
701   if missing:
702     _Fail("Missing bridges %s", utils.CommaJoin(missing))
703
704
705 def GetInstanceList(hypervisor_list):
706   """Provides a list of instances.
707
708   @type hypervisor_list: list
709   @param hypervisor_list: the list of hypervisors to query information
710
711   @rtype: list
712   @return: a list of all running instances on the current node
713     - instance1.example.com
714     - instance2.example.com
715
716   """
717   results = []
718   for hname in hypervisor_list:
719     try:
720       names = hypervisor.GetHypervisor(hname).ListInstances()
721       results.extend(names)
722     except errors.HypervisorError, err:
723       _Fail("Error enumerating instances (hypervisor %s): %s",
724             hname, err, exc=True)
725
726   return results
727
728
729 def GetInstanceInfo(instance, hname):
730   """Gives back the information about an instance as a dictionary.
731
732   @type instance: string
733   @param instance: the instance name
734   @type hname: string
735   @param hname: the hypervisor type of the instance
736
737   @rtype: dict
738   @return: dictionary with the following keys:
739       - memory: memory size of instance (int)
740       - state: xen state of instance (string)
741       - time: cpu time of instance (float)
742
743   """
744   output = {}
745
746   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
747   if iinfo is not None:
748     output['memory'] = iinfo[2]
749     output['state'] = iinfo[4]
750     output['time'] = iinfo[5]
751
752   return output
753
754
755 def GetInstanceMigratable(instance):
756   """Gives whether an instance can be migrated.
757
758   @type instance: L{objects.Instance}
759   @param instance: object representing the instance to be checked.
760
761   @rtype: tuple
762   @return: tuple of (result, description) where:
763       - result: whether the instance can be migrated or not
764       - description: a description of the issue, if relevant
765
766   """
767   hyper = hypervisor.GetHypervisor(instance.hypervisor)
768   iname = instance.name
769   if iname not in hyper.ListInstances():
770     _Fail("Instance %s is not running", iname)
771
772   for idx in range(len(instance.disks)):
773     link_name = _GetBlockDevSymlinkPath(iname, idx)
774     if not os.path.islink(link_name):
775       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
776
777
778 def GetAllInstancesInfo(hypervisor_list):
779   """Gather data about all instances.
780
781   This is the equivalent of L{GetInstanceInfo}, except that it
782   computes data for all instances at once, thus being faster if one
783   needs data about more than one instance.
784
785   @type hypervisor_list: list
786   @param hypervisor_list: list of hypervisors to query for instance data
787
788   @rtype: dict
789   @return: dictionary of instance: data, with data having the following keys:
790       - memory: memory size of instance (int)
791       - state: xen state of instance (string)
792       - time: cpu time of instance (float)
793       - vcpus: the number of vcpus
794
795   """
796   output = {}
797
798   for hname in hypervisor_list:
799     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
800     if iinfo:
801       for name, _, memory, vcpus, state, times in iinfo:
802         value = {
803           'memory': memory,
804           'vcpus': vcpus,
805           'state': state,
806           'time': times,
807           }
808         if name in output:
809           # we only check static parameters, like memory and vcpus,
810           # and not state and time which can change between the
811           # invocations of the different hypervisors
812           for key in 'memory', 'vcpus':
813             if value[key] != output[name][key]:
814               _Fail("Instance %s is running twice"
815                     " with different parameters", name)
816         output[name] = value
817
818   return output
819
820
821 def _InstanceLogName(kind, os_name, instance):
822   """Compute the OS log filename for a given instance and operation.
823
824   The instance name and os name are passed in as strings since not all
825   operations have these as part of an instance object.
826
827   @type kind: string
828   @param kind: the operation type (e.g. add, import, etc.)
829   @type os_name: string
830   @param os_name: the os name
831   @type instance: string
832   @param instance: the name of the instance being imported/added/etc.
833
834   """
835   base = ("%s-%s-%s-%s.log" %
836           (kind, os_name, instance, utils.TimestampForFilename()))
837   return utils.PathJoin(constants.LOG_OS_DIR, base)
838
839
840 def InstanceOsAdd(instance, reinstall, debug):
841   """Add an OS to an instance.
842
843   @type instance: L{objects.Instance}
844   @param instance: Instance whose OS is to be installed
845   @type reinstall: boolean
846   @param reinstall: whether this is an instance reinstall
847   @type debug: integer
848   @param debug: debug level, passed to the OS scripts
849   @rtype: None
850
851   """
852   inst_os = OSFromDisk(instance.os)
853
854   create_env = OSEnvironment(instance, inst_os, debug)
855   if reinstall:
856     create_env['INSTANCE_REINSTALL'] = "1"
857
858   logfile = _InstanceLogName("add", instance.os, instance.name)
859
860   result = utils.RunCmd([inst_os.create_script], env=create_env,
861                         cwd=inst_os.path, output=logfile,)
862   if result.failed:
863     logging.error("os create command '%s' returned error: %s, logfile: %s,"
864                   " output: %s", result.cmd, result.fail_reason, logfile,
865                   result.output)
866     lines = [utils.SafeEncode(val)
867              for val in utils.TailFile(logfile, lines=20)]
868     _Fail("OS create script failed (%s), last lines in the"
869           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
870
871
872 def RunRenameInstance(instance, old_name, debug):
873   """Run the OS rename script for an instance.
874
875   @type instance: L{objects.Instance}
876   @param instance: Instance whose OS is to be installed
877   @type old_name: string
878   @param old_name: previous instance name
879   @type debug: integer
880   @param debug: debug level, passed to the OS scripts
881   @rtype: boolean
882   @return: the success of the operation
883
884   """
885   inst_os = OSFromDisk(instance.os)
886
887   rename_env = OSEnvironment(instance, inst_os, debug)
888   rename_env['OLD_INSTANCE_NAME'] = old_name
889
890   logfile = _InstanceLogName("rename", instance.os,
891                              "%s-%s" % (old_name, instance.name))
892
893   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
894                         cwd=inst_os.path, output=logfile)
895
896   if result.failed:
897     logging.error("os create command '%s' returned error: %s output: %s",
898                   result.cmd, result.fail_reason, result.output)
899     lines = [utils.SafeEncode(val)
900              for val in utils.TailFile(logfile, lines=20)]
901     _Fail("OS rename script failed (%s), last lines in the"
902           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
903
904
905 def _GetVGInfo(vg_name):
906   """Get information about the volume group.
907
908   @type vg_name: str
909   @param vg_name: the volume group which we query
910   @rtype: dict
911   @return:
912     A dictionary with the following keys:
913       - C{vg_size} is the total size of the volume group in MiB
914       - C{vg_free} is the free size of the volume group in MiB
915       - C{pv_count} are the number of physical disks in that VG
916
917     If an error occurs during gathering of data, we return the same dict
918     with keys all set to None.
919
920   """
921   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
922
923   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
924                          "--nosuffix", "--units=m", "--separator=:", vg_name])
925
926   if retval.failed:
927     logging.error("volume group %s not present", vg_name)
928     return retdic
929   valarr = retval.stdout.strip().rstrip(':').split(':')
930   if len(valarr) == 3:
931     try:
932       retdic = {
933         "vg_size": int(round(float(valarr[0]), 0)),
934         "vg_free": int(round(float(valarr[1]), 0)),
935         "pv_count": int(valarr[2]),
936         }
937     except (TypeError, ValueError), err:
938       logging.exception("Fail to parse vgs output: %s", err)
939   else:
940     logging.error("vgs output has the wrong number of fields (expected"
941                   " three): %s", str(valarr))
942   return retdic
943
944
945 def _GetBlockDevSymlinkPath(instance_name, idx):
946   return utils.PathJoin(constants.DISK_LINKS_DIR,
947                         "%s:%d" % (instance_name, idx))
948
949
950 def _SymlinkBlockDev(instance_name, device_path, idx):
951   """Set up symlinks to a instance's block device.
952
953   This is an auxiliary function run when an instance is start (on the primary
954   node) or when an instance is migrated (on the target node).
955
956
957   @param instance_name: the name of the target instance
958   @param device_path: path of the physical block device, on the node
959   @param idx: the disk index
960   @return: absolute path to the disk's symlink
961
962   """
963   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
964   try:
965     os.symlink(device_path, link_name)
966   except OSError, err:
967     if err.errno == errno.EEXIST:
968       if (not os.path.islink(link_name) or
969           os.readlink(link_name) != device_path):
970         os.remove(link_name)
971         os.symlink(device_path, link_name)
972     else:
973       raise
974
975   return link_name
976
977
978 def _RemoveBlockDevLinks(instance_name, disks):
979   """Remove the block device symlinks belonging to the given instance.
980
981   """
982   for idx, _ in enumerate(disks):
983     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
984     if os.path.islink(link_name):
985       try:
986         os.remove(link_name)
987       except OSError:
988         logging.exception("Can't remove symlink '%s'", link_name)
989
990
991 def _GatherAndLinkBlockDevs(instance):
992   """Set up an instance's block device(s).
993
994   This is run on the primary node at instance startup. The block
995   devices must be already assembled.
996
997   @type instance: L{objects.Instance}
998   @param instance: the instance whose disks we shoul assemble
999   @rtype: list
1000   @return: list of (disk_object, device_path)
1001
1002   """
1003   block_devices = []
1004   for idx, disk in enumerate(instance.disks):
1005     device = _RecursiveFindBD(disk)
1006     if device is None:
1007       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1008                                     str(disk))
1009     device.Open()
1010     try:
1011       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1012     except OSError, e:
1013       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1014                                     e.strerror)
1015
1016     block_devices.append((disk, link_name))
1017
1018   return block_devices
1019
1020
1021 def StartInstance(instance):
1022   """Start an instance.
1023
1024   @type instance: L{objects.Instance}
1025   @param instance: the instance object
1026   @rtype: None
1027
1028   """
1029   running_instances = GetInstanceList([instance.hypervisor])
1030
1031   if instance.name in running_instances:
1032     logging.info("Instance %s already running, not starting", instance.name)
1033     return
1034
1035   try:
1036     block_devices = _GatherAndLinkBlockDevs(instance)
1037     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1038     hyper.StartInstance(instance, block_devices)
1039   except errors.BlockDeviceError, err:
1040     _Fail("Block device error: %s", err, exc=True)
1041   except errors.HypervisorError, err:
1042     _RemoveBlockDevLinks(instance.name, instance.disks)
1043     _Fail("Hypervisor error: %s", err, exc=True)
1044
1045
1046 def InstanceShutdown(instance, timeout):
1047   """Shut an instance down.
1048
1049   @note: this functions uses polling with a hardcoded timeout.
1050
1051   @type instance: L{objects.Instance}
1052   @param instance: the instance object
1053   @type timeout: integer
1054   @param timeout: maximum timeout for soft shutdown
1055   @rtype: None
1056
1057   """
1058   hv_name = instance.hypervisor
1059   hyper = hypervisor.GetHypervisor(hv_name)
1060   iname = instance.name
1061
1062   if instance.name not in hyper.ListInstances():
1063     logging.info("Instance %s not running, doing nothing", iname)
1064     return
1065
1066   class _TryShutdown:
1067     def __init__(self):
1068       self.tried_once = False
1069
1070     def __call__(self):
1071       if iname not in hyper.ListInstances():
1072         return
1073
1074       try:
1075         hyper.StopInstance(instance, retry=self.tried_once)
1076       except errors.HypervisorError, err:
1077         if iname not in hyper.ListInstances():
1078           # if the instance is no longer existing, consider this a
1079           # success and go to cleanup
1080           return
1081
1082         _Fail("Failed to stop instance %s: %s", iname, err)
1083
1084       self.tried_once = True
1085
1086       raise utils.RetryAgain()
1087
1088   try:
1089     utils.Retry(_TryShutdown(), 5, timeout)
1090   except utils.RetryTimeout:
1091     # the shutdown did not succeed
1092     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1093
1094     try:
1095       hyper.StopInstance(instance, force=True)
1096     except errors.HypervisorError, err:
1097       if iname in hyper.ListInstances():
1098         # only raise an error if the instance still exists, otherwise
1099         # the error could simply be "instance ... unknown"!
1100         _Fail("Failed to force stop instance %s: %s", iname, err)
1101
1102     time.sleep(1)
1103
1104     if iname in hyper.ListInstances():
1105       _Fail("Could not shutdown instance %s even by destroy", iname)
1106
1107   _RemoveBlockDevLinks(iname, instance.disks)
1108
1109
1110 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1111   """Reboot an instance.
1112
1113   @type instance: L{objects.Instance}
1114   @param instance: the instance object to reboot
1115   @type reboot_type: str
1116   @param reboot_type: the type of reboot, one the following
1117     constants:
1118       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1119         instance OS, do not recreate the VM
1120       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1121         restart the VM (at the hypervisor level)
1122       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1123         not accepted here, since that mode is handled differently, in
1124         cmdlib, and translates into full stop and start of the
1125         instance (instead of a call_instance_reboot RPC)
1126   @type shutdown_timeout: integer
1127   @param shutdown_timeout: maximum timeout for soft shutdown
1128   @rtype: None
1129
1130   """
1131   running_instances = GetInstanceList([instance.hypervisor])
1132
1133   if instance.name not in running_instances:
1134     _Fail("Cannot reboot instance %s that is not running", instance.name)
1135
1136   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1137   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138     try:
1139       hyper.RebootInstance(instance)
1140     except errors.HypervisorError, err:
1141       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1142   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143     try:
1144       InstanceShutdown(instance, shutdown_timeout)
1145       return StartInstance(instance)
1146     except errors.HypervisorError, err:
1147       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148   else:
1149     _Fail("Invalid reboot_type received: %s", reboot_type)
1150
1151
1152 def MigrationInfo(instance):
1153   """Gather information about an instance to be migrated.
1154
1155   @type instance: L{objects.Instance}
1156   @param instance: the instance definition
1157
1158   """
1159   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160   try:
1161     info = hyper.MigrationInfo(instance)
1162   except errors.HypervisorError, err:
1163     _Fail("Failed to fetch migration information: %s", err, exc=True)
1164   return info
1165
1166
1167 def AcceptInstance(instance, info, target):
1168   """Prepare the node to accept an instance.
1169
1170   @type instance: L{objects.Instance}
1171   @param instance: the instance definition
1172   @type info: string/data (opaque)
1173   @param info: migration information, from the source node
1174   @type target: string
1175   @param target: target host (usually ip), on this node
1176
1177   """
1178   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179   try:
1180     hyper.AcceptInstance(instance, info, target)
1181   except errors.HypervisorError, err:
1182     _Fail("Failed to accept instance: %s", err, exc=True)
1183
1184
1185 def FinalizeMigration(instance, info, success):
1186   """Finalize any preparation to accept an instance.
1187
1188   @type instance: L{objects.Instance}
1189   @param instance: the instance definition
1190   @type info: string/data (opaque)
1191   @param info: migration information, from the source node
1192   @type success: boolean
1193   @param success: whether the migration was a success or a failure
1194
1195   """
1196   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197   try:
1198     hyper.FinalizeMigration(instance, info, success)
1199   except errors.HypervisorError, err:
1200     _Fail("Failed to finalize migration: %s", err, exc=True)
1201
1202
1203 def MigrateInstance(instance, target, live):
1204   """Migrates an instance to another node.
1205
1206   @type instance: L{objects.Instance}
1207   @param instance: the instance definition
1208   @type target: string
1209   @param target: the target node name
1210   @type live: boolean
1211   @param live: whether the migration should be done live or not (the
1212       interpretation of this parameter is left to the hypervisor)
1213   @rtype: tuple
1214   @return: a tuple of (success, msg) where:
1215       - succes is a boolean denoting the success/failure of the operation
1216       - msg is a string with details in case of failure
1217
1218   """
1219   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1220
1221   try:
1222     hyper.MigrateInstance(instance, target, live)
1223   except errors.HypervisorError, err:
1224     _Fail("Failed to migrate instance: %s", err, exc=True)
1225
1226
1227 def BlockdevCreate(disk, size, owner, on_primary, info):
1228   """Creates a block device for an instance.
1229
1230   @type disk: L{objects.Disk}
1231   @param disk: the object describing the disk we should create
1232   @type size: int
1233   @param size: the size of the physical underlying device, in MiB
1234   @type owner: str
1235   @param owner: the name of the instance for which disk is created,
1236       used for device cache data
1237   @type on_primary: boolean
1238   @param on_primary:  indicates if it is the primary node or not
1239   @type info: string
1240   @param info: string that will be sent to the physical device
1241       creation, used for example to set (LVM) tags on LVs
1242
1243   @return: the new unique_id of the device (this can sometime be
1244       computed only after creation), or None. On secondary nodes,
1245       it's not required to return anything.
1246
1247   """
1248   # TODO: remove the obsolete 'size' argument
1249   # pylint: disable-msg=W0613
1250   clist = []
1251   if disk.children:
1252     for child in disk.children:
1253       try:
1254         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1255       except errors.BlockDeviceError, err:
1256         _Fail("Can't assemble device %s: %s", child, err)
1257       if on_primary or disk.AssembleOnSecondary():
1258         # we need the children open in case the device itself has to
1259         # be assembled
1260         try:
1261           # pylint: disable-msg=E1103
1262           crdev.Open()
1263         except errors.BlockDeviceError, err:
1264           _Fail("Can't make child '%s' read-write: %s", child, err)
1265       clist.append(crdev)
1266
1267   try:
1268     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1269   except errors.BlockDeviceError, err:
1270     _Fail("Can't create block device: %s", err)
1271
1272   if on_primary or disk.AssembleOnSecondary():
1273     try:
1274       device.Assemble()
1275     except errors.BlockDeviceError, err:
1276       _Fail("Can't assemble device after creation, unusual event: %s", err)
1277     device.SetSyncSpeed(constants.SYNC_SPEED)
1278     if on_primary or disk.OpenOnSecondary():
1279       try:
1280         device.Open(force=True)
1281       except errors.BlockDeviceError, err:
1282         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1283     DevCacheManager.UpdateCache(device.dev_path, owner,
1284                                 on_primary, disk.iv_name)
1285
1286   device.SetInfo(info)
1287
1288   return device.unique_id
1289
1290
1291 def BlockdevRemove(disk):
1292   """Remove a block device.
1293
1294   @note: This is intended to be called recursively.
1295
1296   @type disk: L{objects.Disk}
1297   @param disk: the disk object we should remove
1298   @rtype: boolean
1299   @return: the success of the operation
1300
1301   """
1302   msgs = []
1303   try:
1304     rdev = _RecursiveFindBD(disk)
1305   except errors.BlockDeviceError, err:
1306     # probably can't attach
1307     logging.info("Can't attach to device %s in remove", disk)
1308     rdev = None
1309   if rdev is not None:
1310     r_path = rdev.dev_path
1311     try:
1312       rdev.Remove()
1313     except errors.BlockDeviceError, err:
1314       msgs.append(str(err))
1315     if not msgs:
1316       DevCacheManager.RemoveCache(r_path)
1317
1318   if disk.children:
1319     for child in disk.children:
1320       try:
1321         BlockdevRemove(child)
1322       except RPCFail, err:
1323         msgs.append(str(err))
1324
1325   if msgs:
1326     _Fail("; ".join(msgs))
1327
1328
1329 def _RecursiveAssembleBD(disk, owner, as_primary):
1330   """Activate a block device for an instance.
1331
1332   This is run on the primary and secondary nodes for an instance.
1333
1334   @note: this function is called recursively.
1335
1336   @type disk: L{objects.Disk}
1337   @param disk: the disk we try to assemble
1338   @type owner: str
1339   @param owner: the name of the instance which owns the disk
1340   @type as_primary: boolean
1341   @param as_primary: if we should make the block device
1342       read/write
1343
1344   @return: the assembled device or None (in case no device
1345       was assembled)
1346   @raise errors.BlockDeviceError: in case there is an error
1347       during the activation of the children or the device
1348       itself
1349
1350   """
1351   children = []
1352   if disk.children:
1353     mcn = disk.ChildrenNeeded()
1354     if mcn == -1:
1355       mcn = 0 # max number of Nones allowed
1356     else:
1357       mcn = len(disk.children) - mcn # max number of Nones
1358     for chld_disk in disk.children:
1359       try:
1360         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1361       except errors.BlockDeviceError, err:
1362         if children.count(None) >= mcn:
1363           raise
1364         cdev = None
1365         logging.error("Error in child activation (but continuing): %s",
1366                       str(err))
1367       children.append(cdev)
1368
1369   if as_primary or disk.AssembleOnSecondary():
1370     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1371     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1372     result = r_dev
1373     if as_primary or disk.OpenOnSecondary():
1374       r_dev.Open()
1375     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1376                                 as_primary, disk.iv_name)
1377
1378   else:
1379     result = True
1380   return result
1381
1382
1383 def BlockdevAssemble(disk, owner, as_primary):
1384   """Activate a block device for an instance.
1385
1386   This is a wrapper over _RecursiveAssembleBD.
1387
1388   @rtype: str or boolean
1389   @return: a C{/dev/...} path for primary nodes, and
1390       C{True} for secondary nodes
1391
1392   """
1393   try:
1394     result = _RecursiveAssembleBD(disk, owner, as_primary)
1395     if isinstance(result, bdev.BlockDev):
1396       # pylint: disable-msg=E1103
1397       result = result.dev_path
1398   except errors.BlockDeviceError, err:
1399     _Fail("Error while assembling disk: %s", err, exc=True)
1400
1401   return result
1402
1403
1404 def BlockdevShutdown(disk):
1405   """Shut down a block device.
1406
1407   First, if the device is assembled (Attach() is successful), then
1408   the device is shutdown. Then the children of the device are
1409   shutdown.
1410
1411   This function is called recursively. Note that we don't cache the
1412   children or such, as oppossed to assemble, shutdown of different
1413   devices doesn't require that the upper device was active.
1414
1415   @type disk: L{objects.Disk}
1416   @param disk: the description of the disk we should
1417       shutdown
1418   @rtype: None
1419
1420   """
1421   msgs = []
1422   r_dev = _RecursiveFindBD(disk)
1423   if r_dev is not None:
1424     r_path = r_dev.dev_path
1425     try:
1426       r_dev.Shutdown()
1427       DevCacheManager.RemoveCache(r_path)
1428     except errors.BlockDeviceError, err:
1429       msgs.append(str(err))
1430
1431   if disk.children:
1432     for child in disk.children:
1433       try:
1434         BlockdevShutdown(child)
1435       except RPCFail, err:
1436         msgs.append(str(err))
1437
1438   if msgs:
1439     _Fail("; ".join(msgs))
1440
1441
1442 def BlockdevAddchildren(parent_cdev, new_cdevs):
1443   """Extend a mirrored block device.
1444
1445   @type parent_cdev: L{objects.Disk}
1446   @param parent_cdev: the disk to which we should add children
1447   @type new_cdevs: list of L{objects.Disk}
1448   @param new_cdevs: the list of children which we should add
1449   @rtype: None
1450
1451   """
1452   parent_bdev = _RecursiveFindBD(parent_cdev)
1453   if parent_bdev is None:
1454     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1455   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1456   if new_bdevs.count(None) > 0:
1457     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1458   parent_bdev.AddChildren(new_bdevs)
1459
1460
1461 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1462   """Shrink a mirrored block device.
1463
1464   @type parent_cdev: L{objects.Disk}
1465   @param parent_cdev: the disk from which we should remove children
1466   @type new_cdevs: list of L{objects.Disk}
1467   @param new_cdevs: the list of children which we should remove
1468   @rtype: None
1469
1470   """
1471   parent_bdev = _RecursiveFindBD(parent_cdev)
1472   if parent_bdev is None:
1473     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1474   devs = []
1475   for disk in new_cdevs:
1476     rpath = disk.StaticDevPath()
1477     if rpath is None:
1478       bd = _RecursiveFindBD(disk)
1479       if bd is None:
1480         _Fail("Can't find device %s while removing children", disk)
1481       else:
1482         devs.append(bd.dev_path)
1483     else:
1484       if not utils.IsNormAbsPath(rpath):
1485         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1486       devs.append(rpath)
1487   parent_bdev.RemoveChildren(devs)
1488
1489
1490 def BlockdevGetmirrorstatus(disks):
1491   """Get the mirroring status of a list of devices.
1492
1493   @type disks: list of L{objects.Disk}
1494   @param disks: the list of disks which we should query
1495   @rtype: disk
1496   @return:
1497       a list of (mirror_done, estimated_time) tuples, which
1498       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1499   @raise errors.BlockDeviceError: if any of the disks cannot be
1500       found
1501
1502   """
1503   stats = []
1504   for dsk in disks:
1505     rbd = _RecursiveFindBD(dsk)
1506     if rbd is None:
1507       _Fail("Can't find device %s", dsk)
1508
1509     stats.append(rbd.CombinedSyncStatus())
1510
1511   return stats
1512
1513
1514 def _RecursiveFindBD(disk):
1515   """Check if a device is activated.
1516
1517   If so, return information about the real device.
1518
1519   @type disk: L{objects.Disk}
1520   @param disk: the disk object we need to find
1521
1522   @return: None if the device can't be found,
1523       otherwise the device instance
1524
1525   """
1526   children = []
1527   if disk.children:
1528     for chdisk in disk.children:
1529       children.append(_RecursiveFindBD(chdisk))
1530
1531   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1532
1533
1534 def _OpenRealBD(disk):
1535   """Opens the underlying block device of a disk.
1536
1537   @type disk: L{objects.Disk}
1538   @param disk: the disk object we want to open
1539
1540   """
1541   real_disk = _RecursiveFindBD(disk)
1542   if real_disk is None:
1543     _Fail("Block device '%s' is not set up", disk)
1544
1545   real_disk.Open()
1546
1547   return real_disk
1548
1549
1550 def BlockdevFind(disk):
1551   """Check if a device is activated.
1552
1553   If it is, return information about the real device.
1554
1555   @type disk: L{objects.Disk}
1556   @param disk: the disk to find
1557   @rtype: None or objects.BlockDevStatus
1558   @return: None if the disk cannot be found, otherwise a the current
1559            information
1560
1561   """
1562   try:
1563     rbd = _RecursiveFindBD(disk)
1564   except errors.BlockDeviceError, err:
1565     _Fail("Failed to find device: %s", err, exc=True)
1566
1567   if rbd is None:
1568     return None
1569
1570   return rbd.GetSyncStatus()
1571
1572
1573 def BlockdevGetsize(disks):
1574   """Computes the size of the given disks.
1575
1576   If a disk is not found, returns None instead.
1577
1578   @type disks: list of L{objects.Disk}
1579   @param disks: the list of disk to compute the size for
1580   @rtype: list
1581   @return: list with elements None if the disk cannot be found,
1582       otherwise the size
1583
1584   """
1585   result = []
1586   for cf in disks:
1587     try:
1588       rbd = _RecursiveFindBD(cf)
1589     except errors.BlockDeviceError:
1590       result.append(None)
1591       continue
1592     if rbd is None:
1593       result.append(None)
1594     else:
1595       result.append(rbd.GetActualSize())
1596   return result
1597
1598
1599 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1600   """Export a block device to a remote node.
1601
1602   @type disk: L{objects.Disk}
1603   @param disk: the description of the disk to export
1604   @type dest_node: str
1605   @param dest_node: the destination node to export to
1606   @type dest_path: str
1607   @param dest_path: the destination path on the target node
1608   @type cluster_name: str
1609   @param cluster_name: the cluster name, needed for SSH hostalias
1610   @rtype: None
1611
1612   """
1613   real_disk = _OpenRealBD(disk)
1614
1615   # the block size on the read dd is 1MiB to match our units
1616   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1617                                "dd if=%s bs=1048576 count=%s",
1618                                real_disk.dev_path, str(disk.size))
1619
1620   # we set here a smaller block size as, due to ssh buffering, more
1621   # than 64-128k will mostly ignored; we use nocreat to fail if the
1622   # device is not already there or we pass a wrong path; we use
1623   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1624   # to not buffer too much memory; this means that at best, we flush
1625   # every 64k, which will not be very fast
1626   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1627                                 " oflag=dsync", dest_path)
1628
1629   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1630                                                    constants.GANETI_RUNAS,
1631                                                    destcmd)
1632
1633   # all commands have been checked, so we're safe to combine them
1634   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1635
1636   result = utils.RunCmd(["bash", "-c", command])
1637
1638   if result.failed:
1639     _Fail("Disk copy command '%s' returned error: %s"
1640           " output: %s", command, result.fail_reason, result.output)
1641
1642
1643 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1644   """Write a file to the filesystem.
1645
1646   This allows the master to overwrite(!) a file. It will only perform
1647   the operation if the file belongs to a list of configuration files.
1648
1649   @type file_name: str
1650   @param file_name: the target file name
1651   @type data: str
1652   @param data: the new contents of the file
1653   @type mode: int
1654   @param mode: the mode to give the file (can be None)
1655   @type uid: int
1656   @param uid: the owner of the file (can be -1 for default)
1657   @type gid: int
1658   @param gid: the group of the file (can be -1 for default)
1659   @type atime: float
1660   @param atime: the atime to set on the file (can be None)
1661   @type mtime: float
1662   @param mtime: the mtime to set on the file (can be None)
1663   @rtype: None
1664
1665   """
1666   if not os.path.isabs(file_name):
1667     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1668
1669   if file_name not in _ALLOWED_UPLOAD_FILES:
1670     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1671           file_name)
1672
1673   raw_data = _Decompress(data)
1674
1675   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1676                   atime=atime, mtime=mtime)
1677
1678
1679 def WriteSsconfFiles(values):
1680   """Update all ssconf files.
1681
1682   Wrapper around the SimpleStore.WriteFiles.
1683
1684   """
1685   ssconf.SimpleStore().WriteFiles(values)
1686
1687
1688 def _ErrnoOrStr(err):
1689   """Format an EnvironmentError exception.
1690
1691   If the L{err} argument has an errno attribute, it will be looked up
1692   and converted into a textual C{E...} description. Otherwise the
1693   string representation of the error will be returned.
1694
1695   @type err: L{EnvironmentError}
1696   @param err: the exception to format
1697
1698   """
1699   if hasattr(err, 'errno'):
1700     detail = errno.errorcode[err.errno]
1701   else:
1702     detail = str(err)
1703   return detail
1704
1705
1706 def _OSOndiskAPIVersion(os_dir):
1707   """Compute and return the API version of a given OS.
1708
1709   This function will try to read the API version of the OS residing in
1710   the 'os_dir' directory.
1711
1712   @type os_dir: str
1713   @param os_dir: the directory in which we should look for the OS
1714   @rtype: tuple
1715   @return: tuple (status, data) with status denoting the validity and
1716       data holding either the vaid versions or an error message
1717
1718   """
1719   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1720
1721   try:
1722     st = os.stat(api_file)
1723   except EnvironmentError, err:
1724     return False, ("Required file '%s' not found under path %s: %s" %
1725                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1726
1727   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1728     return False, ("File '%s' in %s is not a regular file" %
1729                    (constants.OS_API_FILE, os_dir))
1730
1731   try:
1732     api_versions = utils.ReadFile(api_file).splitlines()
1733   except EnvironmentError, err:
1734     return False, ("Error while reading the API version file at %s: %s" %
1735                    (api_file, _ErrnoOrStr(err)))
1736
1737   try:
1738     api_versions = [int(version.strip()) for version in api_versions]
1739   except (TypeError, ValueError), err:
1740     return False, ("API version(s) can't be converted to integer: %s" %
1741                    str(err))
1742
1743   return True, api_versions
1744
1745
1746 def DiagnoseOS(top_dirs=None):
1747   """Compute the validity for all OSes.
1748
1749   @type top_dirs: list
1750   @param top_dirs: the list of directories in which to
1751       search (if not given defaults to
1752       L{constants.OS_SEARCH_PATH})
1753   @rtype: list of L{objects.OS}
1754   @return: a list of tuples (name, path, status, diagnose, variants)
1755       for all (potential) OSes under all search paths, where:
1756           - name is the (potential) OS name
1757           - path is the full path to the OS
1758           - status True/False is the validity of the OS
1759           - diagnose is the error message for an invalid OS, otherwise empty
1760           - variants is a list of supported OS variants, if any
1761
1762   """
1763   if top_dirs is None:
1764     top_dirs = constants.OS_SEARCH_PATH
1765
1766   result = []
1767   for dir_name in top_dirs:
1768     if os.path.isdir(dir_name):
1769       try:
1770         f_names = utils.ListVisibleFiles(dir_name)
1771       except EnvironmentError, err:
1772         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1773         break
1774       for name in f_names:
1775         os_path = utils.PathJoin(dir_name, name)
1776         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1777         if status:
1778           diagnose = ""
1779           variants = os_inst.supported_variants
1780         else:
1781           diagnose = os_inst
1782           variants = []
1783         result.append((name, os_path, status, diagnose, variants))
1784
1785   return result
1786
1787
1788 def _TryOSFromDisk(name, base_dir=None):
1789   """Create an OS instance from disk.
1790
1791   This function will return an OS instance if the given name is a
1792   valid OS name.
1793
1794   @type base_dir: string
1795   @keyword base_dir: Base directory containing OS installations.
1796                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1797   @rtype: tuple
1798   @return: success and either the OS instance if we find a valid one,
1799       or error message
1800
1801   """
1802   if base_dir is None:
1803     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1804   else:
1805     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1806
1807   if os_dir is None:
1808     return False, "Directory for OS %s not found in search path" % name
1809
1810   status, api_versions = _OSOndiskAPIVersion(os_dir)
1811   if not status:
1812     # push the error up
1813     return status, api_versions
1814
1815   if not constants.OS_API_VERSIONS.intersection(api_versions):
1816     return False, ("API version mismatch for path '%s': found %s, want %s." %
1817                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1818
1819   # OS Files dictionary, we will populate it with the absolute path names
1820   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1821
1822   if max(api_versions) >= constants.OS_API_V15:
1823     os_files[constants.OS_VARIANTS_FILE] = ''
1824
1825   for filename in os_files:
1826     os_files[filename] = utils.PathJoin(os_dir, filename)
1827
1828     try:
1829       st = os.stat(os_files[filename])
1830     except EnvironmentError, err:
1831       return False, ("File '%s' under path '%s' is missing (%s)" %
1832                      (filename, os_dir, _ErrnoOrStr(err)))
1833
1834     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1835       return False, ("File '%s' under path '%s' is not a regular file" %
1836                      (filename, os_dir))
1837
1838     if filename in constants.OS_SCRIPTS:
1839       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1840         return False, ("File '%s' under path '%s' is not executable" %
1841                        (filename, os_dir))
1842
1843   variants = None
1844   if constants.OS_VARIANTS_FILE in os_files:
1845     variants_file = os_files[constants.OS_VARIANTS_FILE]
1846     try:
1847       variants = utils.ReadFile(variants_file).splitlines()
1848     except EnvironmentError, err:
1849       return False, ("Error while reading the OS variants file at %s: %s" %
1850                      (variants_file, _ErrnoOrStr(err)))
1851     if not variants:
1852       return False, ("No supported os variant found")
1853
1854   os_obj = objects.OS(name=name, path=os_dir,
1855                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1856                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1857                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1858                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1859                       supported_variants=variants,
1860                       api_versions=api_versions)
1861   return True, os_obj
1862
1863
1864 def OSFromDisk(name, base_dir=None):
1865   """Create an OS instance from disk.
1866
1867   This function will return an OS instance if the given name is a
1868   valid OS name. Otherwise, it will raise an appropriate
1869   L{RPCFail} exception, detailing why this is not a valid OS.
1870
1871   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1872   an exception but returns true/false status data.
1873
1874   @type base_dir: string
1875   @keyword base_dir: Base directory containing OS installations.
1876                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1877   @rtype: L{objects.OS}
1878   @return: the OS instance if we find a valid one
1879   @raise RPCFail: if we don't find a valid OS
1880
1881   """
1882   name_only = name.split("+", 1)[0]
1883   status, payload = _TryOSFromDisk(name_only, base_dir)
1884
1885   if not status:
1886     _Fail(payload)
1887
1888   return payload
1889
1890
1891 def OSEnvironment(instance, inst_os, debug=0):
1892   """Calculate the environment for an os script.
1893
1894   @type instance: L{objects.Instance}
1895   @param instance: target instance for the os script run
1896   @type inst_os: L{objects.OS}
1897   @param inst_os: operating system for which the environment is being built
1898   @type debug: integer
1899   @param debug: debug level (0 or 1, for OS Api 10)
1900   @rtype: dict
1901   @return: dict of environment variables
1902   @raise errors.BlockDeviceError: if the block device
1903       cannot be found
1904
1905   """
1906   result = {}
1907   api_version = \
1908     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1909   result['OS_API_VERSION'] = '%d' % api_version
1910   result['INSTANCE_NAME'] = instance.name
1911   result['INSTANCE_OS'] = instance.os
1912   result['HYPERVISOR'] = instance.hypervisor
1913   result['DISK_COUNT'] = '%d' % len(instance.disks)
1914   result['NIC_COUNT'] = '%d' % len(instance.nics)
1915   result['DEBUG_LEVEL'] = '%d' % debug
1916   if api_version >= constants.OS_API_V15:
1917     try:
1918       variant = instance.os.split('+', 1)[1]
1919     except IndexError:
1920       variant = inst_os.supported_variants[0]
1921     result['OS_VARIANT'] = variant
1922   for idx, disk in enumerate(instance.disks):
1923     real_disk = _OpenRealBD(disk)
1924     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1925     result['DISK_%d_ACCESS' % idx] = disk.mode
1926     if constants.HV_DISK_TYPE in instance.hvparams:
1927       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1928         instance.hvparams[constants.HV_DISK_TYPE]
1929     if disk.dev_type in constants.LDS_BLOCK:
1930       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1931     elif disk.dev_type == constants.LD_FILE:
1932       result['DISK_%d_BACKEND_TYPE' % idx] = \
1933         'file:%s' % disk.physical_id[0]
1934   for idx, nic in enumerate(instance.nics):
1935     result['NIC_%d_MAC' % idx] = nic.mac
1936     if nic.ip:
1937       result['NIC_%d_IP' % idx] = nic.ip
1938     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1939     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1940       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1941     if nic.nicparams[constants.NIC_LINK]:
1942       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1943     if constants.HV_NIC_TYPE in instance.hvparams:
1944       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1945         instance.hvparams[constants.HV_NIC_TYPE]
1946
1947   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1948     for key, value in source.items():
1949       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1950
1951   return result
1952
1953
1954 def BlockdevGrow(disk, amount):
1955   """Grow a stack of block devices.
1956
1957   This function is called recursively, with the childrens being the
1958   first ones to resize.
1959
1960   @type disk: L{objects.Disk}
1961   @param disk: the disk to be grown
1962   @rtype: (status, result)
1963   @return: a tuple with the status of the operation
1964       (True/False), and the errors message if status
1965       is False
1966
1967   """
1968   r_dev = _RecursiveFindBD(disk)
1969   if r_dev is None:
1970     _Fail("Cannot find block device %s", disk)
1971
1972   try:
1973     r_dev.Grow(amount)
1974   except errors.BlockDeviceError, err:
1975     _Fail("Failed to grow block device: %s", err, exc=True)
1976
1977
1978 def BlockdevSnapshot(disk):
1979   """Create a snapshot copy of a block device.
1980
1981   This function is called recursively, and the snapshot is actually created
1982   just for the leaf lvm backend device.
1983
1984   @type disk: L{objects.Disk}
1985   @param disk: the disk to be snapshotted
1986   @rtype: string
1987   @return: snapshot disk path
1988
1989   """
1990   if disk.dev_type == constants.LD_DRBD8:
1991     if not disk.children:
1992       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
1993             disk.unique_id)
1994     return BlockdevSnapshot(disk.children[0])
1995   elif disk.dev_type == constants.LD_LV:
1996     r_dev = _RecursiveFindBD(disk)
1997     if r_dev is not None:
1998       # FIXME: choose a saner value for the snapshot size
1999       # let's stay on the safe side and ask for the full size, for now
2000       return r_dev.Snapshot(disk.size)
2001     else:
2002       _Fail("Cannot find block device %s", disk)
2003   else:
2004     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2005           disk.unique_id, disk.dev_type)
2006
2007
2008 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
2009   """Export a block device snapshot to a remote node.
2010
2011   @type disk: L{objects.Disk}
2012   @param disk: the description of the disk to export
2013   @type dest_node: str
2014   @param dest_node: the destination node to export to
2015   @type instance: L{objects.Instance}
2016   @param instance: the instance object to whom the disk belongs
2017   @type cluster_name: str
2018   @param cluster_name: the cluster name, needed for SSH hostalias
2019   @type idx: int
2020   @param idx: the index of the disk in the instance's disk list,
2021       used to export to the OS scripts environment
2022   @type debug: integer
2023   @param debug: debug level, passed to the OS scripts
2024   @rtype: None
2025
2026   """
2027   inst_os = OSFromDisk(instance.os)
2028   export_env = OSEnvironment(instance, inst_os, debug)
2029
2030   export_script = inst_os.export_script
2031
2032   logfile = _InstanceLogName("export", inst_os.name, instance.name)
2033   if not os.path.exists(constants.LOG_OS_DIR):
2034     os.mkdir(constants.LOG_OS_DIR, 0750)
2035
2036   real_disk = _OpenRealBD(disk)
2037
2038   export_env['EXPORT_DEVICE'] = real_disk.dev_path
2039   export_env['EXPORT_INDEX'] = str(idx)
2040
2041   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2042   destfile = disk.physical_id[1]
2043
2044   # the target command is built out of three individual commands,
2045   # which are joined by pipes; we check each individual command for
2046   # valid parameters
2047   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
2048                                inst_os.path, export_script, logfile)
2049
2050   comprcmd = "gzip"
2051
2052   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
2053                                 destdir, utils.PathJoin(destdir, destfile))
2054   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2055                                                    constants.GANETI_RUNAS,
2056                                                    destcmd)
2057
2058   # all commands have been checked, so we're safe to combine them
2059   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
2060
2061   result = utils.RunCmd(["bash", "-c", command], env=export_env)
2062
2063   if result.failed:
2064     _Fail("OS snapshot export command '%s' returned error: %s"
2065           " output: %s", command, result.fail_reason, result.output)
2066
2067
2068 def FinalizeExport(instance, snap_disks):
2069   """Write out the export configuration information.
2070
2071   @type instance: L{objects.Instance}
2072   @param instance: the instance which we export, used for
2073       saving configuration
2074   @type snap_disks: list of L{objects.Disk}
2075   @param snap_disks: list of snapshot block devices, which
2076       will be used to get the actual name of the dump file
2077
2078   @rtype: None
2079
2080   """
2081   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2082   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2083
2084   config = objects.SerializableConfigParser()
2085
2086   config.add_section(constants.INISECT_EXP)
2087   config.set(constants.INISECT_EXP, 'version', '0')
2088   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2089   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2090   config.set(constants.INISECT_EXP, 'os', instance.os)
2091   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2092
2093   config.add_section(constants.INISECT_INS)
2094   config.set(constants.INISECT_INS, 'name', instance.name)
2095   config.set(constants.INISECT_INS, 'memory', '%d' %
2096              instance.beparams[constants.BE_MEMORY])
2097   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2098              instance.beparams[constants.BE_VCPUS])
2099   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2100   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2101
2102   nic_total = 0
2103   for nic_count, nic in enumerate(instance.nics):
2104     nic_total += 1
2105     config.set(constants.INISECT_INS, 'nic%d_mac' %
2106                nic_count, '%s' % nic.mac)
2107     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2108     for param in constants.NICS_PARAMETER_TYPES:
2109       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2110                  '%s' % nic.nicparams.get(param, None))
2111   # TODO: redundant: on load can read nics until it doesn't exist
2112   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2113
2114   disk_total = 0
2115   for disk_count, disk in enumerate(snap_disks):
2116     if disk:
2117       disk_total += 1
2118       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2119                  ('%s' % disk.iv_name))
2120       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2121                  ('%s' % disk.physical_id[1]))
2122       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2123                  ('%d' % disk.size))
2124
2125   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2126
2127   # New-style hypervisor/backend parameters
2128
2129   config.add_section(constants.INISECT_HYP)
2130   for name, value in instance.hvparams.items():
2131     if name not in constants.HVC_GLOBALS:
2132       config.set(constants.INISECT_HYP, name, str(value))
2133
2134   config.add_section(constants.INISECT_BEP)
2135   for name, value in instance.beparams.items():
2136     config.set(constants.INISECT_BEP, name, str(value))
2137
2138   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2139                   data=config.Dumps())
2140   shutil.rmtree(finaldestdir, ignore_errors=True)
2141   shutil.move(destdir, finaldestdir)
2142
2143
2144 def ExportInfo(dest):
2145   """Get export configuration information.
2146
2147   @type dest: str
2148   @param dest: directory containing the export
2149
2150   @rtype: L{objects.SerializableConfigParser}
2151   @return: a serializable config file containing the
2152       export info
2153
2154   """
2155   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2156
2157   config = objects.SerializableConfigParser()
2158   config.read(cff)
2159
2160   if (not config.has_section(constants.INISECT_EXP) or
2161       not config.has_section(constants.INISECT_INS)):
2162     _Fail("Export info file doesn't have the required fields")
2163
2164   return config.Dumps()
2165
2166
2167 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2168   """Import an os image into an instance.
2169
2170   @type instance: L{objects.Instance}
2171   @param instance: instance to import the disks into
2172   @type src_node: string
2173   @param src_node: source node for the disk images
2174   @type src_images: list of string
2175   @param src_images: absolute paths of the disk images
2176   @type debug: integer
2177   @param debug: debug level, passed to the OS scripts
2178   @rtype: list of boolean
2179   @return: each boolean represent the success of importing the n-th disk
2180
2181   """
2182   inst_os = OSFromDisk(instance.os)
2183   import_env = OSEnvironment(instance, inst_os, debug)
2184   import_script = inst_os.import_script
2185
2186   logfile = _InstanceLogName("import", instance.os, instance.name)
2187   if not os.path.exists(constants.LOG_OS_DIR):
2188     os.mkdir(constants.LOG_OS_DIR, 0750)
2189
2190   comprcmd = "gunzip"
2191   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2192                                import_script, logfile)
2193
2194   final_result = []
2195   for idx, image in enumerate(src_images):
2196     if image:
2197       destcmd = utils.BuildShellCmd('cat %s', image)
2198       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2199                                                        constants.GANETI_RUNAS,
2200                                                        destcmd)
2201       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2202       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2203       import_env['IMPORT_INDEX'] = str(idx)
2204       result = utils.RunCmd(command, env=import_env)
2205       if result.failed:
2206         logging.error("Disk import command '%s' returned error: %s"
2207                       " output: %s", command, result.fail_reason,
2208                       result.output)
2209         final_result.append("error importing disk %d: %s, %s" %
2210                             (idx, result.fail_reason, result.output[-100]))
2211
2212   if final_result:
2213     _Fail("; ".join(final_result), log=False)
2214
2215
2216 def ListExports():
2217   """Return a list of exports currently available on this machine.
2218
2219   @rtype: list
2220   @return: list of the exports
2221
2222   """
2223   if os.path.isdir(constants.EXPORT_DIR):
2224     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2225   else:
2226     _Fail("No exports directory")
2227
2228
2229 def RemoveExport(export):
2230   """Remove an existing export from the node.
2231
2232   @type export: str
2233   @param export: the name of the export to remove
2234   @rtype: None
2235
2236   """
2237   target = utils.PathJoin(constants.EXPORT_DIR, export)
2238
2239   try:
2240     shutil.rmtree(target)
2241   except EnvironmentError, err:
2242     _Fail("Error while removing the export: %s", err, exc=True)
2243
2244
2245 def BlockdevRename(devlist):
2246   """Rename a list of block devices.
2247
2248   @type devlist: list of tuples
2249   @param devlist: list of tuples of the form  (disk,
2250       new_logical_id, new_physical_id); disk is an
2251       L{objects.Disk} object describing the current disk,
2252       and new logical_id/physical_id is the name we
2253       rename it to
2254   @rtype: boolean
2255   @return: True if all renames succeeded, False otherwise
2256
2257   """
2258   msgs = []
2259   result = True
2260   for disk, unique_id in devlist:
2261     dev = _RecursiveFindBD(disk)
2262     if dev is None:
2263       msgs.append("Can't find device %s in rename" % str(disk))
2264       result = False
2265       continue
2266     try:
2267       old_rpath = dev.dev_path
2268       dev.Rename(unique_id)
2269       new_rpath = dev.dev_path
2270       if old_rpath != new_rpath:
2271         DevCacheManager.RemoveCache(old_rpath)
2272         # FIXME: we should add the new cache information here, like:
2273         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2274         # but we don't have the owner here - maybe parse from existing
2275         # cache? for now, we only lose lvm data when we rename, which
2276         # is less critical than DRBD or MD
2277     except errors.BlockDeviceError, err:
2278       msgs.append("Can't rename device '%s' to '%s': %s" %
2279                   (dev, unique_id, err))
2280       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2281       result = False
2282   if not result:
2283     _Fail("; ".join(msgs))
2284
2285
2286 def _TransformFileStorageDir(file_storage_dir):
2287   """Checks whether given file_storage_dir is valid.
2288
2289   Checks wheter the given file_storage_dir is within the cluster-wide
2290   default file_storage_dir stored in SimpleStore. Only paths under that
2291   directory are allowed.
2292
2293   @type file_storage_dir: str
2294   @param file_storage_dir: the path to check
2295
2296   @return: the normalized path if valid, None otherwise
2297
2298   """
2299   if not constants.ENABLE_FILE_STORAGE:
2300     _Fail("File storage disabled at configure time")
2301   cfg = _GetConfig()
2302   file_storage_dir = os.path.normpath(file_storage_dir)
2303   base_file_storage_dir = cfg.GetFileStorageDir()
2304   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2305       base_file_storage_dir):
2306     _Fail("File storage directory '%s' is not under base file"
2307           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2308   return file_storage_dir
2309
2310
2311 def CreateFileStorageDir(file_storage_dir):
2312   """Create file storage directory.
2313
2314   @type file_storage_dir: str
2315   @param file_storage_dir: directory to create
2316
2317   @rtype: tuple
2318   @return: tuple with first element a boolean indicating wheter dir
2319       creation was successful or not
2320
2321   """
2322   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2323   if os.path.exists(file_storage_dir):
2324     if not os.path.isdir(file_storage_dir):
2325       _Fail("Specified storage dir '%s' is not a directory",
2326             file_storage_dir)
2327   else:
2328     try:
2329       os.makedirs(file_storage_dir, 0750)
2330     except OSError, err:
2331       _Fail("Cannot create file storage directory '%s': %s",
2332             file_storage_dir, err, exc=True)
2333
2334
2335 def RemoveFileStorageDir(file_storage_dir):
2336   """Remove file storage directory.
2337
2338   Remove it only if it's empty. If not log an error and return.
2339
2340   @type file_storage_dir: str
2341   @param file_storage_dir: the directory we should cleanup
2342   @rtype: tuple (success,)
2343   @return: tuple of one element, C{success}, denoting
2344       whether the operation was successful
2345
2346   """
2347   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2348   if os.path.exists(file_storage_dir):
2349     if not os.path.isdir(file_storage_dir):
2350       _Fail("Specified Storage directory '%s' is not a directory",
2351             file_storage_dir)
2352     # deletes dir only if empty, otherwise we want to fail the rpc call
2353     try:
2354       os.rmdir(file_storage_dir)
2355     except OSError, err:
2356       _Fail("Cannot remove file storage directory '%s': %s",
2357             file_storage_dir, err)
2358
2359
2360 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2361   """Rename the file storage directory.
2362
2363   @type old_file_storage_dir: str
2364   @param old_file_storage_dir: the current path
2365   @type new_file_storage_dir: str
2366   @param new_file_storage_dir: the name we should rename to
2367   @rtype: tuple (success,)
2368   @return: tuple of one element, C{success}, denoting
2369       whether the operation was successful
2370
2371   """
2372   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2373   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2374   if not os.path.exists(new_file_storage_dir):
2375     if os.path.isdir(old_file_storage_dir):
2376       try:
2377         os.rename(old_file_storage_dir, new_file_storage_dir)
2378       except OSError, err:
2379         _Fail("Cannot rename '%s' to '%s': %s",
2380               old_file_storage_dir, new_file_storage_dir, err)
2381     else:
2382       _Fail("Specified storage dir '%s' is not a directory",
2383             old_file_storage_dir)
2384   else:
2385     if os.path.exists(old_file_storage_dir):
2386       _Fail("Cannot rename '%s' to '%s': both locations exist",
2387             old_file_storage_dir, new_file_storage_dir)
2388
2389
2390 def _EnsureJobQueueFile(file_name):
2391   """Checks whether the given filename is in the queue directory.
2392
2393   @type file_name: str
2394   @param file_name: the file name we should check
2395   @rtype: None
2396   @raises RPCFail: if the file is not valid
2397
2398   """
2399   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2400   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2401
2402   if not result:
2403     _Fail("Passed job queue file '%s' does not belong to"
2404           " the queue directory '%s'", file_name, queue_dir)
2405
2406
2407 def JobQueueUpdate(file_name, content):
2408   """Updates a file in the queue directory.
2409
2410   This is just a wrapper over L{utils.WriteFile}, with proper
2411   checking.
2412
2413   @type file_name: str
2414   @param file_name: the job file name
2415   @type content: str
2416   @param content: the new job contents
2417   @rtype: boolean
2418   @return: the success of the operation
2419
2420   """
2421   _EnsureJobQueueFile(file_name)
2422
2423   # Write and replace the file atomically
2424   utils.WriteFile(file_name, data=_Decompress(content))
2425
2426
2427 def JobQueueRename(old, new):
2428   """Renames a job queue file.
2429
2430   This is just a wrapper over os.rename with proper checking.
2431
2432   @type old: str
2433   @param old: the old (actual) file name
2434   @type new: str
2435   @param new: the desired file name
2436   @rtype: tuple
2437   @return: the success of the operation and payload
2438
2439   """
2440   _EnsureJobQueueFile(old)
2441   _EnsureJobQueueFile(new)
2442
2443   utils.RenameFile(old, new, mkdir=True)
2444
2445
2446 def JobQueueSetDrainFlag(drain_flag):
2447   """Set the drain flag for the queue.
2448
2449   This will set or unset the queue drain flag.
2450
2451   @type drain_flag: boolean
2452   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2453   @rtype: truple
2454   @return: always True, None
2455   @warning: the function always returns True
2456
2457   """
2458   if drain_flag:
2459     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2460   else:
2461     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2462
2463
2464 def BlockdevClose(instance_name, disks):
2465   """Closes the given block devices.
2466
2467   This means they will be switched to secondary mode (in case of
2468   DRBD).
2469
2470   @param instance_name: if the argument is not empty, the symlinks
2471       of this instance will be removed
2472   @type disks: list of L{objects.Disk}
2473   @param disks: the list of disks to be closed
2474   @rtype: tuple (success, message)
2475   @return: a tuple of success and message, where success
2476       indicates the succes of the operation, and message
2477       which will contain the error details in case we
2478       failed
2479
2480   """
2481   bdevs = []
2482   for cf in disks:
2483     rd = _RecursiveFindBD(cf)
2484     if rd is None:
2485       _Fail("Can't find device %s", cf)
2486     bdevs.append(rd)
2487
2488   msg = []
2489   for rd in bdevs:
2490     try:
2491       rd.Close()
2492     except errors.BlockDeviceError, err:
2493       msg.append(str(err))
2494   if msg:
2495     _Fail("Can't make devices secondary: %s", ",".join(msg))
2496   else:
2497     if instance_name:
2498       _RemoveBlockDevLinks(instance_name, disks)
2499
2500
2501 def ValidateHVParams(hvname, hvparams):
2502   """Validates the given hypervisor parameters.
2503
2504   @type hvname: string
2505   @param hvname: the hypervisor name
2506   @type hvparams: dict
2507   @param hvparams: the hypervisor parameters to be validated
2508   @rtype: None
2509
2510   """
2511   try:
2512     hv_type = hypervisor.GetHypervisor(hvname)
2513     hv_type.ValidateParameters(hvparams)
2514   except errors.HypervisorError, err:
2515     _Fail(str(err), log=False)
2516
2517
2518 def DemoteFromMC():
2519   """Demotes the current node from master candidate role.
2520
2521   """
2522   # try to ensure we're not the master by mistake
2523   master, myself = ssconf.GetMasterAndMyself()
2524   if master == myself:
2525     _Fail("ssconf status shows I'm the master node, will not demote")
2526
2527   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2528   if not result.failed:
2529     _Fail("The master daemon is running, will not demote")
2530
2531   try:
2532     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2533       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2534   except EnvironmentError, err:
2535     if err.errno != errno.ENOENT:
2536       _Fail("Error while backing up cluster file: %s", err, exc=True)
2537
2538   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2539
2540
2541 def _GetX509Filenames(cryptodir, name):
2542   """Returns the full paths for the private key and certificate.
2543
2544   """
2545   return (utils.PathJoin(cryptodir, name),
2546           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2547           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2548
2549
2550 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2551   """Creates a new X509 certificate for SSL/TLS.
2552
2553   @type validity: int
2554   @param validity: Validity in seconds
2555   @rtype: tuple; (string, string)
2556   @return: Certificate name and public part
2557
2558   """
2559   (key_pem, cert_pem) = \
2560     utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2561                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2562
2563   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2564                               prefix="x509-%s-" % utils.TimestampForFilename())
2565   try:
2566     name = os.path.basename(cert_dir)
2567     assert len(name) > 5
2568
2569     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2570
2571     utils.WriteFile(key_file, mode=0400, data=key_pem)
2572     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2573
2574     # Never return private key as it shouldn't leave the node
2575     return (name, cert_pem)
2576   except Exception:
2577     shutil.rmtree(cert_dir, ignore_errors=True)
2578     raise
2579
2580
2581 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2582   """Removes a X509 certificate.
2583
2584   @type name: string
2585   @param name: Certificate name
2586
2587   """
2588   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2589
2590   utils.RemoveFile(key_file)
2591   utils.RemoveFile(cert_file)
2592
2593   try:
2594     os.rmdir(cert_dir)
2595   except EnvironmentError, err:
2596     _Fail("Cannot remove certificate directory '%s': %s",
2597           cert_dir, err)
2598
2599
2600 def _FindDisks(nodes_ip, disks):
2601   """Sets the physical ID on disks and returns the block devices.
2602
2603   """
2604   # set the correct physical ID
2605   my_name = utils.HostInfo().name
2606   for cf in disks:
2607     cf.SetPhysicalID(my_name, nodes_ip)
2608
2609   bdevs = []
2610
2611   for cf in disks:
2612     rd = _RecursiveFindBD(cf)
2613     if rd is None:
2614       _Fail("Can't find device %s", cf)
2615     bdevs.append(rd)
2616   return bdevs
2617
2618
2619 def DrbdDisconnectNet(nodes_ip, disks):
2620   """Disconnects the network on a list of drbd devices.
2621
2622   """
2623   bdevs = _FindDisks(nodes_ip, disks)
2624
2625   # disconnect disks
2626   for rd in bdevs:
2627     try:
2628       rd.DisconnectNet()
2629     except errors.BlockDeviceError, err:
2630       _Fail("Can't change network configuration to standalone mode: %s",
2631             err, exc=True)
2632
2633
2634 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2635   """Attaches the network on a list of drbd devices.
2636
2637   """
2638   bdevs = _FindDisks(nodes_ip, disks)
2639
2640   if multimaster:
2641     for idx, rd in enumerate(bdevs):
2642       try:
2643         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2644       except EnvironmentError, err:
2645         _Fail("Can't create symlink: %s", err)
2646   # reconnect disks, switch to new master configuration and if
2647   # needed primary mode
2648   for rd in bdevs:
2649     try:
2650       rd.AttachNet(multimaster)
2651     except errors.BlockDeviceError, err:
2652       _Fail("Can't change network configuration: %s", err)
2653
2654   # wait until the disks are connected; we need to retry the re-attach
2655   # if the device becomes standalone, as this might happen if the one
2656   # node disconnects and reconnects in a different mode before the
2657   # other node reconnects; in this case, one or both of the nodes will
2658   # decide it has wrong configuration and switch to standalone
2659
2660   def _Attach():
2661     all_connected = True
2662
2663     for rd in bdevs:
2664       stats = rd.GetProcStatus()
2665
2666       all_connected = (all_connected and
2667                        (stats.is_connected or stats.is_in_resync))
2668
2669       if stats.is_standalone:
2670         # peer had different config info and this node became
2671         # standalone, even though this should not happen with the
2672         # new staged way of changing disk configs
2673         try:
2674           rd.AttachNet(multimaster)
2675         except errors.BlockDeviceError, err:
2676           _Fail("Can't change network configuration: %s", err)
2677
2678     if not all_connected:
2679       raise utils.RetryAgain()
2680
2681   try:
2682     # Start with a delay of 100 miliseconds and go up to 5 seconds
2683     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2684   except utils.RetryTimeout:
2685     _Fail("Timeout in disk reconnecting")
2686
2687   if multimaster:
2688     # change to primary mode
2689     for rd in bdevs:
2690       try:
2691         rd.Open()
2692       except errors.BlockDeviceError, err:
2693         _Fail("Can't change to primary mode: %s", err)
2694
2695
2696 def DrbdWaitSync(nodes_ip, disks):
2697   """Wait until DRBDs have synchronized.
2698
2699   """
2700   def _helper(rd):
2701     stats = rd.GetProcStatus()
2702     if not (stats.is_connected or stats.is_in_resync):
2703       raise utils.RetryAgain()
2704     return stats
2705
2706   bdevs = _FindDisks(nodes_ip, disks)
2707
2708   min_resync = 100
2709   alldone = True
2710   for rd in bdevs:
2711     try:
2712       # poll each second for 15 seconds
2713       stats = utils.Retry(_helper, 1, 15, args=[rd])
2714     except utils.RetryTimeout:
2715       stats = rd.GetProcStatus()
2716       # last check
2717       if not (stats.is_connected or stats.is_in_resync):
2718         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2719     alldone = alldone and (not stats.is_in_resync)
2720     if stats.sync_percent is not None:
2721       min_resync = min(min_resync, stats.sync_percent)
2722
2723   return (alldone, min_resync)
2724
2725
2726 def PowercycleNode(hypervisor_type):
2727   """Hard-powercycle the node.
2728
2729   Because we need to return first, and schedule the powercycle in the
2730   background, we won't be able to report failures nicely.
2731
2732   """
2733   hyper = hypervisor.GetHypervisor(hypervisor_type)
2734   try:
2735     pid = os.fork()
2736   except OSError:
2737     # if we can't fork, we'll pretend that we're in the child process
2738     pid = 0
2739   if pid > 0:
2740     return "Reboot scheduled in 5 seconds"
2741   time.sleep(5)
2742   hyper.PowercycleNode()
2743
2744
2745 class HooksRunner(object):
2746   """Hook runner.
2747
2748   This class is instantiated on the node side (ganeti-noded) and not
2749   on the master side.
2750
2751   """
2752   def __init__(self, hooks_base_dir=None):
2753     """Constructor for hooks runner.
2754
2755     @type hooks_base_dir: str or None
2756     @param hooks_base_dir: if not None, this overrides the
2757         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2758
2759     """
2760     if hooks_base_dir is None:
2761       hooks_base_dir = constants.HOOKS_BASE_DIR
2762     # yeah, _BASE_DIR is not valid for attributes, we use it like a
2763     # constant
2764     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
2765
2766   def RunHooks(self, hpath, phase, env):
2767     """Run the scripts in the hooks directory.
2768
2769     @type hpath: str
2770     @param hpath: the path to the hooks directory which
2771         holds the scripts
2772     @type phase: str
2773     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2774         L{constants.HOOKS_PHASE_POST}
2775     @type env: dict
2776     @param env: dictionary with the environment for the hook
2777     @rtype: list
2778     @return: list of 3-element tuples:
2779       - script path
2780       - script result, either L{constants.HKR_SUCCESS} or
2781         L{constants.HKR_FAIL}
2782       - output of the script
2783
2784     @raise errors.ProgrammerError: for invalid input
2785         parameters
2786
2787     """
2788     if phase == constants.HOOKS_PHASE_PRE:
2789       suffix = "pre"
2790     elif phase == constants.HOOKS_PHASE_POST:
2791       suffix = "post"
2792     else:
2793       _Fail("Unknown hooks phase '%s'", phase)
2794
2795
2796     subdir = "%s-%s.d" % (hpath, suffix)
2797     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
2798
2799     results = []
2800
2801     if not os.path.isdir(dir_name):
2802       # for non-existing/non-dirs, we simply exit instead of logging a
2803       # warning at every operation
2804       return results
2805
2806     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
2807
2808     for (relname, relstatus, runresult)  in runparts_results:
2809       if relstatus == constants.RUNPARTS_SKIP:
2810         rrval = constants.HKR_SKIP
2811         output = ""
2812       elif relstatus == constants.RUNPARTS_ERR:
2813         rrval = constants.HKR_FAIL
2814         output = "Hook script execution error: %s" % runresult
2815       elif relstatus == constants.RUNPARTS_RUN:
2816         if runresult.failed:
2817           rrval = constants.HKR_FAIL
2818         else:
2819           rrval = constants.HKR_SUCCESS
2820         output = utils.SafeEncode(runresult.output.strip())
2821       results.append(("%s/%s" % (subdir, relname), rrval, output))
2822
2823     return results
2824
2825
2826 class IAllocatorRunner(object):
2827   """IAllocator runner.
2828
2829   This class is instantiated on the node side (ganeti-noded) and not on
2830   the master side.
2831
2832   """
2833   @staticmethod
2834   def Run(name, idata):
2835     """Run an iallocator script.
2836
2837     @type name: str
2838     @param name: the iallocator script name
2839     @type idata: str
2840     @param idata: the allocator input data
2841
2842     @rtype: tuple
2843     @return: two element tuple of:
2844        - status
2845        - either error message or stdout of allocator (for success)
2846
2847     """
2848     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2849                                   os.path.isfile)
2850     if alloc_script is None:
2851       _Fail("iallocator module '%s' not found in the search path", name)
2852
2853     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2854     try:
2855       os.write(fd, idata)
2856       os.close(fd)
2857       result = utils.RunCmd([alloc_script, fin_name])
2858       if result.failed:
2859         _Fail("iallocator module '%s' failed: %s, output '%s'",
2860               name, result.fail_reason, result.output)
2861     finally:
2862       os.unlink(fin_name)
2863
2864     return result.stdout
2865
2866
2867 class DevCacheManager(object):
2868   """Simple class for managing a cache of block device information.
2869
2870   """
2871   _DEV_PREFIX = "/dev/"
2872   _ROOT_DIR = constants.BDEV_CACHE_DIR
2873
2874   @classmethod
2875   def _ConvertPath(cls, dev_path):
2876     """Converts a /dev/name path to the cache file name.
2877
2878     This replaces slashes with underscores and strips the /dev
2879     prefix. It then returns the full path to the cache file.
2880
2881     @type dev_path: str
2882     @param dev_path: the C{/dev/} path name
2883     @rtype: str
2884     @return: the converted path name
2885
2886     """
2887     if dev_path.startswith(cls._DEV_PREFIX):
2888       dev_path = dev_path[len(cls._DEV_PREFIX):]
2889     dev_path = dev_path.replace("/", "_")
2890     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
2891     return fpath
2892
2893   @classmethod
2894   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2895     """Updates the cache information for a given device.
2896
2897     @type dev_path: str
2898     @param dev_path: the pathname of the device
2899     @type owner: str
2900     @param owner: the owner (instance name) of the device
2901     @type on_primary: bool
2902     @param on_primary: whether this is the primary
2903         node nor not
2904     @type iv_name: str
2905     @param iv_name: the instance-visible name of the
2906         device, as in objects.Disk.iv_name
2907
2908     @rtype: None
2909
2910     """
2911     if dev_path is None:
2912       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2913       return
2914     fpath = cls._ConvertPath(dev_path)
2915     if on_primary:
2916       state = "primary"
2917     else:
2918       state = "secondary"
2919     if iv_name is None:
2920       iv_name = "not_visible"
2921     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2922     try:
2923       utils.WriteFile(fpath, data=fdata)
2924     except EnvironmentError, err:
2925       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2926
2927   @classmethod
2928   def RemoveCache(cls, dev_path):
2929     """Remove data for a dev_path.
2930
2931     This is just a wrapper over L{utils.RemoveFile} with a converted
2932     path name and logging.
2933
2934     @type dev_path: str
2935     @param dev_path: the pathname of the device
2936
2937     @rtype: None
2938
2939     """
2940     if dev_path is None:
2941       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2942       return
2943     fpath = cls._ConvertPath(dev_path)
2944     try:
2945       utils.RemoveFile(fpath)
2946     except EnvironmentError, err:
2947       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)