Rename OpRecreateInstanceDisks and LURecreateInstanceDisks
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles())
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def StartMaster(start_daemons, no_voting):
248   """Activate local node as master node.
249
250   The function will either try activate the IP address of the master
251   (unless someone else has it) or also start the master daemons, based
252   on the start_daemons parameter.
253
254   @type start_daemons: boolean
255   @param start_daemons: whether to start the master daemons
256       (ganeti-masterd and ganeti-rapi), or (if false) activate the
257       master ip
258   @type no_voting: boolean
259   @param no_voting: whether to start ganeti-masterd without a node vote
260       (if start_daemons is True), but still non-interactively
261   @rtype: None
262
263   """
264   # GetMasterInfo will raise an exception if not able to return data
265   master_netdev, master_ip, _, family = GetMasterInfo()
266
267   err_msgs = []
268   # either start the master and rapi daemons
269   if start_daemons:
270     if no_voting:
271       masterd_args = "--no-voting --yes-do-it"
272     else:
273       masterd_args = ""
274
275     env = {
276       "EXTRA_MASTERD_ARGS": masterd_args,
277       }
278
279     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
280     if result.failed:
281       msg = "Can't start Ganeti master: %s" % result.output
282       logging.error(msg)
283       err_msgs.append(msg)
284   # or activate the IP
285   else:
286     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
287       if netutils.IPAddress.Own(master_ip):
288         # we already have the ip:
289         logging.debug("Master IP already configured, doing nothing")
290       else:
291         msg = "Someone else has the master ip, not activating"
292         logging.error(msg)
293         err_msgs.append(msg)
294     else:
295       ipcls = netutils.IP4Address
296       if family == netutils.IP6Address.family:
297         ipcls = netutils.IP6Address
298
299       result = utils.RunCmd(["ip", "address", "add",
300                              "%s/%d" % (master_ip, ipcls.iplen),
301                              "dev", master_netdev, "label",
302                              "%s:0" % master_netdev])
303       if result.failed:
304         msg = "Can't activate master IP: %s" % result.output
305         logging.error(msg)
306         err_msgs.append(msg)
307
308       # we ignore the exit code of the following cmds
309       if ipcls == netutils.IP4Address:
310         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
311                       master_ip, master_ip])
312       elif ipcls == netutils.IP6Address:
313         try:
314           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
315         except errors.OpExecError:
316           # TODO: Better error reporting
317           logging.warning("Can't execute ndisc6, please install if missing")
318
319   if err_msgs:
320     _Fail("; ".join(err_msgs))
321
322
323 def StopMaster(stop_daemons):
324   """Deactivate this node as master.
325
326   The function will always try to deactivate the IP address of the
327   master. It will also stop the master daemons depending on the
328   stop_daemons parameter.
329
330   @type stop_daemons: boolean
331   @param stop_daemons: whether to also stop the master daemons
332       (ganeti-masterd and ganeti-rapi)
333   @rtype: None
334
335   """
336   # TODO: log and report back to the caller the error failures; we
337   # need to decide in which case we fail the RPC for this
338
339   # GetMasterInfo will raise an exception if not able to return data
340   master_netdev, master_ip, _, family = GetMasterInfo()
341
342   ipcls = netutils.IP4Address
343   if family == netutils.IP6Address.family:
344     ipcls = netutils.IP6Address
345
346   result = utils.RunCmd(["ip", "address", "del",
347                          "%s/%d" % (master_ip, ipcls.iplen),
348                          "dev", master_netdev])
349   if result.failed:
350     logging.error("Can't remove the master IP, error: %s", result.output)
351     # but otherwise ignore the failure
352
353   if stop_daemons:
354     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
355     if result.failed:
356       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
357                     " and error %s",
358                     result.cmd, result.exit_code, result.output)
359
360
361 def EtcHostsModify(mode, host, ip):
362   """Modify a host entry in /etc/hosts.
363
364   @param mode: The mode to operate. Either add or remove entry
365   @param host: The host to operate on
366   @param ip: The ip associated with the entry
367
368   """
369   if mode == constants.ETC_HOSTS_ADD:
370     if not ip:
371       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
372               " present")
373     utils.AddHostToEtcHosts(host, ip)
374   elif mode == constants.ETC_HOSTS_REMOVE:
375     if ip:
376       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
377               " parameter is present")
378     utils.RemoveHostFromEtcHosts(host)
379   else:
380     RPCFail("Mode not supported")
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445
446   if vgname is not None:
447     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
448     vg_free = vg_size = None
449     if vginfo:
450       vg_free = int(round(vginfo[0][0], 0))
451       vg_size = int(round(vginfo[0][1], 0))
452     outputarray['vg_size'] = vg_size
453     outputarray['vg_free'] = vg_free
454
455   if hypervisor_type is not None:
456     hyper = hypervisor.GetHypervisor(hypervisor_type)
457     hyp_info = hyper.GetNodeInfo()
458     if hyp_info is not None:
459       outputarray.update(hyp_info)
460
461   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
462
463   return outputarray
464
465
466 def VerifyNode(what, cluster_name):
467   """Verify the status of the local node.
468
469   Based on the input L{what} parameter, various checks are done on the
470   local node.
471
472   If the I{filelist} key is present, this list of
473   files is checksummed and the file/checksum pairs are returned.
474
475   If the I{nodelist} key is present, we check that we have
476   connectivity via ssh with the target nodes (and check the hostname
477   report).
478
479   If the I{node-net-test} key is present, we check that we have
480   connectivity to the given nodes via both primary IP and, if
481   applicable, secondary IPs.
482
483   @type what: C{dict}
484   @param what: a dictionary of things to check:
485       - filelist: list of files for which to compute checksums
486       - nodelist: list of nodes we should check ssh communication with
487       - node-net-test: list of nodes we should check node daemon port
488         connectivity with
489       - hypervisor: list with hypervisors to run the verify for
490   @rtype: dict
491   @return: a dictionary with the same keys as the input dict, and
492       values representing the result of the checks
493
494   """
495   result = {}
496   my_name = netutils.Hostname.GetSysName()
497   port = netutils.GetDaemonPort(constants.NODED)
498   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
499
500   if constants.NV_HYPERVISOR in what and vm_capable:
501     result[constants.NV_HYPERVISOR] = tmp = {}
502     for hv_name in what[constants.NV_HYPERVISOR]:
503       try:
504         val = hypervisor.GetHypervisor(hv_name).Verify()
505       except errors.HypervisorError, err:
506         val = "Error while checking hypervisor: %s" % str(err)
507       tmp[hv_name] = val
508
509   if constants.NV_FILELIST in what:
510     result[constants.NV_FILELIST] = utils.FingerprintFiles(
511       what[constants.NV_FILELIST])
512
513   if constants.NV_NODELIST in what:
514     result[constants.NV_NODELIST] = tmp = {}
515     random.shuffle(what[constants.NV_NODELIST])
516     for node in what[constants.NV_NODELIST]:
517       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
518       if not success:
519         tmp[node] = message
520
521   if constants.NV_NODENETTEST in what:
522     result[constants.NV_NODENETTEST] = tmp = {}
523     my_pip = my_sip = None
524     for name, pip, sip in what[constants.NV_NODENETTEST]:
525       if name == my_name:
526         my_pip = pip
527         my_sip = sip
528         break
529     if not my_pip:
530       tmp[my_name] = ("Can't find my own primary/secondary IP"
531                       " in the node list")
532     else:
533       for name, pip, sip in what[constants.NV_NODENETTEST]:
534         fail = []
535         if not netutils.TcpPing(pip, port, source=my_pip):
536           fail.append("primary")
537         if sip != pip:
538           if not netutils.TcpPing(sip, port, source=my_sip):
539             fail.append("secondary")
540         if fail:
541           tmp[name] = ("failure using the %s interface(s)" %
542                        " and ".join(fail))
543
544   if constants.NV_MASTERIP in what:
545     # FIXME: add checks on incoming data structures (here and in the
546     # rest of the function)
547     master_name, master_ip = what[constants.NV_MASTERIP]
548     if master_name == my_name:
549       source = constants.IP4_ADDRESS_LOCALHOST
550     else:
551       source = None
552     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
553                                                   source=source)
554
555   if constants.NV_OOB_PATHS in what:
556     result[constants.NV_OOB_PATHS] = tmp = []
557     for path in what[constants.NV_OOB_PATHS]:
558       try:
559         st = os.stat(path)
560       except OSError, err:
561         tmp.append("error stating out of band helper: %s" % err)
562       else:
563         if stat.S_ISREG(st.st_mode):
564           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
565             tmp.append(None)
566           else:
567             tmp.append("out of band helper %s is not executable" % path)
568         else:
569           tmp.append("out of band helper %s is not a file" % path)
570
571   if constants.NV_LVLIST in what and vm_capable:
572     try:
573       val = GetVolumeList(utils.ListVolumeGroups().keys())
574     except RPCFail, err:
575       val = str(err)
576     result[constants.NV_LVLIST] = val
577
578   if constants.NV_INSTANCELIST in what and vm_capable:
579     # GetInstanceList can fail
580     try:
581       val = GetInstanceList(what[constants.NV_INSTANCELIST])
582     except RPCFail, err:
583       val = str(err)
584     result[constants.NV_INSTANCELIST] = val
585
586   if constants.NV_VGLIST in what and vm_capable:
587     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
588
589   if constants.NV_PVLIST in what and vm_capable:
590     result[constants.NV_PVLIST] = \
591       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
592                                    filter_allocatable=False)
593
594   if constants.NV_VERSION in what:
595     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
596                                     constants.RELEASE_VERSION)
597
598   if constants.NV_HVINFO in what and vm_capable:
599     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
600     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
601
602   if constants.NV_DRBDLIST in what and vm_capable:
603     try:
604       used_minors = bdev.DRBD8.GetUsedDevs().keys()
605     except errors.BlockDeviceError, err:
606       logging.warning("Can't get used minors list", exc_info=True)
607       used_minors = str(err)
608     result[constants.NV_DRBDLIST] = used_minors
609
610   if constants.NV_DRBDHELPER in what and vm_capable:
611     status = True
612     try:
613       payload = bdev.BaseDRBD.GetUsermodeHelper()
614     except errors.BlockDeviceError, err:
615       logging.error("Can't get DRBD usermode helper: %s", str(err))
616       status = False
617       payload = str(err)
618     result[constants.NV_DRBDHELPER] = (status, payload)
619
620   if constants.NV_NODESETUP in what:
621     result[constants.NV_NODESETUP] = tmpr = []
622     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
623       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
624                   " under /sys, missing required directories /sys/block"
625                   " and /sys/class/net")
626     if (not os.path.isdir("/proc/sys") or
627         not os.path.isfile("/proc/sysrq-trigger")):
628       tmpr.append("The procfs filesystem doesn't seem to be mounted"
629                   " under /proc, missing required directory /proc/sys and"
630                   " the file /proc/sysrq-trigger")
631
632   if constants.NV_TIME in what:
633     result[constants.NV_TIME] = utils.SplitTime(time.time())
634
635   if constants.NV_OSLIST in what and vm_capable:
636     result[constants.NV_OSLIST] = DiagnoseOS()
637
638   return result
639
640
641 def GetVolumeList(vg_names):
642   """Compute list of logical volumes and their size.
643
644   @type vg_names: list
645   @param vg_names: the volume groups whose LVs we should list
646   @rtype: dict
647   @return:
648       dictionary of all partions (key) with value being a tuple of
649       their size (in MiB), inactive and online status::
650
651         {'xenvg/test1': ('20.06', True, True)}
652
653       in case of errors, a string is returned with the error
654       details.
655
656   """
657   lvs = {}
658   sep = '|'
659   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
660                          "--separator=%s" % sep,
661                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
662   if result.failed:
663     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
664
665   for line in result.stdout.splitlines():
666     line = line.strip()
667     match = _LVSLINE_REGEX.match(line)
668     if not match:
669       logging.error("Invalid line returned from lvs output: '%s'", line)
670       continue
671     vg_name, name, size, attr = match.groups()
672     inactive = attr[4] == '-'
673     online = attr[5] == 'o'
674     virtual = attr[0] == 'v'
675     if virtual:
676       # we don't want to report such volumes as existing, since they
677       # don't really hold data
678       continue
679     lvs[vg_name+"/"+name] = (size, inactive, online)
680
681   return lvs
682
683
684 def ListVolumeGroups():
685   """List the volume groups and their size.
686
687   @rtype: dict
688   @return: dictionary with keys volume name and values the
689       size of the volume
690
691   """
692   return utils.ListVolumeGroups()
693
694
695 def NodeVolumes():
696   """List all volumes on this node.
697
698   @rtype: list
699   @return:
700     A list of dictionaries, each having four keys:
701       - name: the logical volume name,
702       - size: the size of the logical volume
703       - dev: the physical device on which the LV lives
704       - vg: the volume group to which it belongs
705
706     In case of errors, we return an empty list and log the
707     error.
708
709     Note that since a logical volume can live on multiple physical
710     volumes, the resulting list might include a logical volume
711     multiple times.
712
713   """
714   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
715                          "--separator=|",
716                          "--options=lv_name,lv_size,devices,vg_name"])
717   if result.failed:
718     _Fail("Failed to list logical volumes, lvs output: %s",
719           result.output)
720
721   def parse_dev(dev):
722     return dev.split('(')[0]
723
724   def handle_dev(dev):
725     return [parse_dev(x) for x in dev.split(",")]
726
727   def map_line(line):
728     line = [v.strip() for v in line]
729     return [{'name': line[0], 'size': line[1],
730              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
731
732   all_devs = []
733   for line in result.stdout.splitlines():
734     if line.count('|') >= 3:
735       all_devs.extend(map_line(line.split('|')))
736     else:
737       logging.warning("Strange line in the output from lvs: '%s'", line)
738   return all_devs
739
740
741 def BridgesExist(bridges_list):
742   """Check if a list of bridges exist on the current node.
743
744   @rtype: boolean
745   @return: C{True} if all of them exist, C{False} otherwise
746
747   """
748   missing = []
749   for bridge in bridges_list:
750     if not utils.BridgeExists(bridge):
751       missing.append(bridge)
752
753   if missing:
754     _Fail("Missing bridges %s", utils.CommaJoin(missing))
755
756
757 def GetInstanceList(hypervisor_list):
758   """Provides a list of instances.
759
760   @type hypervisor_list: list
761   @param hypervisor_list: the list of hypervisors to query information
762
763   @rtype: list
764   @return: a list of all running instances on the current node
765     - instance1.example.com
766     - instance2.example.com
767
768   """
769   results = []
770   for hname in hypervisor_list:
771     try:
772       names = hypervisor.GetHypervisor(hname).ListInstances()
773       results.extend(names)
774     except errors.HypervisorError, err:
775       _Fail("Error enumerating instances (hypervisor %s): %s",
776             hname, err, exc=True)
777
778   return results
779
780
781 def GetInstanceInfo(instance, hname):
782   """Gives back the information about an instance as a dictionary.
783
784   @type instance: string
785   @param instance: the instance name
786   @type hname: string
787   @param hname: the hypervisor type of the instance
788
789   @rtype: dict
790   @return: dictionary with the following keys:
791       - memory: memory size of instance (int)
792       - state: xen state of instance (string)
793       - time: cpu time of instance (float)
794
795   """
796   output = {}
797
798   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
799   if iinfo is not None:
800     output['memory'] = iinfo[2]
801     output['state'] = iinfo[4]
802     output['time'] = iinfo[5]
803
804   return output
805
806
807 def GetInstanceMigratable(instance):
808   """Gives whether an instance can be migrated.
809
810   @type instance: L{objects.Instance}
811   @param instance: object representing the instance to be checked.
812
813   @rtype: tuple
814   @return: tuple of (result, description) where:
815       - result: whether the instance can be migrated or not
816       - description: a description of the issue, if relevant
817
818   """
819   hyper = hypervisor.GetHypervisor(instance.hypervisor)
820   iname = instance.name
821   if iname not in hyper.ListInstances():
822     _Fail("Instance %s is not running", iname)
823
824   for idx in range(len(instance.disks)):
825     link_name = _GetBlockDevSymlinkPath(iname, idx)
826     if not os.path.islink(link_name):
827       logging.warning("Instance %s is missing symlink %s for disk %d",
828                       iname, link_name, idx)
829
830
831 def GetAllInstancesInfo(hypervisor_list):
832   """Gather data about all instances.
833
834   This is the equivalent of L{GetInstanceInfo}, except that it
835   computes data for all instances at once, thus being faster if one
836   needs data about more than one instance.
837
838   @type hypervisor_list: list
839   @param hypervisor_list: list of hypervisors to query for instance data
840
841   @rtype: dict
842   @return: dictionary of instance: data, with data having the following keys:
843       - memory: memory size of instance (int)
844       - state: xen state of instance (string)
845       - time: cpu time of instance (float)
846       - vcpus: the number of vcpus
847
848   """
849   output = {}
850
851   for hname in hypervisor_list:
852     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
853     if iinfo:
854       for name, _, memory, vcpus, state, times in iinfo:
855         value = {
856           'memory': memory,
857           'vcpus': vcpus,
858           'state': state,
859           'time': times,
860           }
861         if name in output:
862           # we only check static parameters, like memory and vcpus,
863           # and not state and time which can change between the
864           # invocations of the different hypervisors
865           for key in 'memory', 'vcpus':
866             if value[key] != output[name][key]:
867               _Fail("Instance %s is running twice"
868                     " with different parameters", name)
869         output[name] = value
870
871   return output
872
873
874 def _InstanceLogName(kind, os_name, instance):
875   """Compute the OS log filename for a given instance and operation.
876
877   The instance name and os name are passed in as strings since not all
878   operations have these as part of an instance object.
879
880   @type kind: string
881   @param kind: the operation type (e.g. add, import, etc.)
882   @type os_name: string
883   @param os_name: the os name
884   @type instance: string
885   @param instance: the name of the instance being imported/added/etc.
886
887   """
888   # TODO: Use tempfile.mkstemp to create unique filename
889   base = ("%s-%s-%s-%s.log" %
890           (kind, os_name, instance, utils.TimestampForFilename()))
891   return utils.PathJoin(constants.LOG_OS_DIR, base)
892
893
894 def InstanceOsAdd(instance, reinstall, debug):
895   """Add an OS to an instance.
896
897   @type instance: L{objects.Instance}
898   @param instance: Instance whose OS is to be installed
899   @type reinstall: boolean
900   @param reinstall: whether this is an instance reinstall
901   @type debug: integer
902   @param debug: debug level, passed to the OS scripts
903   @rtype: None
904
905   """
906   inst_os = OSFromDisk(instance.os)
907
908   create_env = OSEnvironment(instance, inst_os, debug)
909   if reinstall:
910     create_env['INSTANCE_REINSTALL'] = "1"
911
912   logfile = _InstanceLogName("add", instance.os, instance.name)
913
914   result = utils.RunCmd([inst_os.create_script], env=create_env,
915                         cwd=inst_os.path, output=logfile,)
916   if result.failed:
917     logging.error("os create command '%s' returned error: %s, logfile: %s,"
918                   " output: %s", result.cmd, result.fail_reason, logfile,
919                   result.output)
920     lines = [utils.SafeEncode(val)
921              for val in utils.TailFile(logfile, lines=20)]
922     _Fail("OS create script failed (%s), last lines in the"
923           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
924
925
926 def RunRenameInstance(instance, old_name, debug):
927   """Run the OS rename script for an instance.
928
929   @type instance: L{objects.Instance}
930   @param instance: Instance whose OS is to be installed
931   @type old_name: string
932   @param old_name: previous instance name
933   @type debug: integer
934   @param debug: debug level, passed to the OS scripts
935   @rtype: boolean
936   @return: the success of the operation
937
938   """
939   inst_os = OSFromDisk(instance.os)
940
941   rename_env = OSEnvironment(instance, inst_os, debug)
942   rename_env['OLD_INSTANCE_NAME'] = old_name
943
944   logfile = _InstanceLogName("rename", instance.os,
945                              "%s-%s" % (old_name, instance.name))
946
947   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
948                         cwd=inst_os.path, output=logfile)
949
950   if result.failed:
951     logging.error("os create command '%s' returned error: %s output: %s",
952                   result.cmd, result.fail_reason, result.output)
953     lines = [utils.SafeEncode(val)
954              for val in utils.TailFile(logfile, lines=20)]
955     _Fail("OS rename script failed (%s), last lines in the"
956           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
957
958
959 def _GetBlockDevSymlinkPath(instance_name, idx):
960   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
961                         (instance_name, constants.DISK_SEPARATOR, idx))
962
963
964 def _SymlinkBlockDev(instance_name, device_path, idx):
965   """Set up symlinks to a instance's block device.
966
967   This is an auxiliary function run when an instance is start (on the primary
968   node) or when an instance is migrated (on the target node).
969
970
971   @param instance_name: the name of the target instance
972   @param device_path: path of the physical block device, on the node
973   @param idx: the disk index
974   @return: absolute path to the disk's symlink
975
976   """
977   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
978   try:
979     os.symlink(device_path, link_name)
980   except OSError, err:
981     if err.errno == errno.EEXIST:
982       if (not os.path.islink(link_name) or
983           os.readlink(link_name) != device_path):
984         os.remove(link_name)
985         os.symlink(device_path, link_name)
986     else:
987       raise
988
989   return link_name
990
991
992 def _RemoveBlockDevLinks(instance_name, disks):
993   """Remove the block device symlinks belonging to the given instance.
994
995   """
996   for idx, _ in enumerate(disks):
997     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
998     if os.path.islink(link_name):
999       try:
1000         os.remove(link_name)
1001       except OSError:
1002         logging.exception("Can't remove symlink '%s'", link_name)
1003
1004
1005 def _GatherAndLinkBlockDevs(instance):
1006   """Set up an instance's block device(s).
1007
1008   This is run on the primary node at instance startup. The block
1009   devices must be already assembled.
1010
1011   @type instance: L{objects.Instance}
1012   @param instance: the instance whose disks we shoul assemble
1013   @rtype: list
1014   @return: list of (disk_object, device_path)
1015
1016   """
1017   block_devices = []
1018   for idx, disk in enumerate(instance.disks):
1019     device = _RecursiveFindBD(disk)
1020     if device is None:
1021       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1022                                     str(disk))
1023     device.Open()
1024     try:
1025       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1026     except OSError, e:
1027       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1028                                     e.strerror)
1029
1030     block_devices.append((disk, link_name))
1031
1032   return block_devices
1033
1034
1035 def StartInstance(instance):
1036   """Start an instance.
1037
1038   @type instance: L{objects.Instance}
1039   @param instance: the instance object
1040   @rtype: None
1041
1042   """
1043   running_instances = GetInstanceList([instance.hypervisor])
1044
1045   if instance.name in running_instances:
1046     logging.info("Instance %s already running, not starting", instance.name)
1047     return
1048
1049   try:
1050     block_devices = _GatherAndLinkBlockDevs(instance)
1051     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1052     hyper.StartInstance(instance, block_devices)
1053   except errors.BlockDeviceError, err:
1054     _Fail("Block device error: %s", err, exc=True)
1055   except errors.HypervisorError, err:
1056     _RemoveBlockDevLinks(instance.name, instance.disks)
1057     _Fail("Hypervisor error: %s", err, exc=True)
1058
1059
1060 def InstanceShutdown(instance, timeout):
1061   """Shut an instance down.
1062
1063   @note: this functions uses polling with a hardcoded timeout.
1064
1065   @type instance: L{objects.Instance}
1066   @param instance: the instance object
1067   @type timeout: integer
1068   @param timeout: maximum timeout for soft shutdown
1069   @rtype: None
1070
1071   """
1072   hv_name = instance.hypervisor
1073   hyper = hypervisor.GetHypervisor(hv_name)
1074   iname = instance.name
1075
1076   if instance.name not in hyper.ListInstances():
1077     logging.info("Instance %s not running, doing nothing", iname)
1078     return
1079
1080   class _TryShutdown:
1081     def __init__(self):
1082       self.tried_once = False
1083
1084     def __call__(self):
1085       if iname not in hyper.ListInstances():
1086         return
1087
1088       try:
1089         hyper.StopInstance(instance, retry=self.tried_once)
1090       except errors.HypervisorError, err:
1091         if iname not in hyper.ListInstances():
1092           # if the instance is no longer existing, consider this a
1093           # success and go to cleanup
1094           return
1095
1096         _Fail("Failed to stop instance %s: %s", iname, err)
1097
1098       self.tried_once = True
1099
1100       raise utils.RetryAgain()
1101
1102   try:
1103     utils.Retry(_TryShutdown(), 5, timeout)
1104   except utils.RetryTimeout:
1105     # the shutdown did not succeed
1106     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1107
1108     try:
1109       hyper.StopInstance(instance, force=True)
1110     except errors.HypervisorError, err:
1111       if iname in hyper.ListInstances():
1112         # only raise an error if the instance still exists, otherwise
1113         # the error could simply be "instance ... unknown"!
1114         _Fail("Failed to force stop instance %s: %s", iname, err)
1115
1116     time.sleep(1)
1117
1118     if iname in hyper.ListInstances():
1119       _Fail("Could not shutdown instance %s even by destroy", iname)
1120
1121   try:
1122     hyper.CleanupInstance(instance.name)
1123   except errors.HypervisorError, err:
1124     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1125
1126   _RemoveBlockDevLinks(iname, instance.disks)
1127
1128
1129 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1130   """Reboot an instance.
1131
1132   @type instance: L{objects.Instance}
1133   @param instance: the instance object to reboot
1134   @type reboot_type: str
1135   @param reboot_type: the type of reboot, one the following
1136     constants:
1137       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1138         instance OS, do not recreate the VM
1139       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1140         restart the VM (at the hypervisor level)
1141       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1142         not accepted here, since that mode is handled differently, in
1143         cmdlib, and translates into full stop and start of the
1144         instance (instead of a call_instance_reboot RPC)
1145   @type shutdown_timeout: integer
1146   @param shutdown_timeout: maximum timeout for soft shutdown
1147   @rtype: None
1148
1149   """
1150   running_instances = GetInstanceList([instance.hypervisor])
1151
1152   if instance.name not in running_instances:
1153     _Fail("Cannot reboot instance %s that is not running", instance.name)
1154
1155   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1156   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1157     try:
1158       hyper.RebootInstance(instance)
1159     except errors.HypervisorError, err:
1160       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1161   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1162     try:
1163       InstanceShutdown(instance, shutdown_timeout)
1164       return StartInstance(instance)
1165     except errors.HypervisorError, err:
1166       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1167   else:
1168     _Fail("Invalid reboot_type received: %s", reboot_type)
1169
1170
1171 def MigrationInfo(instance):
1172   """Gather information about an instance to be migrated.
1173
1174   @type instance: L{objects.Instance}
1175   @param instance: the instance definition
1176
1177   """
1178   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179   try:
1180     info = hyper.MigrationInfo(instance)
1181   except errors.HypervisorError, err:
1182     _Fail("Failed to fetch migration information: %s", err, exc=True)
1183   return info
1184
1185
1186 def AcceptInstance(instance, info, target):
1187   """Prepare the node to accept an instance.
1188
1189   @type instance: L{objects.Instance}
1190   @param instance: the instance definition
1191   @type info: string/data (opaque)
1192   @param info: migration information, from the source node
1193   @type target: string
1194   @param target: target host (usually ip), on this node
1195
1196   """
1197   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1198   try:
1199     hyper.AcceptInstance(instance, info, target)
1200   except errors.HypervisorError, err:
1201     _Fail("Failed to accept instance: %s", err, exc=True)
1202
1203
1204 def FinalizeMigration(instance, info, success):
1205   """Finalize any preparation to accept an instance.
1206
1207   @type instance: L{objects.Instance}
1208   @param instance: the instance definition
1209   @type info: string/data (opaque)
1210   @param info: migration information, from the source node
1211   @type success: boolean
1212   @param success: whether the migration was a success or a failure
1213
1214   """
1215   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1216   try:
1217     hyper.FinalizeMigration(instance, info, success)
1218   except errors.HypervisorError, err:
1219     _Fail("Failed to finalize migration: %s", err, exc=True)
1220
1221
1222 def MigrateInstance(instance, target, live):
1223   """Migrates an instance to another node.
1224
1225   @type instance: L{objects.Instance}
1226   @param instance: the instance definition
1227   @type target: string
1228   @param target: the target node name
1229   @type live: boolean
1230   @param live: whether the migration should be done live or not (the
1231       interpretation of this parameter is left to the hypervisor)
1232   @rtype: tuple
1233   @return: a tuple of (success, msg) where:
1234       - succes is a boolean denoting the success/failure of the operation
1235       - msg is a string with details in case of failure
1236
1237   """
1238   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1239
1240   try:
1241     hyper.MigrateInstance(instance, target, live)
1242   except errors.HypervisorError, err:
1243     _Fail("Failed to migrate instance: %s", err, exc=True)
1244
1245
1246 def BlockdevCreate(disk, size, owner, on_primary, info):
1247   """Creates a block device for an instance.
1248
1249   @type disk: L{objects.Disk}
1250   @param disk: the object describing the disk we should create
1251   @type size: int
1252   @param size: the size of the physical underlying device, in MiB
1253   @type owner: str
1254   @param owner: the name of the instance for which disk is created,
1255       used for device cache data
1256   @type on_primary: boolean
1257   @param on_primary:  indicates if it is the primary node or not
1258   @type info: string
1259   @param info: string that will be sent to the physical device
1260       creation, used for example to set (LVM) tags on LVs
1261
1262   @return: the new unique_id of the device (this can sometime be
1263       computed only after creation), or None. On secondary nodes,
1264       it's not required to return anything.
1265
1266   """
1267   # TODO: remove the obsolete 'size' argument
1268   # pylint: disable-msg=W0613
1269   clist = []
1270   if disk.children:
1271     for child in disk.children:
1272       try:
1273         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1274       except errors.BlockDeviceError, err:
1275         _Fail("Can't assemble device %s: %s", child, err)
1276       if on_primary or disk.AssembleOnSecondary():
1277         # we need the children open in case the device itself has to
1278         # be assembled
1279         try:
1280           # pylint: disable-msg=E1103
1281           crdev.Open()
1282         except errors.BlockDeviceError, err:
1283           _Fail("Can't make child '%s' read-write: %s", child, err)
1284       clist.append(crdev)
1285
1286   try:
1287     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1288   except errors.BlockDeviceError, err:
1289     _Fail("Can't create block device: %s", err)
1290
1291   if on_primary or disk.AssembleOnSecondary():
1292     try:
1293       device.Assemble()
1294     except errors.BlockDeviceError, err:
1295       _Fail("Can't assemble device after creation, unusual event: %s", err)
1296     device.SetSyncSpeed(constants.SYNC_SPEED)
1297     if on_primary or disk.OpenOnSecondary():
1298       try:
1299         device.Open(force=True)
1300       except errors.BlockDeviceError, err:
1301         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1302     DevCacheManager.UpdateCache(device.dev_path, owner,
1303                                 on_primary, disk.iv_name)
1304
1305   device.SetInfo(info)
1306
1307   return device.unique_id
1308
1309
1310 def _WipeDevice(path, offset, size):
1311   """This function actually wipes the device.
1312
1313   @param path: The path to the device to wipe
1314   @param offset: The offset in MiB in the file
1315   @param size: The size in MiB to write
1316
1317   """
1318   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1319          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1320          "count=%d" % size]
1321   result = utils.RunCmd(cmd)
1322
1323   if result.failed:
1324     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1325           result.fail_reason, result.output)
1326
1327
1328 def BlockdevWipe(disk, offset, size):
1329   """Wipes a block device.
1330
1331   @type disk: L{objects.Disk}
1332   @param disk: the disk object we want to wipe
1333   @type offset: int
1334   @param offset: The offset in MiB in the file
1335   @type size: int
1336   @param size: The size in MiB to write
1337
1338   """
1339   try:
1340     rdev = _RecursiveFindBD(disk)
1341   except errors.BlockDeviceError:
1342     rdev = None
1343
1344   if not rdev:
1345     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1346
1347   # Do cross verify some of the parameters
1348   if offset > rdev.size:
1349     _Fail("Offset is bigger than device size")
1350   if (offset + size) > rdev.size:
1351     _Fail("The provided offset and size to wipe is bigger than device size")
1352
1353   _WipeDevice(rdev.dev_path, offset, size)
1354
1355
1356 def BlockdevPauseResumeSync(disks, pause):
1357   """Pause or resume the sync of the block device.
1358
1359   @type disks: list of L{objects.Disk}
1360   @param disks: the disks object we want to pause/resume
1361   @type pause: bool
1362   @param pause: Wheater to pause or resume
1363
1364   """
1365   success = []
1366   for disk in disks:
1367     try:
1368       rdev = _RecursiveFindBD(disk)
1369     except errors.BlockDeviceError:
1370       rdev = None
1371
1372     if not rdev:
1373       success.append((False, ("Cannot change sync for device %s:"
1374                               " device not found" % disk.iv_name)))
1375       continue
1376
1377     result = rdev.PauseResumeSync(pause)
1378
1379     if result:
1380       success.append((result, None))
1381     else:
1382       if pause:
1383         msg = "Pause"
1384       else:
1385         msg = "Resume"
1386       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1387
1388   return success
1389
1390
1391 def BlockdevRemove(disk):
1392   """Remove a block device.
1393
1394   @note: This is intended to be called recursively.
1395
1396   @type disk: L{objects.Disk}
1397   @param disk: the disk object we should remove
1398   @rtype: boolean
1399   @return: the success of the operation
1400
1401   """
1402   msgs = []
1403   try:
1404     rdev = _RecursiveFindBD(disk)
1405   except errors.BlockDeviceError, err:
1406     # probably can't attach
1407     logging.info("Can't attach to device %s in remove", disk)
1408     rdev = None
1409   if rdev is not None:
1410     r_path = rdev.dev_path
1411     try:
1412       rdev.Remove()
1413     except errors.BlockDeviceError, err:
1414       msgs.append(str(err))
1415     if not msgs:
1416       DevCacheManager.RemoveCache(r_path)
1417
1418   if disk.children:
1419     for child in disk.children:
1420       try:
1421         BlockdevRemove(child)
1422       except RPCFail, err:
1423         msgs.append(str(err))
1424
1425   if msgs:
1426     _Fail("; ".join(msgs))
1427
1428
1429 def _RecursiveAssembleBD(disk, owner, as_primary):
1430   """Activate a block device for an instance.
1431
1432   This is run on the primary and secondary nodes for an instance.
1433
1434   @note: this function is called recursively.
1435
1436   @type disk: L{objects.Disk}
1437   @param disk: the disk we try to assemble
1438   @type owner: str
1439   @param owner: the name of the instance which owns the disk
1440   @type as_primary: boolean
1441   @param as_primary: if we should make the block device
1442       read/write
1443
1444   @return: the assembled device or None (in case no device
1445       was assembled)
1446   @raise errors.BlockDeviceError: in case there is an error
1447       during the activation of the children or the device
1448       itself
1449
1450   """
1451   children = []
1452   if disk.children:
1453     mcn = disk.ChildrenNeeded()
1454     if mcn == -1:
1455       mcn = 0 # max number of Nones allowed
1456     else:
1457       mcn = len(disk.children) - mcn # max number of Nones
1458     for chld_disk in disk.children:
1459       try:
1460         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1461       except errors.BlockDeviceError, err:
1462         if children.count(None) >= mcn:
1463           raise
1464         cdev = None
1465         logging.error("Error in child activation (but continuing): %s",
1466                       str(err))
1467       children.append(cdev)
1468
1469   if as_primary or disk.AssembleOnSecondary():
1470     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1471     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1472     result = r_dev
1473     if as_primary or disk.OpenOnSecondary():
1474       r_dev.Open()
1475     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1476                                 as_primary, disk.iv_name)
1477
1478   else:
1479     result = True
1480   return result
1481
1482
1483 def BlockdevAssemble(disk, owner, as_primary):
1484   """Activate a block device for an instance.
1485
1486   This is a wrapper over _RecursiveAssembleBD.
1487
1488   @rtype: str or boolean
1489   @return: a C{/dev/...} path for primary nodes, and
1490       C{True} for secondary nodes
1491
1492   """
1493   try:
1494     result = _RecursiveAssembleBD(disk, owner, as_primary)
1495     if isinstance(result, bdev.BlockDev):
1496       # pylint: disable-msg=E1103
1497       result = result.dev_path
1498   except errors.BlockDeviceError, err:
1499     _Fail("Error while assembling disk: %s", err, exc=True)
1500
1501   return result
1502
1503
1504 def BlockdevShutdown(disk):
1505   """Shut down a block device.
1506
1507   First, if the device is assembled (Attach() is successful), then
1508   the device is shutdown. Then the children of the device are
1509   shutdown.
1510
1511   This function is called recursively. Note that we don't cache the
1512   children or such, as oppossed to assemble, shutdown of different
1513   devices doesn't require that the upper device was active.
1514
1515   @type disk: L{objects.Disk}
1516   @param disk: the description of the disk we should
1517       shutdown
1518   @rtype: None
1519
1520   """
1521   msgs = []
1522   r_dev = _RecursiveFindBD(disk)
1523   if r_dev is not None:
1524     r_path = r_dev.dev_path
1525     try:
1526       r_dev.Shutdown()
1527       DevCacheManager.RemoveCache(r_path)
1528     except errors.BlockDeviceError, err:
1529       msgs.append(str(err))
1530
1531   if disk.children:
1532     for child in disk.children:
1533       try:
1534         BlockdevShutdown(child)
1535       except RPCFail, err:
1536         msgs.append(str(err))
1537
1538   if msgs:
1539     _Fail("; ".join(msgs))
1540
1541
1542 def BlockdevAddchildren(parent_cdev, new_cdevs):
1543   """Extend a mirrored block device.
1544
1545   @type parent_cdev: L{objects.Disk}
1546   @param parent_cdev: the disk to which we should add children
1547   @type new_cdevs: list of L{objects.Disk}
1548   @param new_cdevs: the list of children which we should add
1549   @rtype: None
1550
1551   """
1552   parent_bdev = _RecursiveFindBD(parent_cdev)
1553   if parent_bdev is None:
1554     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1555   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1556   if new_bdevs.count(None) > 0:
1557     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1558   parent_bdev.AddChildren(new_bdevs)
1559
1560
1561 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1562   """Shrink a mirrored block device.
1563
1564   @type parent_cdev: L{objects.Disk}
1565   @param parent_cdev: the disk from which we should remove children
1566   @type new_cdevs: list of L{objects.Disk}
1567   @param new_cdevs: the list of children which we should remove
1568   @rtype: None
1569
1570   """
1571   parent_bdev = _RecursiveFindBD(parent_cdev)
1572   if parent_bdev is None:
1573     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1574   devs = []
1575   for disk in new_cdevs:
1576     rpath = disk.StaticDevPath()
1577     if rpath is None:
1578       bd = _RecursiveFindBD(disk)
1579       if bd is None:
1580         _Fail("Can't find device %s while removing children", disk)
1581       else:
1582         devs.append(bd.dev_path)
1583     else:
1584       if not utils.IsNormAbsPath(rpath):
1585         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1586       devs.append(rpath)
1587   parent_bdev.RemoveChildren(devs)
1588
1589
1590 def BlockdevGetmirrorstatus(disks):
1591   """Get the mirroring status of a list of devices.
1592
1593   @type disks: list of L{objects.Disk}
1594   @param disks: the list of disks which we should query
1595   @rtype: disk
1596   @return: List of L{objects.BlockDevStatus}, one for each disk
1597   @raise errors.BlockDeviceError: if any of the disks cannot be
1598       found
1599
1600   """
1601   stats = []
1602   for dsk in disks:
1603     rbd = _RecursiveFindBD(dsk)
1604     if rbd is None:
1605       _Fail("Can't find device %s", dsk)
1606
1607     stats.append(rbd.CombinedSyncStatus())
1608
1609   return stats
1610
1611
1612 def BlockdevGetmirrorstatusMulti(disks):
1613   """Get the mirroring status of a list of devices.
1614
1615   @type disks: list of L{objects.Disk}
1616   @param disks: the list of disks which we should query
1617   @rtype: disk
1618   @return: List of tuples, (bool, status), one for each disk; bool denotes
1619     success/failure, status is L{objects.BlockDevStatus} on success, string
1620     otherwise
1621
1622   """
1623   result = []
1624   for disk in disks:
1625     try:
1626       rbd = _RecursiveFindBD(disk)
1627       if rbd is None:
1628         result.append((False, "Can't find device %s" % disk))
1629         continue
1630
1631       status = rbd.CombinedSyncStatus()
1632     except errors.BlockDeviceError, err:
1633       logging.exception("Error while getting disk status")
1634       result.append((False, str(err)))
1635     else:
1636       result.append((True, status))
1637
1638   assert len(disks) == len(result)
1639
1640   return result
1641
1642
1643 def _RecursiveFindBD(disk):
1644   """Check if a device is activated.
1645
1646   If so, return information about the real device.
1647
1648   @type disk: L{objects.Disk}
1649   @param disk: the disk object we need to find
1650
1651   @return: None if the device can't be found,
1652       otherwise the device instance
1653
1654   """
1655   children = []
1656   if disk.children:
1657     for chdisk in disk.children:
1658       children.append(_RecursiveFindBD(chdisk))
1659
1660   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1661
1662
1663 def _OpenRealBD(disk):
1664   """Opens the underlying block device of a disk.
1665
1666   @type disk: L{objects.Disk}
1667   @param disk: the disk object we want to open
1668
1669   """
1670   real_disk = _RecursiveFindBD(disk)
1671   if real_disk is None:
1672     _Fail("Block device '%s' is not set up", disk)
1673
1674   real_disk.Open()
1675
1676   return real_disk
1677
1678
1679 def BlockdevFind(disk):
1680   """Check if a device is activated.
1681
1682   If it is, return information about the real device.
1683
1684   @type disk: L{objects.Disk}
1685   @param disk: the disk to find
1686   @rtype: None or objects.BlockDevStatus
1687   @return: None if the disk cannot be found, otherwise a the current
1688            information
1689
1690   """
1691   try:
1692     rbd = _RecursiveFindBD(disk)
1693   except errors.BlockDeviceError, err:
1694     _Fail("Failed to find device: %s", err, exc=True)
1695
1696   if rbd is None:
1697     return None
1698
1699   return rbd.GetSyncStatus()
1700
1701
1702 def BlockdevGetsize(disks):
1703   """Computes the size of the given disks.
1704
1705   If a disk is not found, returns None instead.
1706
1707   @type disks: list of L{objects.Disk}
1708   @param disks: the list of disk to compute the size for
1709   @rtype: list
1710   @return: list with elements None if the disk cannot be found,
1711       otherwise the size
1712
1713   """
1714   result = []
1715   for cf in disks:
1716     try:
1717       rbd = _RecursiveFindBD(cf)
1718     except errors.BlockDeviceError:
1719       result.append(None)
1720       continue
1721     if rbd is None:
1722       result.append(None)
1723     else:
1724       result.append(rbd.GetActualSize())
1725   return result
1726
1727
1728 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1729   """Export a block device to a remote node.
1730
1731   @type disk: L{objects.Disk}
1732   @param disk: the description of the disk to export
1733   @type dest_node: str
1734   @param dest_node: the destination node to export to
1735   @type dest_path: str
1736   @param dest_path: the destination path on the target node
1737   @type cluster_name: str
1738   @param cluster_name: the cluster name, needed for SSH hostalias
1739   @rtype: None
1740
1741   """
1742   real_disk = _OpenRealBD(disk)
1743
1744   # the block size on the read dd is 1MiB to match our units
1745   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1746                                "dd if=%s bs=1048576 count=%s",
1747                                real_disk.dev_path, str(disk.size))
1748
1749   # we set here a smaller block size as, due to ssh buffering, more
1750   # than 64-128k will mostly ignored; we use nocreat to fail if the
1751   # device is not already there or we pass a wrong path; we use
1752   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1753   # to not buffer too much memory; this means that at best, we flush
1754   # every 64k, which will not be very fast
1755   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1756                                 " oflag=dsync", dest_path)
1757
1758   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1759                                                    constants.GANETI_RUNAS,
1760                                                    destcmd)
1761
1762   # all commands have been checked, so we're safe to combine them
1763   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1764
1765   result = utils.RunCmd(["bash", "-c", command])
1766
1767   if result.failed:
1768     _Fail("Disk copy command '%s' returned error: %s"
1769           " output: %s", command, result.fail_reason, result.output)
1770
1771
1772 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1773   """Write a file to the filesystem.
1774
1775   This allows the master to overwrite(!) a file. It will only perform
1776   the operation if the file belongs to a list of configuration files.
1777
1778   @type file_name: str
1779   @param file_name: the target file name
1780   @type data: str
1781   @param data: the new contents of the file
1782   @type mode: int
1783   @param mode: the mode to give the file (can be None)
1784   @type uid: int
1785   @param uid: the owner of the file (can be -1 for default)
1786   @type gid: int
1787   @param gid: the group of the file (can be -1 for default)
1788   @type atime: float
1789   @param atime: the atime to set on the file (can be None)
1790   @type mtime: float
1791   @param mtime: the mtime to set on the file (can be None)
1792   @rtype: None
1793
1794   """
1795   if not os.path.isabs(file_name):
1796     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1797
1798   if file_name not in _ALLOWED_UPLOAD_FILES:
1799     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1800           file_name)
1801
1802   raw_data = _Decompress(data)
1803
1804   utils.SafeWriteFile(file_name, None,
1805                       data=raw_data, mode=mode, uid=uid, gid=gid,
1806                       atime=atime, mtime=mtime)
1807
1808
1809 def RunOob(oob_program, command, node, timeout):
1810   """Executes oob_program with given command on given node.
1811
1812   @param oob_program: The path to the executable oob_program
1813   @param command: The command to invoke on oob_program
1814   @param node: The node given as an argument to the program
1815   @param timeout: Timeout after which we kill the oob program
1816
1817   @return: stdout
1818   @raise RPCFail: If execution fails for some reason
1819
1820   """
1821   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1822
1823   if result.failed:
1824     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1825           result.fail_reason, result.output)
1826
1827   return result.stdout
1828
1829
1830 def WriteSsconfFiles(values):
1831   """Update all ssconf files.
1832
1833   Wrapper around the SimpleStore.WriteFiles.
1834
1835   """
1836   ssconf.SimpleStore().WriteFiles(values)
1837
1838
1839 def _ErrnoOrStr(err):
1840   """Format an EnvironmentError exception.
1841
1842   If the L{err} argument has an errno attribute, it will be looked up
1843   and converted into a textual C{E...} description. Otherwise the
1844   string representation of the error will be returned.
1845
1846   @type err: L{EnvironmentError}
1847   @param err: the exception to format
1848
1849   """
1850   if hasattr(err, 'errno'):
1851     detail = errno.errorcode[err.errno]
1852   else:
1853     detail = str(err)
1854   return detail
1855
1856
1857 def _OSOndiskAPIVersion(os_dir):
1858   """Compute and return the API version of a given OS.
1859
1860   This function will try to read the API version of the OS residing in
1861   the 'os_dir' directory.
1862
1863   @type os_dir: str
1864   @param os_dir: the directory in which we should look for the OS
1865   @rtype: tuple
1866   @return: tuple (status, data) with status denoting the validity and
1867       data holding either the vaid versions or an error message
1868
1869   """
1870   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1871
1872   try:
1873     st = os.stat(api_file)
1874   except EnvironmentError, err:
1875     return False, ("Required file '%s' not found under path %s: %s" %
1876                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1877
1878   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1879     return False, ("File '%s' in %s is not a regular file" %
1880                    (constants.OS_API_FILE, os_dir))
1881
1882   try:
1883     api_versions = utils.ReadFile(api_file).splitlines()
1884   except EnvironmentError, err:
1885     return False, ("Error while reading the API version file at %s: %s" %
1886                    (api_file, _ErrnoOrStr(err)))
1887
1888   try:
1889     api_versions = [int(version.strip()) for version in api_versions]
1890   except (TypeError, ValueError), err:
1891     return False, ("API version(s) can't be converted to integer: %s" %
1892                    str(err))
1893
1894   return True, api_versions
1895
1896
1897 def DiagnoseOS(top_dirs=None):
1898   """Compute the validity for all OSes.
1899
1900   @type top_dirs: list
1901   @param top_dirs: the list of directories in which to
1902       search (if not given defaults to
1903       L{constants.OS_SEARCH_PATH})
1904   @rtype: list of L{objects.OS}
1905   @return: a list of tuples (name, path, status, diagnose, variants,
1906       parameters, api_version) for all (potential) OSes under all
1907       search paths, where:
1908           - name is the (potential) OS name
1909           - path is the full path to the OS
1910           - status True/False is the validity of the OS
1911           - diagnose is the error message for an invalid OS, otherwise empty
1912           - variants is a list of supported OS variants, if any
1913           - parameters is a list of (name, help) parameters, if any
1914           - api_version is a list of support OS API versions
1915
1916   """
1917   if top_dirs is None:
1918     top_dirs = constants.OS_SEARCH_PATH
1919
1920   result = []
1921   for dir_name in top_dirs:
1922     if os.path.isdir(dir_name):
1923       try:
1924         f_names = utils.ListVisibleFiles(dir_name)
1925       except EnvironmentError, err:
1926         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1927         break
1928       for name in f_names:
1929         os_path = utils.PathJoin(dir_name, name)
1930         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1931         if status:
1932           diagnose = ""
1933           variants = os_inst.supported_variants
1934           parameters = os_inst.supported_parameters
1935           api_versions = os_inst.api_versions
1936         else:
1937           diagnose = os_inst
1938           variants = parameters = api_versions = []
1939         result.append((name, os_path, status, diagnose, variants,
1940                        parameters, api_versions))
1941
1942   return result
1943
1944
1945 def _TryOSFromDisk(name, base_dir=None):
1946   """Create an OS instance from disk.
1947
1948   This function will return an OS instance if the given name is a
1949   valid OS name.
1950
1951   @type base_dir: string
1952   @keyword base_dir: Base directory containing OS installations.
1953                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1954   @rtype: tuple
1955   @return: success and either the OS instance if we find a valid one,
1956       or error message
1957
1958   """
1959   if base_dir is None:
1960     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1961   else:
1962     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1963
1964   if os_dir is None:
1965     return False, "Directory for OS %s not found in search path" % name
1966
1967   status, api_versions = _OSOndiskAPIVersion(os_dir)
1968   if not status:
1969     # push the error up
1970     return status, api_versions
1971
1972   if not constants.OS_API_VERSIONS.intersection(api_versions):
1973     return False, ("API version mismatch for path '%s': found %s, want %s." %
1974                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1975
1976   # OS Files dictionary, we will populate it with the absolute path names
1977   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1978
1979   if max(api_versions) >= constants.OS_API_V15:
1980     os_files[constants.OS_VARIANTS_FILE] = ''
1981
1982   if max(api_versions) >= constants.OS_API_V20:
1983     os_files[constants.OS_PARAMETERS_FILE] = ''
1984   else:
1985     del os_files[constants.OS_SCRIPT_VERIFY]
1986
1987   for filename in os_files:
1988     os_files[filename] = utils.PathJoin(os_dir, filename)
1989
1990     try:
1991       st = os.stat(os_files[filename])
1992     except EnvironmentError, err:
1993       return False, ("File '%s' under path '%s' is missing (%s)" %
1994                      (filename, os_dir, _ErrnoOrStr(err)))
1995
1996     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1997       return False, ("File '%s' under path '%s' is not a regular file" %
1998                      (filename, os_dir))
1999
2000     if filename in constants.OS_SCRIPTS:
2001       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2002         return False, ("File '%s' under path '%s' is not executable" %
2003                        (filename, os_dir))
2004
2005   variants = []
2006   if constants.OS_VARIANTS_FILE in os_files:
2007     variants_file = os_files[constants.OS_VARIANTS_FILE]
2008     try:
2009       variants = utils.ReadFile(variants_file).splitlines()
2010     except EnvironmentError, err:
2011       return False, ("Error while reading the OS variants file at %s: %s" %
2012                      (variants_file, _ErrnoOrStr(err)))
2013     if not variants:
2014       return False, ("No supported os variant found")
2015
2016   parameters = []
2017   if constants.OS_PARAMETERS_FILE in os_files:
2018     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2019     try:
2020       parameters = utils.ReadFile(parameters_file).splitlines()
2021     except EnvironmentError, err:
2022       return False, ("Error while reading the OS parameters file at %s: %s" %
2023                      (parameters_file, _ErrnoOrStr(err)))
2024     parameters = [v.split(None, 1) for v in parameters]
2025
2026   os_obj = objects.OS(name=name, path=os_dir,
2027                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2028                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2029                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2030                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2031                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2032                                                  None),
2033                       supported_variants=variants,
2034                       supported_parameters=parameters,
2035                       api_versions=api_versions)
2036   return True, os_obj
2037
2038
2039 def OSFromDisk(name, base_dir=None):
2040   """Create an OS instance from disk.
2041
2042   This function will return an OS instance if the given name is a
2043   valid OS name. Otherwise, it will raise an appropriate
2044   L{RPCFail} exception, detailing why this is not a valid OS.
2045
2046   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2047   an exception but returns true/false status data.
2048
2049   @type base_dir: string
2050   @keyword base_dir: Base directory containing OS installations.
2051                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2052   @rtype: L{objects.OS}
2053   @return: the OS instance if we find a valid one
2054   @raise RPCFail: if we don't find a valid OS
2055
2056   """
2057   name_only = objects.OS.GetName(name)
2058   status, payload = _TryOSFromDisk(name_only, base_dir)
2059
2060   if not status:
2061     _Fail(payload)
2062
2063   return payload
2064
2065
2066 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2067   """Calculate the basic environment for an os script.
2068
2069   @type os_name: str
2070   @param os_name: full operating system name (including variant)
2071   @type inst_os: L{objects.OS}
2072   @param inst_os: operating system for which the environment is being built
2073   @type os_params: dict
2074   @param os_params: the OS parameters
2075   @type debug: integer
2076   @param debug: debug level (0 or 1, for OS Api 10)
2077   @rtype: dict
2078   @return: dict of environment variables
2079   @raise errors.BlockDeviceError: if the block device
2080       cannot be found
2081
2082   """
2083   result = {}
2084   api_version = \
2085     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2086   result['OS_API_VERSION'] = '%d' % api_version
2087   result['OS_NAME'] = inst_os.name
2088   result['DEBUG_LEVEL'] = '%d' % debug
2089
2090   # OS variants
2091   if api_version >= constants.OS_API_V15:
2092     variant = objects.OS.GetVariant(os_name)
2093     if not variant:
2094       variant = inst_os.supported_variants[0]
2095     result['OS_VARIANT'] = variant
2096
2097   # OS params
2098   for pname, pvalue in os_params.items():
2099     result['OSP_%s' % pname.upper()] = pvalue
2100
2101   return result
2102
2103
2104 def OSEnvironment(instance, inst_os, debug=0):
2105   """Calculate the environment for an os script.
2106
2107   @type instance: L{objects.Instance}
2108   @param instance: target instance for the os script run
2109   @type inst_os: L{objects.OS}
2110   @param inst_os: operating system for which the environment is being built
2111   @type debug: integer
2112   @param debug: debug level (0 or 1, for OS Api 10)
2113   @rtype: dict
2114   @return: dict of environment variables
2115   @raise errors.BlockDeviceError: if the block device
2116       cannot be found
2117
2118   """
2119   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2120
2121   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2122     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2123
2124   result['HYPERVISOR'] = instance.hypervisor
2125   result['DISK_COUNT'] = '%d' % len(instance.disks)
2126   result['NIC_COUNT'] = '%d' % len(instance.nics)
2127
2128   # Disks
2129   for idx, disk in enumerate(instance.disks):
2130     real_disk = _OpenRealBD(disk)
2131     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2132     result['DISK_%d_ACCESS' % idx] = disk.mode
2133     if constants.HV_DISK_TYPE in instance.hvparams:
2134       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2135         instance.hvparams[constants.HV_DISK_TYPE]
2136     if disk.dev_type in constants.LDS_BLOCK:
2137       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2138     elif disk.dev_type == constants.LD_FILE:
2139       result['DISK_%d_BACKEND_TYPE' % idx] = \
2140         'file:%s' % disk.physical_id[0]
2141
2142   # NICs
2143   for idx, nic in enumerate(instance.nics):
2144     result['NIC_%d_MAC' % idx] = nic.mac
2145     if nic.ip:
2146       result['NIC_%d_IP' % idx] = nic.ip
2147     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2148     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2149       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2150     if nic.nicparams[constants.NIC_LINK]:
2151       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2152     if constants.HV_NIC_TYPE in instance.hvparams:
2153       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2154         instance.hvparams[constants.HV_NIC_TYPE]
2155
2156   # HV/BE params
2157   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2158     for key, value in source.items():
2159       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2160
2161   return result
2162
2163
2164 def BlockdevGrow(disk, amount):
2165   """Grow a stack of block devices.
2166
2167   This function is called recursively, with the childrens being the
2168   first ones to resize.
2169
2170   @type disk: L{objects.Disk}
2171   @param disk: the disk to be grown
2172   @rtype: (status, result)
2173   @return: a tuple with the status of the operation
2174       (True/False), and the errors message if status
2175       is False
2176
2177   """
2178   r_dev = _RecursiveFindBD(disk)
2179   if r_dev is None:
2180     _Fail("Cannot find block device %s", disk)
2181
2182   try:
2183     r_dev.Grow(amount)
2184   except errors.BlockDeviceError, err:
2185     _Fail("Failed to grow block device: %s", err, exc=True)
2186
2187
2188 def BlockdevSnapshot(disk):
2189   """Create a snapshot copy of a block device.
2190
2191   This function is called recursively, and the snapshot is actually created
2192   just for the leaf lvm backend device.
2193
2194   @type disk: L{objects.Disk}
2195   @param disk: the disk to be snapshotted
2196   @rtype: string
2197   @return: snapshot disk ID as (vg, lv)
2198
2199   """
2200   if disk.dev_type == constants.LD_DRBD8:
2201     if not disk.children:
2202       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2203             disk.unique_id)
2204     return BlockdevSnapshot(disk.children[0])
2205   elif disk.dev_type == constants.LD_LV:
2206     r_dev = _RecursiveFindBD(disk)
2207     if r_dev is not None:
2208       # FIXME: choose a saner value for the snapshot size
2209       # let's stay on the safe side and ask for the full size, for now
2210       return r_dev.Snapshot(disk.size)
2211     else:
2212       _Fail("Cannot find block device %s", disk)
2213   else:
2214     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2215           disk.unique_id, disk.dev_type)
2216
2217
2218 def FinalizeExport(instance, snap_disks):
2219   """Write out the export configuration information.
2220
2221   @type instance: L{objects.Instance}
2222   @param instance: the instance which we export, used for
2223       saving configuration
2224   @type snap_disks: list of L{objects.Disk}
2225   @param snap_disks: list of snapshot block devices, which
2226       will be used to get the actual name of the dump file
2227
2228   @rtype: None
2229
2230   """
2231   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2232   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2233
2234   config = objects.SerializableConfigParser()
2235
2236   config.add_section(constants.INISECT_EXP)
2237   config.set(constants.INISECT_EXP, 'version', '0')
2238   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2239   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2240   config.set(constants.INISECT_EXP, 'os', instance.os)
2241   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2242
2243   config.add_section(constants.INISECT_INS)
2244   config.set(constants.INISECT_INS, 'name', instance.name)
2245   config.set(constants.INISECT_INS, 'memory', '%d' %
2246              instance.beparams[constants.BE_MEMORY])
2247   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2248              instance.beparams[constants.BE_VCPUS])
2249   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2250   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2251
2252   nic_total = 0
2253   for nic_count, nic in enumerate(instance.nics):
2254     nic_total += 1
2255     config.set(constants.INISECT_INS, 'nic%d_mac' %
2256                nic_count, '%s' % nic.mac)
2257     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2258     for param in constants.NICS_PARAMETER_TYPES:
2259       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2260                  '%s' % nic.nicparams.get(param, None))
2261   # TODO: redundant: on load can read nics until it doesn't exist
2262   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2263
2264   disk_total = 0
2265   for disk_count, disk in enumerate(snap_disks):
2266     if disk:
2267       disk_total += 1
2268       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2269                  ('%s' % disk.iv_name))
2270       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2271                  ('%s' % disk.physical_id[1]))
2272       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2273                  ('%d' % disk.size))
2274
2275   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2276
2277   # New-style hypervisor/backend parameters
2278
2279   config.add_section(constants.INISECT_HYP)
2280   for name, value in instance.hvparams.items():
2281     if name not in constants.HVC_GLOBALS:
2282       config.set(constants.INISECT_HYP, name, str(value))
2283
2284   config.add_section(constants.INISECT_BEP)
2285   for name, value in instance.beparams.items():
2286     config.set(constants.INISECT_BEP, name, str(value))
2287
2288   config.add_section(constants.INISECT_OSP)
2289   for name, value in instance.osparams.items():
2290     config.set(constants.INISECT_OSP, name, str(value))
2291
2292   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2293                   data=config.Dumps())
2294   shutil.rmtree(finaldestdir, ignore_errors=True)
2295   shutil.move(destdir, finaldestdir)
2296
2297
2298 def ExportInfo(dest):
2299   """Get export configuration information.
2300
2301   @type dest: str
2302   @param dest: directory containing the export
2303
2304   @rtype: L{objects.SerializableConfigParser}
2305   @return: a serializable config file containing the
2306       export info
2307
2308   """
2309   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2310
2311   config = objects.SerializableConfigParser()
2312   config.read(cff)
2313
2314   if (not config.has_section(constants.INISECT_EXP) or
2315       not config.has_section(constants.INISECT_INS)):
2316     _Fail("Export info file doesn't have the required fields")
2317
2318   return config.Dumps()
2319
2320
2321 def ListExports():
2322   """Return a list of exports currently available on this machine.
2323
2324   @rtype: list
2325   @return: list of the exports
2326
2327   """
2328   if os.path.isdir(constants.EXPORT_DIR):
2329     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2330   else:
2331     _Fail("No exports directory")
2332
2333
2334 def RemoveExport(export):
2335   """Remove an existing export from the node.
2336
2337   @type export: str
2338   @param export: the name of the export to remove
2339   @rtype: None
2340
2341   """
2342   target = utils.PathJoin(constants.EXPORT_DIR, export)
2343
2344   try:
2345     shutil.rmtree(target)
2346   except EnvironmentError, err:
2347     _Fail("Error while removing the export: %s", err, exc=True)
2348
2349
2350 def BlockdevRename(devlist):
2351   """Rename a list of block devices.
2352
2353   @type devlist: list of tuples
2354   @param devlist: list of tuples of the form  (disk,
2355       new_logical_id, new_physical_id); disk is an
2356       L{objects.Disk} object describing the current disk,
2357       and new logical_id/physical_id is the name we
2358       rename it to
2359   @rtype: boolean
2360   @return: True if all renames succeeded, False otherwise
2361
2362   """
2363   msgs = []
2364   result = True
2365   for disk, unique_id in devlist:
2366     dev = _RecursiveFindBD(disk)
2367     if dev is None:
2368       msgs.append("Can't find device %s in rename" % str(disk))
2369       result = False
2370       continue
2371     try:
2372       old_rpath = dev.dev_path
2373       dev.Rename(unique_id)
2374       new_rpath = dev.dev_path
2375       if old_rpath != new_rpath:
2376         DevCacheManager.RemoveCache(old_rpath)
2377         # FIXME: we should add the new cache information here, like:
2378         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2379         # but we don't have the owner here - maybe parse from existing
2380         # cache? for now, we only lose lvm data when we rename, which
2381         # is less critical than DRBD or MD
2382     except errors.BlockDeviceError, err:
2383       msgs.append("Can't rename device '%s' to '%s': %s" %
2384                   (dev, unique_id, err))
2385       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2386       result = False
2387   if not result:
2388     _Fail("; ".join(msgs))
2389
2390
2391 def _TransformFileStorageDir(file_storage_dir):
2392   """Checks whether given file_storage_dir is valid.
2393
2394   Checks wheter the given file_storage_dir is within the cluster-wide
2395   default file_storage_dir stored in SimpleStore. Only paths under that
2396   directory are allowed.
2397
2398   @type file_storage_dir: str
2399   @param file_storage_dir: the path to check
2400
2401   @return: the normalized path if valid, None otherwise
2402
2403   """
2404   if not constants.ENABLE_FILE_STORAGE:
2405     _Fail("File storage disabled at configure time")
2406   cfg = _GetConfig()
2407   file_storage_dir = os.path.normpath(file_storage_dir)
2408   base_file_storage_dir = cfg.GetFileStorageDir()
2409   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2410       base_file_storage_dir):
2411     _Fail("File storage directory '%s' is not under base file"
2412           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2413   return file_storage_dir
2414
2415
2416 def CreateFileStorageDir(file_storage_dir):
2417   """Create file storage directory.
2418
2419   @type file_storage_dir: str
2420   @param file_storage_dir: directory to create
2421
2422   @rtype: tuple
2423   @return: tuple with first element a boolean indicating wheter dir
2424       creation was successful or not
2425
2426   """
2427   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2428   if os.path.exists(file_storage_dir):
2429     if not os.path.isdir(file_storage_dir):
2430       _Fail("Specified storage dir '%s' is not a directory",
2431             file_storage_dir)
2432   else:
2433     try:
2434       os.makedirs(file_storage_dir, 0750)
2435     except OSError, err:
2436       _Fail("Cannot create file storage directory '%s': %s",
2437             file_storage_dir, err, exc=True)
2438
2439
2440 def RemoveFileStorageDir(file_storage_dir):
2441   """Remove file storage directory.
2442
2443   Remove it only if it's empty. If not log an error and return.
2444
2445   @type file_storage_dir: str
2446   @param file_storage_dir: the directory we should cleanup
2447   @rtype: tuple (success,)
2448   @return: tuple of one element, C{success}, denoting
2449       whether the operation was successful
2450
2451   """
2452   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2453   if os.path.exists(file_storage_dir):
2454     if not os.path.isdir(file_storage_dir):
2455       _Fail("Specified Storage directory '%s' is not a directory",
2456             file_storage_dir)
2457     # deletes dir only if empty, otherwise we want to fail the rpc call
2458     try:
2459       os.rmdir(file_storage_dir)
2460     except OSError, err:
2461       _Fail("Cannot remove file storage directory '%s': %s",
2462             file_storage_dir, err)
2463
2464
2465 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2466   """Rename the file storage directory.
2467
2468   @type old_file_storage_dir: str
2469   @param old_file_storage_dir: the current path
2470   @type new_file_storage_dir: str
2471   @param new_file_storage_dir: the name we should rename to
2472   @rtype: tuple (success,)
2473   @return: tuple of one element, C{success}, denoting
2474       whether the operation was successful
2475
2476   """
2477   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2478   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2479   if not os.path.exists(new_file_storage_dir):
2480     if os.path.isdir(old_file_storage_dir):
2481       try:
2482         os.rename(old_file_storage_dir, new_file_storage_dir)
2483       except OSError, err:
2484         _Fail("Cannot rename '%s' to '%s': %s",
2485               old_file_storage_dir, new_file_storage_dir, err)
2486     else:
2487       _Fail("Specified storage dir '%s' is not a directory",
2488             old_file_storage_dir)
2489   else:
2490     if os.path.exists(old_file_storage_dir):
2491       _Fail("Cannot rename '%s' to '%s': both locations exist",
2492             old_file_storage_dir, new_file_storage_dir)
2493
2494
2495 def _EnsureJobQueueFile(file_name):
2496   """Checks whether the given filename is in the queue directory.
2497
2498   @type file_name: str
2499   @param file_name: the file name we should check
2500   @rtype: None
2501   @raises RPCFail: if the file is not valid
2502
2503   """
2504   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2505   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2506
2507   if not result:
2508     _Fail("Passed job queue file '%s' does not belong to"
2509           " the queue directory '%s'", file_name, queue_dir)
2510
2511
2512 def JobQueueUpdate(file_name, content):
2513   """Updates a file in the queue directory.
2514
2515   This is just a wrapper over L{utils.io.WriteFile}, with proper
2516   checking.
2517
2518   @type file_name: str
2519   @param file_name: the job file name
2520   @type content: str
2521   @param content: the new job contents
2522   @rtype: boolean
2523   @return: the success of the operation
2524
2525   """
2526   _EnsureJobQueueFile(file_name)
2527   getents = runtime.GetEnts()
2528
2529   # Write and replace the file atomically
2530   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2531                   gid=getents.masterd_gid)
2532
2533
2534 def JobQueueRename(old, new):
2535   """Renames a job queue file.
2536
2537   This is just a wrapper over os.rename with proper checking.
2538
2539   @type old: str
2540   @param old: the old (actual) file name
2541   @type new: str
2542   @param new: the desired file name
2543   @rtype: tuple
2544   @return: the success of the operation and payload
2545
2546   """
2547   _EnsureJobQueueFile(old)
2548   _EnsureJobQueueFile(new)
2549
2550   utils.RenameFile(old, new, mkdir=True)
2551
2552
2553 def BlockdevClose(instance_name, disks):
2554   """Closes the given block devices.
2555
2556   This means they will be switched to secondary mode (in case of
2557   DRBD).
2558
2559   @param instance_name: if the argument is not empty, the symlinks
2560       of this instance will be removed
2561   @type disks: list of L{objects.Disk}
2562   @param disks: the list of disks to be closed
2563   @rtype: tuple (success, message)
2564   @return: a tuple of success and message, where success
2565       indicates the succes of the operation, and message
2566       which will contain the error details in case we
2567       failed
2568
2569   """
2570   bdevs = []
2571   for cf in disks:
2572     rd = _RecursiveFindBD(cf)
2573     if rd is None:
2574       _Fail("Can't find device %s", cf)
2575     bdevs.append(rd)
2576
2577   msg = []
2578   for rd in bdevs:
2579     try:
2580       rd.Close()
2581     except errors.BlockDeviceError, err:
2582       msg.append(str(err))
2583   if msg:
2584     _Fail("Can't make devices secondary: %s", ",".join(msg))
2585   else:
2586     if instance_name:
2587       _RemoveBlockDevLinks(instance_name, disks)
2588
2589
2590 def ValidateHVParams(hvname, hvparams):
2591   """Validates the given hypervisor parameters.
2592
2593   @type hvname: string
2594   @param hvname: the hypervisor name
2595   @type hvparams: dict
2596   @param hvparams: the hypervisor parameters to be validated
2597   @rtype: None
2598
2599   """
2600   try:
2601     hv_type = hypervisor.GetHypervisor(hvname)
2602     hv_type.ValidateParameters(hvparams)
2603   except errors.HypervisorError, err:
2604     _Fail(str(err), log=False)
2605
2606
2607 def _CheckOSPList(os_obj, parameters):
2608   """Check whether a list of parameters is supported by the OS.
2609
2610   @type os_obj: L{objects.OS}
2611   @param os_obj: OS object to check
2612   @type parameters: list
2613   @param parameters: the list of parameters to check
2614
2615   """
2616   supported = [v[0] for v in os_obj.supported_parameters]
2617   delta = frozenset(parameters).difference(supported)
2618   if delta:
2619     _Fail("The following parameters are not supported"
2620           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2621
2622
2623 def ValidateOS(required, osname, checks, osparams):
2624   """Validate the given OS' parameters.
2625
2626   @type required: boolean
2627   @param required: whether absence of the OS should translate into
2628       failure or not
2629   @type osname: string
2630   @param osname: the OS to be validated
2631   @type checks: list
2632   @param checks: list of the checks to run (currently only 'parameters')
2633   @type osparams: dict
2634   @param osparams: dictionary with OS parameters
2635   @rtype: boolean
2636   @return: True if the validation passed, or False if the OS was not
2637       found and L{required} was false
2638
2639   """
2640   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2641     _Fail("Unknown checks required for OS %s: %s", osname,
2642           set(checks).difference(constants.OS_VALIDATE_CALLS))
2643
2644   name_only = objects.OS.GetName(osname)
2645   status, tbv = _TryOSFromDisk(name_only, None)
2646
2647   if not status:
2648     if required:
2649       _Fail(tbv)
2650     else:
2651       return False
2652
2653   if max(tbv.api_versions) < constants.OS_API_V20:
2654     return True
2655
2656   if constants.OS_VALIDATE_PARAMETERS in checks:
2657     _CheckOSPList(tbv, osparams.keys())
2658
2659   validate_env = OSCoreEnv(osname, tbv, osparams)
2660   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2661                         cwd=tbv.path)
2662   if result.failed:
2663     logging.error("os validate command '%s' returned error: %s output: %s",
2664                   result.cmd, result.fail_reason, result.output)
2665     _Fail("OS validation script failed (%s), output: %s",
2666           result.fail_reason, result.output, log=False)
2667
2668   return True
2669
2670
2671 def DemoteFromMC():
2672   """Demotes the current node from master candidate role.
2673
2674   """
2675   # try to ensure we're not the master by mistake
2676   master, myself = ssconf.GetMasterAndMyself()
2677   if master == myself:
2678     _Fail("ssconf status shows I'm the master node, will not demote")
2679
2680   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2681   if not result.failed:
2682     _Fail("The master daemon is running, will not demote")
2683
2684   try:
2685     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2686       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2687   except EnvironmentError, err:
2688     if err.errno != errno.ENOENT:
2689       _Fail("Error while backing up cluster file: %s", err, exc=True)
2690
2691   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2692
2693
2694 def _GetX509Filenames(cryptodir, name):
2695   """Returns the full paths for the private key and certificate.
2696
2697   """
2698   return (utils.PathJoin(cryptodir, name),
2699           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2700           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2701
2702
2703 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2704   """Creates a new X509 certificate for SSL/TLS.
2705
2706   @type validity: int
2707   @param validity: Validity in seconds
2708   @rtype: tuple; (string, string)
2709   @return: Certificate name and public part
2710
2711   """
2712   (key_pem, cert_pem) = \
2713     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2714                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2715
2716   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2717                               prefix="x509-%s-" % utils.TimestampForFilename())
2718   try:
2719     name = os.path.basename(cert_dir)
2720     assert len(name) > 5
2721
2722     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2723
2724     utils.WriteFile(key_file, mode=0400, data=key_pem)
2725     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2726
2727     # Never return private key as it shouldn't leave the node
2728     return (name, cert_pem)
2729   except Exception:
2730     shutil.rmtree(cert_dir, ignore_errors=True)
2731     raise
2732
2733
2734 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2735   """Removes a X509 certificate.
2736
2737   @type name: string
2738   @param name: Certificate name
2739
2740   """
2741   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2742
2743   utils.RemoveFile(key_file)
2744   utils.RemoveFile(cert_file)
2745
2746   try:
2747     os.rmdir(cert_dir)
2748   except EnvironmentError, err:
2749     _Fail("Cannot remove certificate directory '%s': %s",
2750           cert_dir, err)
2751
2752
2753 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2754   """Returns the command for the requested input/output.
2755
2756   @type instance: L{objects.Instance}
2757   @param instance: The instance object
2758   @param mode: Import/export mode
2759   @param ieio: Input/output type
2760   @param ieargs: Input/output arguments
2761
2762   """
2763   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2764
2765   env = None
2766   prefix = None
2767   suffix = None
2768   exp_size = None
2769
2770   if ieio == constants.IEIO_FILE:
2771     (filename, ) = ieargs
2772
2773     if not utils.IsNormAbsPath(filename):
2774       _Fail("Path '%s' is not normalized or absolute", filename)
2775
2776     directory = os.path.normpath(os.path.dirname(filename))
2777
2778     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2779         constants.EXPORT_DIR):
2780       _Fail("File '%s' is not under exports directory '%s'",
2781             filename, constants.EXPORT_DIR)
2782
2783     # Create directory
2784     utils.Makedirs(directory, mode=0750)
2785
2786     quoted_filename = utils.ShellQuote(filename)
2787
2788     if mode == constants.IEM_IMPORT:
2789       suffix = "> %s" % quoted_filename
2790     elif mode == constants.IEM_EXPORT:
2791       suffix = "< %s" % quoted_filename
2792
2793       # Retrieve file size
2794       try:
2795         st = os.stat(filename)
2796       except EnvironmentError, err:
2797         logging.error("Can't stat(2) %s: %s", filename, err)
2798       else:
2799         exp_size = utils.BytesToMebibyte(st.st_size)
2800
2801   elif ieio == constants.IEIO_RAW_DISK:
2802     (disk, ) = ieargs
2803
2804     real_disk = _OpenRealBD(disk)
2805
2806     if mode == constants.IEM_IMPORT:
2807       # we set here a smaller block size as, due to transport buffering, more
2808       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2809       # is not already there or we pass a wrong path; we use notrunc to no
2810       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2811       # much memory; this means that at best, we flush every 64k, which will
2812       # not be very fast
2813       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2814                                     " bs=%s oflag=dsync"),
2815                                     real_disk.dev_path,
2816                                     str(64 * 1024))
2817
2818     elif mode == constants.IEM_EXPORT:
2819       # the block size on the read dd is 1MiB to match our units
2820       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2821                                    real_disk.dev_path,
2822                                    str(1024 * 1024), # 1 MB
2823                                    str(disk.size))
2824       exp_size = disk.size
2825
2826   elif ieio == constants.IEIO_SCRIPT:
2827     (disk, disk_index, ) = ieargs
2828
2829     assert isinstance(disk_index, (int, long))
2830
2831     real_disk = _OpenRealBD(disk)
2832
2833     inst_os = OSFromDisk(instance.os)
2834     env = OSEnvironment(instance, inst_os)
2835
2836     if mode == constants.IEM_IMPORT:
2837       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2838       env["IMPORT_INDEX"] = str(disk_index)
2839       script = inst_os.import_script
2840
2841     elif mode == constants.IEM_EXPORT:
2842       env["EXPORT_DEVICE"] = real_disk.dev_path
2843       env["EXPORT_INDEX"] = str(disk_index)
2844       script = inst_os.export_script
2845
2846     # TODO: Pass special environment only to script
2847     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2848
2849     if mode == constants.IEM_IMPORT:
2850       suffix = "| %s" % script_cmd
2851
2852     elif mode == constants.IEM_EXPORT:
2853       prefix = "%s |" % script_cmd
2854
2855     # Let script predict size
2856     exp_size = constants.IE_CUSTOM_SIZE
2857
2858   else:
2859     _Fail("Invalid %s I/O mode %r", mode, ieio)
2860
2861   return (env, prefix, suffix, exp_size)
2862
2863
2864 def _CreateImportExportStatusDir(prefix):
2865   """Creates status directory for import/export.
2866
2867   """
2868   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2869                           prefix=("%s-%s-" %
2870                                   (prefix, utils.TimestampForFilename())))
2871
2872
2873 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2874   """Starts an import or export daemon.
2875
2876   @param mode: Import/output mode
2877   @type opts: L{objects.ImportExportOptions}
2878   @param opts: Daemon options
2879   @type host: string
2880   @param host: Remote host for export (None for import)
2881   @type port: int
2882   @param port: Remote port for export (None for import)
2883   @type instance: L{objects.Instance}
2884   @param instance: Instance object
2885   @param ieio: Input/output type
2886   @param ieioargs: Input/output arguments
2887
2888   """
2889   if mode == constants.IEM_IMPORT:
2890     prefix = "import"
2891
2892     if not (host is None and port is None):
2893       _Fail("Can not specify host or port on import")
2894
2895   elif mode == constants.IEM_EXPORT:
2896     prefix = "export"
2897
2898     if host is None or port is None:
2899       _Fail("Host and port must be specified for an export")
2900
2901   else:
2902     _Fail("Invalid mode %r", mode)
2903
2904   if (opts.key_name is None) ^ (opts.ca_pem is None):
2905     _Fail("Cluster certificate can only be used for both key and CA")
2906
2907   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2908     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2909
2910   if opts.key_name is None:
2911     # Use server.pem
2912     key_path = constants.NODED_CERT_FILE
2913     cert_path = constants.NODED_CERT_FILE
2914     assert opts.ca_pem is None
2915   else:
2916     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2917                                                  opts.key_name)
2918     assert opts.ca_pem is not None
2919
2920   for i in [key_path, cert_path]:
2921     if not os.path.exists(i):
2922       _Fail("File '%s' does not exist" % i)
2923
2924   status_dir = _CreateImportExportStatusDir(prefix)
2925   try:
2926     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2927     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2928     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2929
2930     if opts.ca_pem is None:
2931       # Use server.pem
2932       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2933     else:
2934       ca = opts.ca_pem
2935
2936     # Write CA file
2937     utils.WriteFile(ca_file, data=ca, mode=0400)
2938
2939     cmd = [
2940       constants.IMPORT_EXPORT_DAEMON,
2941       status_file, mode,
2942       "--key=%s" % key_path,
2943       "--cert=%s" % cert_path,
2944       "--ca=%s" % ca_file,
2945       ]
2946
2947     if host:
2948       cmd.append("--host=%s" % host)
2949
2950     if port:
2951       cmd.append("--port=%s" % port)
2952
2953     if opts.ipv6:
2954       cmd.append("--ipv6")
2955     else:
2956       cmd.append("--ipv4")
2957
2958     if opts.compress:
2959       cmd.append("--compress=%s" % opts.compress)
2960
2961     if opts.magic:
2962       cmd.append("--magic=%s" % opts.magic)
2963
2964     if exp_size is not None:
2965       cmd.append("--expected-size=%s" % exp_size)
2966
2967     if cmd_prefix:
2968       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2969
2970     if cmd_suffix:
2971       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2972
2973     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2974
2975     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2976     # support for receiving a file descriptor for output
2977     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2978                       output=logfile)
2979
2980     # The import/export name is simply the status directory name
2981     return os.path.basename(status_dir)
2982
2983   except Exception:
2984     shutil.rmtree(status_dir, ignore_errors=True)
2985     raise
2986
2987
2988 def GetImportExportStatus(names):
2989   """Returns import/export daemon status.
2990
2991   @type names: sequence
2992   @param names: List of names
2993   @rtype: List of dicts
2994   @return: Returns a list of the state of each named import/export or None if a
2995            status couldn't be read
2996
2997   """
2998   result = []
2999
3000   for name in names:
3001     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3002                                  _IES_STATUS_FILE)
3003
3004     try:
3005       data = utils.ReadFile(status_file)
3006     except EnvironmentError, err:
3007       if err.errno != errno.ENOENT:
3008         raise
3009       data = None
3010
3011     if not data:
3012       result.append(None)
3013       continue
3014
3015     result.append(serializer.LoadJson(data))
3016
3017   return result
3018
3019
3020 def AbortImportExport(name):
3021   """Sends SIGTERM to a running import/export daemon.
3022
3023   """
3024   logging.info("Abort import/export %s", name)
3025
3026   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3027   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3028
3029   if pid:
3030     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3031                  name, pid)
3032     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3033
3034
3035 def CleanupImportExport(name):
3036   """Cleanup after an import or export.
3037
3038   If the import/export daemon is still running it's killed. Afterwards the
3039   whole status directory is removed.
3040
3041   """
3042   logging.info("Finalizing import/export %s", name)
3043
3044   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3045
3046   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3047
3048   if pid:
3049     logging.info("Import/export %s is still running with PID %s",
3050                  name, pid)
3051     utils.KillProcess(pid, waitpid=False)
3052
3053   shutil.rmtree(status_dir, ignore_errors=True)
3054
3055
3056 def _FindDisks(nodes_ip, disks):
3057   """Sets the physical ID on disks and returns the block devices.
3058
3059   """
3060   # set the correct physical ID
3061   my_name = netutils.Hostname.GetSysName()
3062   for cf in disks:
3063     cf.SetPhysicalID(my_name, nodes_ip)
3064
3065   bdevs = []
3066
3067   for cf in disks:
3068     rd = _RecursiveFindBD(cf)
3069     if rd is None:
3070       _Fail("Can't find device %s", cf)
3071     bdevs.append(rd)
3072   return bdevs
3073
3074
3075 def DrbdDisconnectNet(nodes_ip, disks):
3076   """Disconnects the network on a list of drbd devices.
3077
3078   """
3079   bdevs = _FindDisks(nodes_ip, disks)
3080
3081   # disconnect disks
3082   for rd in bdevs:
3083     try:
3084       rd.DisconnectNet()
3085     except errors.BlockDeviceError, err:
3086       _Fail("Can't change network configuration to standalone mode: %s",
3087             err, exc=True)
3088
3089
3090 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3091   """Attaches the network on a list of drbd devices.
3092
3093   """
3094   bdevs = _FindDisks(nodes_ip, disks)
3095
3096   if multimaster:
3097     for idx, rd in enumerate(bdevs):
3098       try:
3099         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3100       except EnvironmentError, err:
3101         _Fail("Can't create symlink: %s", err)
3102   # reconnect disks, switch to new master configuration and if
3103   # needed primary mode
3104   for rd in bdevs:
3105     try:
3106       rd.AttachNet(multimaster)
3107     except errors.BlockDeviceError, err:
3108       _Fail("Can't change network configuration: %s", err)
3109
3110   # wait until the disks are connected; we need to retry the re-attach
3111   # if the device becomes standalone, as this might happen if the one
3112   # node disconnects and reconnects in a different mode before the
3113   # other node reconnects; in this case, one or both of the nodes will
3114   # decide it has wrong configuration and switch to standalone
3115
3116   def _Attach():
3117     all_connected = True
3118
3119     for rd in bdevs:
3120       stats = rd.GetProcStatus()
3121
3122       all_connected = (all_connected and
3123                        (stats.is_connected or stats.is_in_resync))
3124
3125       if stats.is_standalone:
3126         # peer had different config info and this node became
3127         # standalone, even though this should not happen with the
3128         # new staged way of changing disk configs
3129         try:
3130           rd.AttachNet(multimaster)
3131         except errors.BlockDeviceError, err:
3132           _Fail("Can't change network configuration: %s", err)
3133
3134     if not all_connected:
3135       raise utils.RetryAgain()
3136
3137   try:
3138     # Start with a delay of 100 miliseconds and go up to 5 seconds
3139     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3140   except utils.RetryTimeout:
3141     _Fail("Timeout in disk reconnecting")
3142
3143   if multimaster:
3144     # change to primary mode
3145     for rd in bdevs:
3146       try:
3147         rd.Open()
3148       except errors.BlockDeviceError, err:
3149         _Fail("Can't change to primary mode: %s", err)
3150
3151
3152 def DrbdWaitSync(nodes_ip, disks):
3153   """Wait until DRBDs have synchronized.
3154
3155   """
3156   def _helper(rd):
3157     stats = rd.GetProcStatus()
3158     if not (stats.is_connected or stats.is_in_resync):
3159       raise utils.RetryAgain()
3160     return stats
3161
3162   bdevs = _FindDisks(nodes_ip, disks)
3163
3164   min_resync = 100
3165   alldone = True
3166   for rd in bdevs:
3167     try:
3168       # poll each second for 15 seconds
3169       stats = utils.Retry(_helper, 1, 15, args=[rd])
3170     except utils.RetryTimeout:
3171       stats = rd.GetProcStatus()
3172       # last check
3173       if not (stats.is_connected or stats.is_in_resync):
3174         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3175     alldone = alldone and (not stats.is_in_resync)
3176     if stats.sync_percent is not None:
3177       min_resync = min(min_resync, stats.sync_percent)
3178
3179   return (alldone, min_resync)
3180
3181
3182 def GetDrbdUsermodeHelper():
3183   """Returns DRBD usermode helper currently configured.
3184
3185   """
3186   try:
3187     return bdev.BaseDRBD.GetUsermodeHelper()
3188   except errors.BlockDeviceError, err:
3189     _Fail(str(err))
3190
3191
3192 def PowercycleNode(hypervisor_type):
3193   """Hard-powercycle the node.
3194
3195   Because we need to return first, and schedule the powercycle in the
3196   background, we won't be able to report failures nicely.
3197
3198   """
3199   hyper = hypervisor.GetHypervisor(hypervisor_type)
3200   try:
3201     pid = os.fork()
3202   except OSError:
3203     # if we can't fork, we'll pretend that we're in the child process
3204     pid = 0
3205   if pid > 0:
3206     return "Reboot scheduled in 5 seconds"
3207   # ensure the child is running on ram
3208   try:
3209     utils.Mlockall()
3210   except Exception: # pylint: disable-msg=W0703
3211     pass
3212   time.sleep(5)
3213   hyper.PowercycleNode()
3214
3215
3216 class HooksRunner(object):
3217   """Hook runner.
3218
3219   This class is instantiated on the node side (ganeti-noded) and not
3220   on the master side.
3221
3222   """
3223   def __init__(self, hooks_base_dir=None):
3224     """Constructor for hooks runner.
3225
3226     @type hooks_base_dir: str or None
3227     @param hooks_base_dir: if not None, this overrides the
3228         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3229
3230     """
3231     if hooks_base_dir is None:
3232       hooks_base_dir = constants.HOOKS_BASE_DIR
3233     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3234     # constant
3235     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3236
3237   def RunHooks(self, hpath, phase, env):
3238     """Run the scripts in the hooks directory.
3239
3240     @type hpath: str
3241     @param hpath: the path to the hooks directory which
3242         holds the scripts
3243     @type phase: str
3244     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3245         L{constants.HOOKS_PHASE_POST}
3246     @type env: dict
3247     @param env: dictionary with the environment for the hook
3248     @rtype: list
3249     @return: list of 3-element tuples:
3250       - script path
3251       - script result, either L{constants.HKR_SUCCESS} or
3252         L{constants.HKR_FAIL}
3253       - output of the script
3254
3255     @raise errors.ProgrammerError: for invalid input
3256         parameters
3257
3258     """
3259     if phase == constants.HOOKS_PHASE_PRE:
3260       suffix = "pre"
3261     elif phase == constants.HOOKS_PHASE_POST:
3262       suffix = "post"
3263     else:
3264       _Fail("Unknown hooks phase '%s'", phase)
3265
3266
3267     subdir = "%s-%s.d" % (hpath, suffix)
3268     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3269
3270     results = []
3271
3272     if not os.path.isdir(dir_name):
3273       # for non-existing/non-dirs, we simply exit instead of logging a
3274       # warning at every operation
3275       return results
3276
3277     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3278
3279     for (relname, relstatus, runresult)  in runparts_results:
3280       if relstatus == constants.RUNPARTS_SKIP:
3281         rrval = constants.HKR_SKIP
3282         output = ""
3283       elif relstatus == constants.RUNPARTS_ERR:
3284         rrval = constants.HKR_FAIL
3285         output = "Hook script execution error: %s" % runresult
3286       elif relstatus == constants.RUNPARTS_RUN:
3287         if runresult.failed:
3288           rrval = constants.HKR_FAIL
3289         else:
3290           rrval = constants.HKR_SUCCESS
3291         output = utils.SafeEncode(runresult.output.strip())
3292       results.append(("%s/%s" % (subdir, relname), rrval, output))
3293
3294     return results
3295
3296
3297 class IAllocatorRunner(object):
3298   """IAllocator runner.
3299
3300   This class is instantiated on the node side (ganeti-noded) and not on
3301   the master side.
3302
3303   """
3304   @staticmethod
3305   def Run(name, idata):
3306     """Run an iallocator script.
3307
3308     @type name: str
3309     @param name: the iallocator script name
3310     @type idata: str
3311     @param idata: the allocator input data
3312
3313     @rtype: tuple
3314     @return: two element tuple of:
3315        - status
3316        - either error message or stdout of allocator (for success)
3317
3318     """
3319     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3320                                   os.path.isfile)
3321     if alloc_script is None:
3322       _Fail("iallocator module '%s' not found in the search path", name)
3323
3324     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3325     try:
3326       os.write(fd, idata)
3327       os.close(fd)
3328       result = utils.RunCmd([alloc_script, fin_name])
3329       if result.failed:
3330         _Fail("iallocator module '%s' failed: %s, output '%s'",
3331               name, result.fail_reason, result.output)
3332     finally:
3333       os.unlink(fin_name)
3334
3335     return result.stdout
3336
3337
3338 class DevCacheManager(object):
3339   """Simple class for managing a cache of block device information.
3340
3341   """
3342   _DEV_PREFIX = "/dev/"
3343   _ROOT_DIR = constants.BDEV_CACHE_DIR
3344
3345   @classmethod
3346   def _ConvertPath(cls, dev_path):
3347     """Converts a /dev/name path to the cache file name.
3348
3349     This replaces slashes with underscores and strips the /dev
3350     prefix. It then returns the full path to the cache file.
3351
3352     @type dev_path: str
3353     @param dev_path: the C{/dev/} path name
3354     @rtype: str
3355     @return: the converted path name
3356
3357     """
3358     if dev_path.startswith(cls._DEV_PREFIX):
3359       dev_path = dev_path[len(cls._DEV_PREFIX):]
3360     dev_path = dev_path.replace("/", "_")
3361     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3362     return fpath
3363
3364   @classmethod
3365   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3366     """Updates the cache information for a given device.
3367
3368     @type dev_path: str
3369     @param dev_path: the pathname of the device
3370     @type owner: str
3371     @param owner: the owner (instance name) of the device
3372     @type on_primary: bool
3373     @param on_primary: whether this is the primary
3374         node nor not
3375     @type iv_name: str
3376     @param iv_name: the instance-visible name of the
3377         device, as in objects.Disk.iv_name
3378
3379     @rtype: None
3380
3381     """
3382     if dev_path is None:
3383       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3384       return
3385     fpath = cls._ConvertPath(dev_path)
3386     if on_primary:
3387       state = "primary"
3388     else:
3389       state = "secondary"
3390     if iv_name is None:
3391       iv_name = "not_visible"
3392     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3393     try:
3394       utils.WriteFile(fpath, data=fdata)
3395     except EnvironmentError, err:
3396       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3397
3398   @classmethod
3399   def RemoveCache(cls, dev_path):
3400     """Remove data for a dev_path.
3401
3402     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3403     path name and logging.
3404
3405     @type dev_path: str
3406     @param dev_path: the pathname of the device
3407
3408     @rtype: None
3409
3410     """
3411     if dev_path is None:
3412       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3413       return
3414     fpath = cls._ConvertPath(dev_path)
3415     try:
3416       utils.RemoveFile(fpath)
3417     except EnvironmentError, err:
3418       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)