LUExportInstance: Accept instance already shut down
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79
80 class RPCFail(Exception):
81   """Class denoting RPC failure.
82
83   Its argument is the error message.
84
85   """
86
87
88 def _Fail(msg, *args, **kwargs):
89   """Log an error and the raise an RPCFail exception.
90
91   This exception is then handled specially in the ganeti daemon and
92   turned into a 'failed' return type. As such, this function is a
93   useful shortcut for logging the error and returning it to the master
94   daemon.
95
96   @type msg: string
97   @param msg: the text of the exception
98   @raise RPCFail
99
100   """
101   if args:
102     msg = msg % args
103   if "log" not in kwargs or kwargs["log"]: # if we should log this error
104     if "exc" in kwargs and kwargs["exc"]:
105       logging.exception(msg)
106     else:
107       logging.error(msg)
108   raise RPCFail(msg)
109
110
111 def _GetConfig():
112   """Simple wrapper to return a SimpleStore.
113
114   @rtype: L{ssconf.SimpleStore}
115   @return: a SimpleStore instance
116
117   """
118   return ssconf.SimpleStore()
119
120
121 def _GetSshRunner(cluster_name):
122   """Simple wrapper to return an SshRunner.
123
124   @type cluster_name: str
125   @param cluster_name: the cluster name, which is needed
126       by the SshRunner constructor
127   @rtype: L{ssh.SshRunner}
128   @return: an SshRunner instance
129
130   """
131   return ssh.SshRunner(cluster_name)
132
133
134 def _Decompress(data):
135   """Unpacks data compressed by the RPC client.
136
137   @type data: list or tuple
138   @param data: Data sent by RPC client
139   @rtype: str
140   @return: Decompressed data
141
142   """
143   assert isinstance(data, (list, tuple))
144   assert len(data) == 2
145   (encoding, content) = data
146   if encoding == constants.RPC_ENCODING_NONE:
147     return content
148   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
149     return zlib.decompress(base64.b64decode(content))
150   else:
151     raise AssertionError("Unknown data encoding")
152
153
154 def _CleanDirectory(path, exclude=None):
155   """Removes all regular files in a directory.
156
157   @type path: str
158   @param path: the directory to clean
159   @type exclude: list
160   @param exclude: list of files to be excluded, defaults
161       to the empty list
162
163   """
164   if path not in _ALLOWED_CLEAN_DIRS:
165     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
166           path)
167
168   if not os.path.isdir(path):
169     return
170   if exclude is None:
171     exclude = []
172   else:
173     # Normalize excluded paths
174     exclude = [os.path.normpath(i) for i in exclude]
175
176   for rel_name in utils.ListVisibleFiles(path):
177     full_name = utils.PathJoin(path, rel_name)
178     if full_name in exclude:
179       continue
180     if os.path.isfile(full_name) and not os.path.islink(full_name):
181       utils.RemoveFile(full_name)
182
183
184 def _BuildUploadFileList():
185   """Build the list of allowed upload files.
186
187   This is abstracted so that it's built only once at module import time.
188
189   """
190   allowed_files = set([
191     constants.CLUSTER_CONF_FILE,
192     constants.ETC_HOSTS,
193     constants.SSH_KNOWN_HOSTS_FILE,
194     constants.VNC_PASSWORD_FILE,
195     constants.RAPI_CERT_FILE,
196     constants.RAPI_USERS_FILE,
197     constants.CONFD_HMAC_KEY,
198     constants.CLUSTER_DOMAIN_SECRET_FILE,
199     ])
200
201   for hv_name in constants.HYPER_TYPES:
202     hv_class = hypervisor.GetHypervisorClass(hv_name)
203     allowed_files.update(hv_class.GetAncillaryFiles())
204
205   return frozenset(allowed_files)
206
207
208 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
209
210
211 def JobQueuePurge():
212   """Removes job queue files and archived jobs.
213
214   @rtype: tuple
215   @return: True, None
216
217   """
218   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
220
221
222 def GetMasterInfo():
223   """Returns master information.
224
225   This is an utility function to compute master information, either
226   for consumption here or from the node daemon.
227
228   @rtype: tuple
229   @return: master_netdev, master_ip, master_name, primary_ip_family
230   @raise RPCFail: in case of errors
231
232   """
233   try:
234     cfg = _GetConfig()
235     master_netdev = cfg.GetMasterNetdev()
236     master_ip = cfg.GetMasterIP()
237     master_node = cfg.GetMasterNode()
238     primary_ip_family = cfg.GetPrimaryIPFamily()
239   except errors.ConfigurationError, err:
240     _Fail("Cluster configuration incomplete: %s", err, exc=True)
241   return (master_netdev, master_ip, master_node, primary_ip_family)
242
243
244 def StartMaster(start_daemons, no_voting):
245   """Activate local node as master node.
246
247   The function will either try activate the IP address of the master
248   (unless someone else has it) or also start the master daemons, based
249   on the start_daemons parameter.
250
251   @type start_daemons: boolean
252   @param start_daemons: whether to start the master daemons
253       (ganeti-masterd and ganeti-rapi), or (if false) activate the
254       master ip
255   @type no_voting: boolean
256   @param no_voting: whether to start ganeti-masterd without a node vote
257       (if start_daemons is True), but still non-interactively
258   @rtype: None
259
260   """
261   # GetMasterInfo will raise an exception if not able to return data
262   master_netdev, master_ip, _, family = GetMasterInfo()
263
264   err_msgs = []
265   # either start the master and rapi daemons
266   if start_daemons:
267     if no_voting:
268       masterd_args = "--no-voting --yes-do-it"
269     else:
270       masterd_args = ""
271
272     env = {
273       "EXTRA_MASTERD_ARGS": masterd_args,
274       }
275
276     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
277     if result.failed:
278       msg = "Can't start Ganeti master: %s" % result.output
279       logging.error(msg)
280       err_msgs.append(msg)
281   # or activate the IP
282   else:
283     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
284       if netutils.IPAddress.Own(master_ip):
285         # we already have the ip:
286         logging.debug("Master IP already configured, doing nothing")
287       else:
288         msg = "Someone else has the master ip, not activating"
289         logging.error(msg)
290         err_msgs.append(msg)
291     else:
292       ipcls = netutils.IP4Address
293       if family == netutils.IP6Address.family:
294         ipcls = netutils.IP6Address
295
296       result = utils.RunCmd(["ip", "address", "add",
297                              "%s/%d" % (master_ip, ipcls.iplen),
298                              "dev", master_netdev, "label",
299                              "%s:0" % master_netdev])
300       if result.failed:
301         msg = "Can't activate master IP: %s" % result.output
302         logging.error(msg)
303         err_msgs.append(msg)
304
305       # we ignore the exit code of the following cmds
306       if ipcls == netutils.IP4Address:
307         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
308                       master_ip, master_ip])
309       elif ipcls == netutils.IP6Address:
310         try:
311           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
312         except errors.OpExecError:
313           # TODO: Better error reporting
314           logging.warning("Can't execute ndisc6, please install if missing")
315
316   if err_msgs:
317     _Fail("; ".join(err_msgs))
318
319
320 def StopMaster(stop_daemons):
321   """Deactivate this node as master.
322
323   The function will always try to deactivate the IP address of the
324   master. It will also stop the master daemons depending on the
325   stop_daemons parameter.
326
327   @type stop_daemons: boolean
328   @param stop_daemons: whether to also stop the master daemons
329       (ganeti-masterd and ganeti-rapi)
330   @rtype: None
331
332   """
333   # TODO: log and report back to the caller the error failures; we
334   # need to decide in which case we fail the RPC for this
335
336   # GetMasterInfo will raise an exception if not able to return data
337   master_netdev, master_ip, _, family = GetMasterInfo()
338
339   ipcls = netutils.IP4Address
340   if family == netutils.IP6Address.family:
341     ipcls = netutils.IP6Address
342
343   result = utils.RunCmd(["ip", "address", "del",
344                          "%s/%d" % (master_ip, ipcls.iplen),
345                          "dev", master_netdev])
346   if result.failed:
347     logging.error("Can't remove the master IP, error: %s", result.output)
348     # but otherwise ignore the failure
349
350   if stop_daemons:
351     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
352     if result.failed:
353       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
354                     " and error %s",
355                     result.cmd, result.exit_code, result.output)
356
357
358 def EtcHostsModify(mode, host, ip):
359   """Modify a host entry in /etc/hosts.
360
361   @param mode: The mode to operate. Either add or remove entry
362   @param host: The host to operate on
363   @param ip: The ip associated with the entry
364
365   """
366   if mode == constants.ETC_HOSTS_ADD:
367     if not ip:
368       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
369               " present")
370     utils.AddHostToEtcHosts(host, ip)
371   elif mode == constants.ETC_HOSTS_REMOVE:
372     if ip:
373       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
374               " parameter is present")
375     utils.RemoveHostFromEtcHosts(host)
376   else:
377     RPCFail("Mode not supported")
378
379
380 def LeaveCluster(modify_ssh_setup):
381   """Cleans up and remove the current node.
382
383   This function cleans up and prepares the current node to be removed
384   from the cluster.
385
386   If processing is successful, then it raises an
387   L{errors.QuitGanetiException} which is used as a special case to
388   shutdown the node daemon.
389
390   @param modify_ssh_setup: boolean
391
392   """
393   _CleanDirectory(constants.DATA_DIR)
394   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
395   JobQueuePurge()
396
397   if modify_ssh_setup:
398     try:
399       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
400
401       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
402
403       utils.RemoveFile(priv_key)
404       utils.RemoveFile(pub_key)
405     except errors.OpExecError:
406       logging.exception("Error while processing ssh files")
407
408   try:
409     utils.RemoveFile(constants.CONFD_HMAC_KEY)
410     utils.RemoveFile(constants.RAPI_CERT_FILE)
411     utils.RemoveFile(constants.NODED_CERT_FILE)
412   except: # pylint: disable-msg=W0702
413     logging.exception("Error while removing cluster secrets")
414
415   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
416   if result.failed:
417     logging.error("Command %s failed with exitcode %s and error %s",
418                   result.cmd, result.exit_code, result.output)
419
420   # Raise a custom exception (handled in ganeti-noded)
421   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
422
423
424 def GetNodeInfo(vgname, hypervisor_type):
425   """Gives back a hash with different information about the node.
426
427   @type vgname: C{string}
428   @param vgname: the name of the volume group to ask for disk space information
429   @type hypervisor_type: C{str}
430   @param hypervisor_type: the name of the hypervisor to ask for
431       memory information
432   @rtype: C{dict}
433   @return: dictionary with the following keys:
434       - vg_size is the size of the configured volume group in MiB
435       - vg_free is the free size of the volume group in MiB
436       - memory_dom0 is the memory allocated for domain0 in MiB
437       - memory_free is the currently available (free) ram in MiB
438       - memory_total is the total number of ram in MiB
439
440   """
441   outputarray = {}
442
443   vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
444   vg_free = vg_size = None
445   if vginfo:
446     vg_free = int(round(vginfo[0][0], 0))
447     vg_size = int(round(vginfo[0][1], 0))
448
449   outputarray['vg_size'] = vg_size
450   outputarray['vg_free'] = vg_free
451
452   hyper = hypervisor.GetHypervisor(hypervisor_type)
453   hyp_info = hyper.GetNodeInfo()
454   if hyp_info is not None:
455     outputarray.update(hyp_info)
456
457   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
458
459   return outputarray
460
461
462 def VerifyNode(what, cluster_name):
463   """Verify the status of the local node.
464
465   Based on the input L{what} parameter, various checks are done on the
466   local node.
467
468   If the I{filelist} key is present, this list of
469   files is checksummed and the file/checksum pairs are returned.
470
471   If the I{nodelist} key is present, we check that we have
472   connectivity via ssh with the target nodes (and check the hostname
473   report).
474
475   If the I{node-net-test} key is present, we check that we have
476   connectivity to the given nodes via both primary IP and, if
477   applicable, secondary IPs.
478
479   @type what: C{dict}
480   @param what: a dictionary of things to check:
481       - filelist: list of files for which to compute checksums
482       - nodelist: list of nodes we should check ssh communication with
483       - node-net-test: list of nodes we should check node daemon port
484         connectivity with
485       - hypervisor: list with hypervisors to run the verify for
486   @rtype: dict
487   @return: a dictionary with the same keys as the input dict, and
488       values representing the result of the checks
489
490   """
491   result = {}
492   my_name = netutils.Hostname.GetSysName()
493   port = netutils.GetDaemonPort(constants.NODED)
494   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
495
496   if constants.NV_HYPERVISOR in what and vm_capable:
497     result[constants.NV_HYPERVISOR] = tmp = {}
498     for hv_name in what[constants.NV_HYPERVISOR]:
499       try:
500         val = hypervisor.GetHypervisor(hv_name).Verify()
501       except errors.HypervisorError, err:
502         val = "Error while checking hypervisor: %s" % str(err)
503       tmp[hv_name] = val
504
505   if constants.NV_FILELIST in what:
506     result[constants.NV_FILELIST] = utils.FingerprintFiles(
507       what[constants.NV_FILELIST])
508
509   if constants.NV_NODELIST in what:
510     result[constants.NV_NODELIST] = tmp = {}
511     random.shuffle(what[constants.NV_NODELIST])
512     for node in what[constants.NV_NODELIST]:
513       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
514       if not success:
515         tmp[node] = message
516
517   if constants.NV_NODENETTEST in what:
518     result[constants.NV_NODENETTEST] = tmp = {}
519     my_pip = my_sip = None
520     for name, pip, sip in what[constants.NV_NODENETTEST]:
521       if name == my_name:
522         my_pip = pip
523         my_sip = sip
524         break
525     if not my_pip:
526       tmp[my_name] = ("Can't find my own primary/secondary IP"
527                       " in the node list")
528     else:
529       for name, pip, sip in what[constants.NV_NODENETTEST]:
530         fail = []
531         if not netutils.TcpPing(pip, port, source=my_pip):
532           fail.append("primary")
533         if sip != pip:
534           if not netutils.TcpPing(sip, port, source=my_sip):
535             fail.append("secondary")
536         if fail:
537           tmp[name] = ("failure using the %s interface(s)" %
538                        " and ".join(fail))
539
540   if constants.NV_MASTERIP in what:
541     # FIXME: add checks on incoming data structures (here and in the
542     # rest of the function)
543     master_name, master_ip = what[constants.NV_MASTERIP]
544     if master_name == my_name:
545       source = constants.IP4_ADDRESS_LOCALHOST
546     else:
547       source = None
548     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
549                                                   source=source)
550
551   if constants.NV_LVLIST in what and vm_capable:
552     try:
553       val = GetVolumeList(what[constants.NV_LVLIST])
554     except RPCFail, err:
555       val = str(err)
556     result[constants.NV_LVLIST] = val
557
558   if constants.NV_INSTANCELIST in what and vm_capable:
559     # GetInstanceList can fail
560     try:
561       val = GetInstanceList(what[constants.NV_INSTANCELIST])
562     except RPCFail, err:
563       val = str(err)
564     result[constants.NV_INSTANCELIST] = val
565
566   if constants.NV_VGLIST in what and vm_capable:
567     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
568
569   if constants.NV_PVLIST in what and vm_capable:
570     result[constants.NV_PVLIST] = \
571       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
572                                    filter_allocatable=False)
573
574   if constants.NV_VERSION in what:
575     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
576                                     constants.RELEASE_VERSION)
577
578   if constants.NV_HVINFO in what and vm_capable:
579     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
580     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
581
582   if constants.NV_DRBDLIST in what and vm_capable:
583     try:
584       used_minors = bdev.DRBD8.GetUsedDevs().keys()
585     except errors.BlockDeviceError, err:
586       logging.warning("Can't get used minors list", exc_info=True)
587       used_minors = str(err)
588     result[constants.NV_DRBDLIST] = used_minors
589
590   if constants.NV_DRBDHELPER in what and vm_capable:
591     status = True
592     try:
593       payload = bdev.BaseDRBD.GetUsermodeHelper()
594     except errors.BlockDeviceError, err:
595       logging.error("Can't get DRBD usermode helper: %s", str(err))
596       status = False
597       payload = str(err)
598     result[constants.NV_DRBDHELPER] = (status, payload)
599
600   if constants.NV_NODESETUP in what:
601     result[constants.NV_NODESETUP] = tmpr = []
602     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
603       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
604                   " under /sys, missing required directories /sys/block"
605                   " and /sys/class/net")
606     if (not os.path.isdir("/proc/sys") or
607         not os.path.isfile("/proc/sysrq-trigger")):
608       tmpr.append("The procfs filesystem doesn't seem to be mounted"
609                   " under /proc, missing required directory /proc/sys and"
610                   " the file /proc/sysrq-trigger")
611
612   if constants.NV_TIME in what:
613     result[constants.NV_TIME] = utils.SplitTime(time.time())
614
615   if constants.NV_OSLIST in what and vm_capable:
616     result[constants.NV_OSLIST] = DiagnoseOS()
617
618   return result
619
620
621 def GetVolumeList(vg_name):
622   """Compute list of logical volumes and their size.
623
624   @type vg_name: str
625   @param vg_name: the volume group whose LVs we should list
626   @rtype: dict
627   @return:
628       dictionary of all partions (key) with value being a tuple of
629       their size (in MiB), inactive and online status::
630
631         {'test1': ('20.06', True, True)}
632
633       in case of errors, a string is returned with the error
634       details.
635
636   """
637   lvs = {}
638   sep = '|'
639   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
640                          "--separator=%s" % sep,
641                          "-olv_name,lv_size,lv_attr", vg_name])
642   if result.failed:
643     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
644
645   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
646   for line in result.stdout.splitlines():
647     line = line.strip()
648     match = valid_line_re.match(line)
649     if not match:
650       logging.error("Invalid line returned from lvs output: '%s'", line)
651       continue
652     name, size, attr = match.groups()
653     inactive = attr[4] == '-'
654     online = attr[5] == 'o'
655     virtual = attr[0] == 'v'
656     if virtual:
657       # we don't want to report such volumes as existing, since they
658       # don't really hold data
659       continue
660     lvs[name] = (size, inactive, online)
661
662   return lvs
663
664
665 def ListVolumeGroups():
666   """List the volume groups and their size.
667
668   @rtype: dict
669   @return: dictionary with keys volume name and values the
670       size of the volume
671
672   """
673   return utils.ListVolumeGroups()
674
675
676 def NodeVolumes():
677   """List all volumes on this node.
678
679   @rtype: list
680   @return:
681     A list of dictionaries, each having four keys:
682       - name: the logical volume name,
683       - size: the size of the logical volume
684       - dev: the physical device on which the LV lives
685       - vg: the volume group to which it belongs
686
687     In case of errors, we return an empty list and log the
688     error.
689
690     Note that since a logical volume can live on multiple physical
691     volumes, the resulting list might include a logical volume
692     multiple times.
693
694   """
695   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
696                          "--separator=|",
697                          "--options=lv_name,lv_size,devices,vg_name"])
698   if result.failed:
699     _Fail("Failed to list logical volumes, lvs output: %s",
700           result.output)
701
702   def parse_dev(dev):
703     return dev.split('(')[0]
704
705   def handle_dev(dev):
706     return [parse_dev(x) for x in dev.split(",")]
707
708   def map_line(line):
709     line = [v.strip() for v in line]
710     return [{'name': line[0], 'size': line[1],
711              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
712
713   all_devs = []
714   for line in result.stdout.splitlines():
715     if line.count('|') >= 3:
716       all_devs.extend(map_line(line.split('|')))
717     else:
718       logging.warning("Strange line in the output from lvs: '%s'", line)
719   return all_devs
720
721
722 def BridgesExist(bridges_list):
723   """Check if a list of bridges exist on the current node.
724
725   @rtype: boolean
726   @return: C{True} if all of them exist, C{False} otherwise
727
728   """
729   missing = []
730   for bridge in bridges_list:
731     if not utils.BridgeExists(bridge):
732       missing.append(bridge)
733
734   if missing:
735     _Fail("Missing bridges %s", utils.CommaJoin(missing))
736
737
738 def GetInstanceList(hypervisor_list):
739   """Provides a list of instances.
740
741   @type hypervisor_list: list
742   @param hypervisor_list: the list of hypervisors to query information
743
744   @rtype: list
745   @return: a list of all running instances on the current node
746     - instance1.example.com
747     - instance2.example.com
748
749   """
750   results = []
751   for hname in hypervisor_list:
752     try:
753       names = hypervisor.GetHypervisor(hname).ListInstances()
754       results.extend(names)
755     except errors.HypervisorError, err:
756       _Fail("Error enumerating instances (hypervisor %s): %s",
757             hname, err, exc=True)
758
759   return results
760
761
762 def GetInstanceInfo(instance, hname):
763   """Gives back the information about an instance as a dictionary.
764
765   @type instance: string
766   @param instance: the instance name
767   @type hname: string
768   @param hname: the hypervisor type of the instance
769
770   @rtype: dict
771   @return: dictionary with the following keys:
772       - memory: memory size of instance (int)
773       - state: xen state of instance (string)
774       - time: cpu time of instance (float)
775
776   """
777   output = {}
778
779   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
780   if iinfo is not None:
781     output['memory'] = iinfo[2]
782     output['state'] = iinfo[4]
783     output['time'] = iinfo[5]
784
785   return output
786
787
788 def GetInstanceMigratable(instance):
789   """Gives whether an instance can be migrated.
790
791   @type instance: L{objects.Instance}
792   @param instance: object representing the instance to be checked.
793
794   @rtype: tuple
795   @return: tuple of (result, description) where:
796       - result: whether the instance can be migrated or not
797       - description: a description of the issue, if relevant
798
799   """
800   hyper = hypervisor.GetHypervisor(instance.hypervisor)
801   iname = instance.name
802   if iname not in hyper.ListInstances():
803     _Fail("Instance %s is not running", iname)
804
805   for idx in range(len(instance.disks)):
806     link_name = _GetBlockDevSymlinkPath(iname, idx)
807     if not os.path.islink(link_name):
808       logging.warning("Instance %s is missing symlink %s for disk %d",
809                       iname, link_name, idx)
810
811
812 def GetAllInstancesInfo(hypervisor_list):
813   """Gather data about all instances.
814
815   This is the equivalent of L{GetInstanceInfo}, except that it
816   computes data for all instances at once, thus being faster if one
817   needs data about more than one instance.
818
819   @type hypervisor_list: list
820   @param hypervisor_list: list of hypervisors to query for instance data
821
822   @rtype: dict
823   @return: dictionary of instance: data, with data having the following keys:
824       - memory: memory size of instance (int)
825       - state: xen state of instance (string)
826       - time: cpu time of instance (float)
827       - vcpus: the number of vcpus
828
829   """
830   output = {}
831
832   for hname in hypervisor_list:
833     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
834     if iinfo:
835       for name, _, memory, vcpus, state, times in iinfo:
836         value = {
837           'memory': memory,
838           'vcpus': vcpus,
839           'state': state,
840           'time': times,
841           }
842         if name in output:
843           # we only check static parameters, like memory and vcpus,
844           # and not state and time which can change between the
845           # invocations of the different hypervisors
846           for key in 'memory', 'vcpus':
847             if value[key] != output[name][key]:
848               _Fail("Instance %s is running twice"
849                     " with different parameters", name)
850         output[name] = value
851
852   return output
853
854
855 def _InstanceLogName(kind, os_name, instance):
856   """Compute the OS log filename for a given instance and operation.
857
858   The instance name and os name are passed in as strings since not all
859   operations have these as part of an instance object.
860
861   @type kind: string
862   @param kind: the operation type (e.g. add, import, etc.)
863   @type os_name: string
864   @param os_name: the os name
865   @type instance: string
866   @param instance: the name of the instance being imported/added/etc.
867
868   """
869   # TODO: Use tempfile.mkstemp to create unique filename
870   base = ("%s-%s-%s-%s.log" %
871           (kind, os_name, instance, utils.TimestampForFilename()))
872   return utils.PathJoin(constants.LOG_OS_DIR, base)
873
874
875 def InstanceOsAdd(instance, reinstall, debug):
876   """Add an OS to an instance.
877
878   @type instance: L{objects.Instance}
879   @param instance: Instance whose OS is to be installed
880   @type reinstall: boolean
881   @param reinstall: whether this is an instance reinstall
882   @type debug: integer
883   @param debug: debug level, passed to the OS scripts
884   @rtype: None
885
886   """
887   inst_os = OSFromDisk(instance.os)
888
889   create_env = OSEnvironment(instance, inst_os, debug)
890   if reinstall:
891     create_env['INSTANCE_REINSTALL'] = "1"
892
893   logfile = _InstanceLogName("add", instance.os, instance.name)
894
895   result = utils.RunCmd([inst_os.create_script], env=create_env,
896                         cwd=inst_os.path, output=logfile,)
897   if result.failed:
898     logging.error("os create command '%s' returned error: %s, logfile: %s,"
899                   " output: %s", result.cmd, result.fail_reason, logfile,
900                   result.output)
901     lines = [utils.SafeEncode(val)
902              for val in utils.TailFile(logfile, lines=20)]
903     _Fail("OS create script failed (%s), last lines in the"
904           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
905
906
907 def RunRenameInstance(instance, old_name, debug):
908   """Run the OS rename script for an instance.
909
910   @type instance: L{objects.Instance}
911   @param instance: Instance whose OS is to be installed
912   @type old_name: string
913   @param old_name: previous instance name
914   @type debug: integer
915   @param debug: debug level, passed to the OS scripts
916   @rtype: boolean
917   @return: the success of the operation
918
919   """
920   inst_os = OSFromDisk(instance.os)
921
922   rename_env = OSEnvironment(instance, inst_os, debug)
923   rename_env['OLD_INSTANCE_NAME'] = old_name
924
925   logfile = _InstanceLogName("rename", instance.os,
926                              "%s-%s" % (old_name, instance.name))
927
928   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
929                         cwd=inst_os.path, output=logfile)
930
931   if result.failed:
932     logging.error("os create command '%s' returned error: %s output: %s",
933                   result.cmd, result.fail_reason, result.output)
934     lines = [utils.SafeEncode(val)
935              for val in utils.TailFile(logfile, lines=20)]
936     _Fail("OS rename script failed (%s), last lines in the"
937           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
938
939
940 def _GetBlockDevSymlinkPath(instance_name, idx):
941   return utils.PathJoin(constants.DISK_LINKS_DIR,
942                         "%s:%d" % (instance_name, idx))
943
944
945 def _SymlinkBlockDev(instance_name, device_path, idx):
946   """Set up symlinks to a instance's block device.
947
948   This is an auxiliary function run when an instance is start (on the primary
949   node) or when an instance is migrated (on the target node).
950
951
952   @param instance_name: the name of the target instance
953   @param device_path: path of the physical block device, on the node
954   @param idx: the disk index
955   @return: absolute path to the disk's symlink
956
957   """
958   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
959   try:
960     os.symlink(device_path, link_name)
961   except OSError, err:
962     if err.errno == errno.EEXIST:
963       if (not os.path.islink(link_name) or
964           os.readlink(link_name) != device_path):
965         os.remove(link_name)
966         os.symlink(device_path, link_name)
967     else:
968       raise
969
970   return link_name
971
972
973 def _RemoveBlockDevLinks(instance_name, disks):
974   """Remove the block device symlinks belonging to the given instance.
975
976   """
977   for idx, _ in enumerate(disks):
978     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
979     if os.path.islink(link_name):
980       try:
981         os.remove(link_name)
982       except OSError:
983         logging.exception("Can't remove symlink '%s'", link_name)
984
985
986 def _GatherAndLinkBlockDevs(instance):
987   """Set up an instance's block device(s).
988
989   This is run on the primary node at instance startup. The block
990   devices must be already assembled.
991
992   @type instance: L{objects.Instance}
993   @param instance: the instance whose disks we shoul assemble
994   @rtype: list
995   @return: list of (disk_object, device_path)
996
997   """
998   block_devices = []
999   for idx, disk in enumerate(instance.disks):
1000     device = _RecursiveFindBD(disk)
1001     if device is None:
1002       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1003                                     str(disk))
1004     device.Open()
1005     try:
1006       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1007     except OSError, e:
1008       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1009                                     e.strerror)
1010
1011     block_devices.append((disk, link_name))
1012
1013   return block_devices
1014
1015
1016 def StartInstance(instance):
1017   """Start an instance.
1018
1019   @type instance: L{objects.Instance}
1020   @param instance: the instance object
1021   @rtype: None
1022
1023   """
1024   running_instances = GetInstanceList([instance.hypervisor])
1025
1026   if instance.name in running_instances:
1027     logging.info("Instance %s already running, not starting", instance.name)
1028     return
1029
1030   try:
1031     block_devices = _GatherAndLinkBlockDevs(instance)
1032     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1033     hyper.StartInstance(instance, block_devices)
1034   except errors.BlockDeviceError, err:
1035     _Fail("Block device error: %s", err, exc=True)
1036   except errors.HypervisorError, err:
1037     _RemoveBlockDevLinks(instance.name, instance.disks)
1038     _Fail("Hypervisor error: %s", err, exc=True)
1039
1040
1041 def InstanceShutdown(instance, timeout):
1042   """Shut an instance down.
1043
1044   @note: this functions uses polling with a hardcoded timeout.
1045
1046   @type instance: L{objects.Instance}
1047   @param instance: the instance object
1048   @type timeout: integer
1049   @param timeout: maximum timeout for soft shutdown
1050   @rtype: None
1051
1052   """
1053   hv_name = instance.hypervisor
1054   hyper = hypervisor.GetHypervisor(hv_name)
1055   iname = instance.name
1056
1057   if instance.name not in hyper.ListInstances():
1058     logging.info("Instance %s not running, doing nothing", iname)
1059     return
1060
1061   class _TryShutdown:
1062     def __init__(self):
1063       self.tried_once = False
1064
1065     def __call__(self):
1066       if iname not in hyper.ListInstances():
1067         return
1068
1069       try:
1070         hyper.StopInstance(instance, retry=self.tried_once)
1071       except errors.HypervisorError, err:
1072         if iname not in hyper.ListInstances():
1073           # if the instance is no longer existing, consider this a
1074           # success and go to cleanup
1075           return
1076
1077         _Fail("Failed to stop instance %s: %s", iname, err)
1078
1079       self.tried_once = True
1080
1081       raise utils.RetryAgain()
1082
1083   try:
1084     utils.Retry(_TryShutdown(), 5, timeout)
1085   except utils.RetryTimeout:
1086     # the shutdown did not succeed
1087     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1088
1089     try:
1090       hyper.StopInstance(instance, force=True)
1091     except errors.HypervisorError, err:
1092       if iname in hyper.ListInstances():
1093         # only raise an error if the instance still exists, otherwise
1094         # the error could simply be "instance ... unknown"!
1095         _Fail("Failed to force stop instance %s: %s", iname, err)
1096
1097     time.sleep(1)
1098
1099     if iname in hyper.ListInstances():
1100       _Fail("Could not shutdown instance %s even by destroy", iname)
1101
1102   try:
1103     hyper.CleanupInstance(instance.name)
1104   except errors.HypervisorError, err:
1105     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1106
1107   _RemoveBlockDevLinks(iname, instance.disks)
1108
1109
1110 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1111   """Reboot an instance.
1112
1113   @type instance: L{objects.Instance}
1114   @param instance: the instance object to reboot
1115   @type reboot_type: str
1116   @param reboot_type: the type of reboot, one the following
1117     constants:
1118       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1119         instance OS, do not recreate the VM
1120       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1121         restart the VM (at the hypervisor level)
1122       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1123         not accepted here, since that mode is handled differently, in
1124         cmdlib, and translates into full stop and start of the
1125         instance (instead of a call_instance_reboot RPC)
1126   @type shutdown_timeout: integer
1127   @param shutdown_timeout: maximum timeout for soft shutdown
1128   @rtype: None
1129
1130   """
1131   running_instances = GetInstanceList([instance.hypervisor])
1132
1133   if instance.name not in running_instances:
1134     _Fail("Cannot reboot instance %s that is not running", instance.name)
1135
1136   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1137   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1138     try:
1139       hyper.RebootInstance(instance)
1140     except errors.HypervisorError, err:
1141       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1142   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1143     try:
1144       InstanceShutdown(instance, shutdown_timeout)
1145       return StartInstance(instance)
1146     except errors.HypervisorError, err:
1147       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1148   else:
1149     _Fail("Invalid reboot_type received: %s", reboot_type)
1150
1151
1152 def MigrationInfo(instance):
1153   """Gather information about an instance to be migrated.
1154
1155   @type instance: L{objects.Instance}
1156   @param instance: the instance definition
1157
1158   """
1159   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1160   try:
1161     info = hyper.MigrationInfo(instance)
1162   except errors.HypervisorError, err:
1163     _Fail("Failed to fetch migration information: %s", err, exc=True)
1164   return info
1165
1166
1167 def AcceptInstance(instance, info, target):
1168   """Prepare the node to accept an instance.
1169
1170   @type instance: L{objects.Instance}
1171   @param instance: the instance definition
1172   @type info: string/data (opaque)
1173   @param info: migration information, from the source node
1174   @type target: string
1175   @param target: target host (usually ip), on this node
1176
1177   """
1178   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179   try:
1180     hyper.AcceptInstance(instance, info, target)
1181   except errors.HypervisorError, err:
1182     _Fail("Failed to accept instance: %s", err, exc=True)
1183
1184
1185 def FinalizeMigration(instance, info, success):
1186   """Finalize any preparation to accept an instance.
1187
1188   @type instance: L{objects.Instance}
1189   @param instance: the instance definition
1190   @type info: string/data (opaque)
1191   @param info: migration information, from the source node
1192   @type success: boolean
1193   @param success: whether the migration was a success or a failure
1194
1195   """
1196   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1197   try:
1198     hyper.FinalizeMigration(instance, info, success)
1199   except errors.HypervisorError, err:
1200     _Fail("Failed to finalize migration: %s", err, exc=True)
1201
1202
1203 def MigrateInstance(instance, target, live):
1204   """Migrates an instance to another node.
1205
1206   @type instance: L{objects.Instance}
1207   @param instance: the instance definition
1208   @type target: string
1209   @param target: the target node name
1210   @type live: boolean
1211   @param live: whether the migration should be done live or not (the
1212       interpretation of this parameter is left to the hypervisor)
1213   @rtype: tuple
1214   @return: a tuple of (success, msg) where:
1215       - succes is a boolean denoting the success/failure of the operation
1216       - msg is a string with details in case of failure
1217
1218   """
1219   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1220
1221   try:
1222     hyper.MigrateInstance(instance, target, live)
1223   except errors.HypervisorError, err:
1224     _Fail("Failed to migrate instance: %s", err, exc=True)
1225
1226
1227 def BlockdevCreate(disk, size, owner, on_primary, info):
1228   """Creates a block device for an instance.
1229
1230   @type disk: L{objects.Disk}
1231   @param disk: the object describing the disk we should create
1232   @type size: int
1233   @param size: the size of the physical underlying device, in MiB
1234   @type owner: str
1235   @param owner: the name of the instance for which disk is created,
1236       used for device cache data
1237   @type on_primary: boolean
1238   @param on_primary:  indicates if it is the primary node or not
1239   @type info: string
1240   @param info: string that will be sent to the physical device
1241       creation, used for example to set (LVM) tags on LVs
1242
1243   @return: the new unique_id of the device (this can sometime be
1244       computed only after creation), or None. On secondary nodes,
1245       it's not required to return anything.
1246
1247   """
1248   # TODO: remove the obsolete 'size' argument
1249   # pylint: disable-msg=W0613
1250   clist = []
1251   if disk.children:
1252     for child in disk.children:
1253       try:
1254         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1255       except errors.BlockDeviceError, err:
1256         _Fail("Can't assemble device %s: %s", child, err)
1257       if on_primary or disk.AssembleOnSecondary():
1258         # we need the children open in case the device itself has to
1259         # be assembled
1260         try:
1261           # pylint: disable-msg=E1103
1262           crdev.Open()
1263         except errors.BlockDeviceError, err:
1264           _Fail("Can't make child '%s' read-write: %s", child, err)
1265       clist.append(crdev)
1266
1267   try:
1268     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1269   except errors.BlockDeviceError, err:
1270     _Fail("Can't create block device: %s", err)
1271
1272   if on_primary or disk.AssembleOnSecondary():
1273     try:
1274       device.Assemble()
1275     except errors.BlockDeviceError, err:
1276       _Fail("Can't assemble device after creation, unusual event: %s", err)
1277     device.SetSyncSpeed(constants.SYNC_SPEED)
1278     if on_primary or disk.OpenOnSecondary():
1279       try:
1280         device.Open(force=True)
1281       except errors.BlockDeviceError, err:
1282         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1283     DevCacheManager.UpdateCache(device.dev_path, owner,
1284                                 on_primary, disk.iv_name)
1285
1286   device.SetInfo(info)
1287
1288   return device.unique_id
1289
1290
1291 def _WipeDevice(path, offset, size):
1292   """This function actually wipes the device.
1293
1294   @param path: The path to the device to wipe
1295   @param offset: The offset in MiB in the file
1296   @param size: The size in MiB to write
1297
1298   """
1299   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1300          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1301          "count=%d" % size]
1302   result = utils.RunCmd(cmd)
1303
1304   if result.failed:
1305     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1306           result.fail_reason, result.output)
1307
1308
1309 def BlockdevWipe(disk, offset, size):
1310   """Wipes a block device.
1311
1312   @type disk: L{objects.Disk}
1313   @param disk: the disk object we want to wipe
1314   @type offset: int
1315   @param offset: The offset in MiB in the file
1316   @type size: int
1317   @param size: The size in MiB to write
1318
1319   """
1320   try:
1321     rdev = _RecursiveFindBD(disk)
1322   except errors.BlockDeviceError:
1323     rdev = None
1324
1325   if not rdev:
1326     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1327
1328   # Do cross verify some of the parameters
1329   if offset > rdev.size:
1330     _Fail("Offset is bigger than device size")
1331   if (offset + size) > rdev.size:
1332     _Fail("The provided offset and size to wipe is bigger than device size")
1333
1334   _WipeDevice(rdev.dev_path, offset, size)
1335
1336
1337 def BlockdevRemove(disk):
1338   """Remove a block device.
1339
1340   @note: This is intended to be called recursively.
1341
1342   @type disk: L{objects.Disk}
1343   @param disk: the disk object we should remove
1344   @rtype: boolean
1345   @return: the success of the operation
1346
1347   """
1348   msgs = []
1349   try:
1350     rdev = _RecursiveFindBD(disk)
1351   except errors.BlockDeviceError, err:
1352     # probably can't attach
1353     logging.info("Can't attach to device %s in remove", disk)
1354     rdev = None
1355   if rdev is not None:
1356     r_path = rdev.dev_path
1357     try:
1358       rdev.Remove()
1359     except errors.BlockDeviceError, err:
1360       msgs.append(str(err))
1361     if not msgs:
1362       DevCacheManager.RemoveCache(r_path)
1363
1364   if disk.children:
1365     for child in disk.children:
1366       try:
1367         BlockdevRemove(child)
1368       except RPCFail, err:
1369         msgs.append(str(err))
1370
1371   if msgs:
1372     _Fail("; ".join(msgs))
1373
1374
1375 def _RecursiveAssembleBD(disk, owner, as_primary):
1376   """Activate a block device for an instance.
1377
1378   This is run on the primary and secondary nodes for an instance.
1379
1380   @note: this function is called recursively.
1381
1382   @type disk: L{objects.Disk}
1383   @param disk: the disk we try to assemble
1384   @type owner: str
1385   @param owner: the name of the instance which owns the disk
1386   @type as_primary: boolean
1387   @param as_primary: if we should make the block device
1388       read/write
1389
1390   @return: the assembled device or None (in case no device
1391       was assembled)
1392   @raise errors.BlockDeviceError: in case there is an error
1393       during the activation of the children or the device
1394       itself
1395
1396   """
1397   children = []
1398   if disk.children:
1399     mcn = disk.ChildrenNeeded()
1400     if mcn == -1:
1401       mcn = 0 # max number of Nones allowed
1402     else:
1403       mcn = len(disk.children) - mcn # max number of Nones
1404     for chld_disk in disk.children:
1405       try:
1406         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1407       except errors.BlockDeviceError, err:
1408         if children.count(None) >= mcn:
1409           raise
1410         cdev = None
1411         logging.error("Error in child activation (but continuing): %s",
1412                       str(err))
1413       children.append(cdev)
1414
1415   if as_primary or disk.AssembleOnSecondary():
1416     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1417     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1418     result = r_dev
1419     if as_primary or disk.OpenOnSecondary():
1420       r_dev.Open()
1421     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1422                                 as_primary, disk.iv_name)
1423
1424   else:
1425     result = True
1426   return result
1427
1428
1429 def BlockdevAssemble(disk, owner, as_primary):
1430   """Activate a block device for an instance.
1431
1432   This is a wrapper over _RecursiveAssembleBD.
1433
1434   @rtype: str or boolean
1435   @return: a C{/dev/...} path for primary nodes, and
1436       C{True} for secondary nodes
1437
1438   """
1439   try:
1440     result = _RecursiveAssembleBD(disk, owner, as_primary)
1441     if isinstance(result, bdev.BlockDev):
1442       # pylint: disable-msg=E1103
1443       result = result.dev_path
1444   except errors.BlockDeviceError, err:
1445     _Fail("Error while assembling disk: %s", err, exc=True)
1446
1447   return result
1448
1449
1450 def BlockdevShutdown(disk):
1451   """Shut down a block device.
1452
1453   First, if the device is assembled (Attach() is successful), then
1454   the device is shutdown. Then the children of the device are
1455   shutdown.
1456
1457   This function is called recursively. Note that we don't cache the
1458   children or such, as oppossed to assemble, shutdown of different
1459   devices doesn't require that the upper device was active.
1460
1461   @type disk: L{objects.Disk}
1462   @param disk: the description of the disk we should
1463       shutdown
1464   @rtype: None
1465
1466   """
1467   msgs = []
1468   r_dev = _RecursiveFindBD(disk)
1469   if r_dev is not None:
1470     r_path = r_dev.dev_path
1471     try:
1472       r_dev.Shutdown()
1473       DevCacheManager.RemoveCache(r_path)
1474     except errors.BlockDeviceError, err:
1475       msgs.append(str(err))
1476
1477   if disk.children:
1478     for child in disk.children:
1479       try:
1480         BlockdevShutdown(child)
1481       except RPCFail, err:
1482         msgs.append(str(err))
1483
1484   if msgs:
1485     _Fail("; ".join(msgs))
1486
1487
1488 def BlockdevAddchildren(parent_cdev, new_cdevs):
1489   """Extend a mirrored block device.
1490
1491   @type parent_cdev: L{objects.Disk}
1492   @param parent_cdev: the disk to which we should add children
1493   @type new_cdevs: list of L{objects.Disk}
1494   @param new_cdevs: the list of children which we should add
1495   @rtype: None
1496
1497   """
1498   parent_bdev = _RecursiveFindBD(parent_cdev)
1499   if parent_bdev is None:
1500     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1501   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1502   if new_bdevs.count(None) > 0:
1503     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1504   parent_bdev.AddChildren(new_bdevs)
1505
1506
1507 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1508   """Shrink a mirrored block device.
1509
1510   @type parent_cdev: L{objects.Disk}
1511   @param parent_cdev: the disk from which we should remove children
1512   @type new_cdevs: list of L{objects.Disk}
1513   @param new_cdevs: the list of children which we should remove
1514   @rtype: None
1515
1516   """
1517   parent_bdev = _RecursiveFindBD(parent_cdev)
1518   if parent_bdev is None:
1519     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1520   devs = []
1521   for disk in new_cdevs:
1522     rpath = disk.StaticDevPath()
1523     if rpath is None:
1524       bd = _RecursiveFindBD(disk)
1525       if bd is None:
1526         _Fail("Can't find device %s while removing children", disk)
1527       else:
1528         devs.append(bd.dev_path)
1529     else:
1530       if not utils.IsNormAbsPath(rpath):
1531         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1532       devs.append(rpath)
1533   parent_bdev.RemoveChildren(devs)
1534
1535
1536 def BlockdevGetmirrorstatus(disks):
1537   """Get the mirroring status of a list of devices.
1538
1539   @type disks: list of L{objects.Disk}
1540   @param disks: the list of disks which we should query
1541   @rtype: disk
1542   @return:
1543       a list of (mirror_done, estimated_time) tuples, which
1544       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1545   @raise errors.BlockDeviceError: if any of the disks cannot be
1546       found
1547
1548   """
1549   stats = []
1550   for dsk in disks:
1551     rbd = _RecursiveFindBD(dsk)
1552     if rbd is None:
1553       _Fail("Can't find device %s", dsk)
1554
1555     stats.append(rbd.CombinedSyncStatus())
1556
1557   return stats
1558
1559
1560 def _RecursiveFindBD(disk):
1561   """Check if a device is activated.
1562
1563   If so, return information about the real device.
1564
1565   @type disk: L{objects.Disk}
1566   @param disk: the disk object we need to find
1567
1568   @return: None if the device can't be found,
1569       otherwise the device instance
1570
1571   """
1572   children = []
1573   if disk.children:
1574     for chdisk in disk.children:
1575       children.append(_RecursiveFindBD(chdisk))
1576
1577   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1578
1579
1580 def _OpenRealBD(disk):
1581   """Opens the underlying block device of a disk.
1582
1583   @type disk: L{objects.Disk}
1584   @param disk: the disk object we want to open
1585
1586   """
1587   real_disk = _RecursiveFindBD(disk)
1588   if real_disk is None:
1589     _Fail("Block device '%s' is not set up", disk)
1590
1591   real_disk.Open()
1592
1593   return real_disk
1594
1595
1596 def BlockdevFind(disk):
1597   """Check if a device is activated.
1598
1599   If it is, return information about the real device.
1600
1601   @type disk: L{objects.Disk}
1602   @param disk: the disk to find
1603   @rtype: None or objects.BlockDevStatus
1604   @return: None if the disk cannot be found, otherwise a the current
1605            information
1606
1607   """
1608   try:
1609     rbd = _RecursiveFindBD(disk)
1610   except errors.BlockDeviceError, err:
1611     _Fail("Failed to find device: %s", err, exc=True)
1612
1613   if rbd is None:
1614     return None
1615
1616   return rbd.GetSyncStatus()
1617
1618
1619 def BlockdevGetsize(disks):
1620   """Computes the size of the given disks.
1621
1622   If a disk is not found, returns None instead.
1623
1624   @type disks: list of L{objects.Disk}
1625   @param disks: the list of disk to compute the size for
1626   @rtype: list
1627   @return: list with elements None if the disk cannot be found,
1628       otherwise the size
1629
1630   """
1631   result = []
1632   for cf in disks:
1633     try:
1634       rbd = _RecursiveFindBD(cf)
1635     except errors.BlockDeviceError:
1636       result.append(None)
1637       continue
1638     if rbd is None:
1639       result.append(None)
1640     else:
1641       result.append(rbd.GetActualSize())
1642   return result
1643
1644
1645 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1646   """Export a block device to a remote node.
1647
1648   @type disk: L{objects.Disk}
1649   @param disk: the description of the disk to export
1650   @type dest_node: str
1651   @param dest_node: the destination node to export to
1652   @type dest_path: str
1653   @param dest_path: the destination path on the target node
1654   @type cluster_name: str
1655   @param cluster_name: the cluster name, needed for SSH hostalias
1656   @rtype: None
1657
1658   """
1659   real_disk = _OpenRealBD(disk)
1660
1661   # the block size on the read dd is 1MiB to match our units
1662   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1663                                "dd if=%s bs=1048576 count=%s",
1664                                real_disk.dev_path, str(disk.size))
1665
1666   # we set here a smaller block size as, due to ssh buffering, more
1667   # than 64-128k will mostly ignored; we use nocreat to fail if the
1668   # device is not already there or we pass a wrong path; we use
1669   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1670   # to not buffer too much memory; this means that at best, we flush
1671   # every 64k, which will not be very fast
1672   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1673                                 " oflag=dsync", dest_path)
1674
1675   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1676                                                    constants.GANETI_RUNAS,
1677                                                    destcmd)
1678
1679   # all commands have been checked, so we're safe to combine them
1680   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1681
1682   result = utils.RunCmd(["bash", "-c", command])
1683
1684   if result.failed:
1685     _Fail("Disk copy command '%s' returned error: %s"
1686           " output: %s", command, result.fail_reason, result.output)
1687
1688
1689 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1690   """Write a file to the filesystem.
1691
1692   This allows the master to overwrite(!) a file. It will only perform
1693   the operation if the file belongs to a list of configuration files.
1694
1695   @type file_name: str
1696   @param file_name: the target file name
1697   @type data: str
1698   @param data: the new contents of the file
1699   @type mode: int
1700   @param mode: the mode to give the file (can be None)
1701   @type uid: int
1702   @param uid: the owner of the file (can be -1 for default)
1703   @type gid: int
1704   @param gid: the group of the file (can be -1 for default)
1705   @type atime: float
1706   @param atime: the atime to set on the file (can be None)
1707   @type mtime: float
1708   @param mtime: the mtime to set on the file (can be None)
1709   @rtype: None
1710
1711   """
1712   if not os.path.isabs(file_name):
1713     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1714
1715   if file_name not in _ALLOWED_UPLOAD_FILES:
1716     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1717           file_name)
1718
1719   raw_data = _Decompress(data)
1720
1721   utils.SafeWriteFile(file_name, None,
1722                       data=raw_data, mode=mode, uid=uid, gid=gid,
1723                       atime=atime, mtime=mtime)
1724
1725
1726 def WriteSsconfFiles(values):
1727   """Update all ssconf files.
1728
1729   Wrapper around the SimpleStore.WriteFiles.
1730
1731   """
1732   ssconf.SimpleStore().WriteFiles(values)
1733
1734
1735 def _ErrnoOrStr(err):
1736   """Format an EnvironmentError exception.
1737
1738   If the L{err} argument has an errno attribute, it will be looked up
1739   and converted into a textual C{E...} description. Otherwise the
1740   string representation of the error will be returned.
1741
1742   @type err: L{EnvironmentError}
1743   @param err: the exception to format
1744
1745   """
1746   if hasattr(err, 'errno'):
1747     detail = errno.errorcode[err.errno]
1748   else:
1749     detail = str(err)
1750   return detail
1751
1752
1753 def _OSOndiskAPIVersion(os_dir):
1754   """Compute and return the API version of a given OS.
1755
1756   This function will try to read the API version of the OS residing in
1757   the 'os_dir' directory.
1758
1759   @type os_dir: str
1760   @param os_dir: the directory in which we should look for the OS
1761   @rtype: tuple
1762   @return: tuple (status, data) with status denoting the validity and
1763       data holding either the vaid versions or an error message
1764
1765   """
1766   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1767
1768   try:
1769     st = os.stat(api_file)
1770   except EnvironmentError, err:
1771     return False, ("Required file '%s' not found under path %s: %s" %
1772                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1773
1774   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1775     return False, ("File '%s' in %s is not a regular file" %
1776                    (constants.OS_API_FILE, os_dir))
1777
1778   try:
1779     api_versions = utils.ReadFile(api_file).splitlines()
1780   except EnvironmentError, err:
1781     return False, ("Error while reading the API version file at %s: %s" %
1782                    (api_file, _ErrnoOrStr(err)))
1783
1784   try:
1785     api_versions = [int(version.strip()) for version in api_versions]
1786   except (TypeError, ValueError), err:
1787     return False, ("API version(s) can't be converted to integer: %s" %
1788                    str(err))
1789
1790   return True, api_versions
1791
1792
1793 def DiagnoseOS(top_dirs=None):
1794   """Compute the validity for all OSes.
1795
1796   @type top_dirs: list
1797   @param top_dirs: the list of directories in which to
1798       search (if not given defaults to
1799       L{constants.OS_SEARCH_PATH})
1800   @rtype: list of L{objects.OS}
1801   @return: a list of tuples (name, path, status, diagnose, variants,
1802       parameters, api_version) for all (potential) OSes under all
1803       search paths, where:
1804           - name is the (potential) OS name
1805           - path is the full path to the OS
1806           - status True/False is the validity of the OS
1807           - diagnose is the error message for an invalid OS, otherwise empty
1808           - variants is a list of supported OS variants, if any
1809           - parameters is a list of (name, help) parameters, if any
1810           - api_version is a list of support OS API versions
1811
1812   """
1813   if top_dirs is None:
1814     top_dirs = constants.OS_SEARCH_PATH
1815
1816   result = []
1817   for dir_name in top_dirs:
1818     if os.path.isdir(dir_name):
1819       try:
1820         f_names = utils.ListVisibleFiles(dir_name)
1821       except EnvironmentError, err:
1822         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1823         break
1824       for name in f_names:
1825         os_path = utils.PathJoin(dir_name, name)
1826         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1827         if status:
1828           diagnose = ""
1829           variants = os_inst.supported_variants
1830           parameters = os_inst.supported_parameters
1831           api_versions = os_inst.api_versions
1832         else:
1833           diagnose = os_inst
1834           variants = parameters = api_versions = []
1835         result.append((name, os_path, status, diagnose, variants,
1836                        parameters, api_versions))
1837
1838   return result
1839
1840
1841 def _TryOSFromDisk(name, base_dir=None):
1842   """Create an OS instance from disk.
1843
1844   This function will return an OS instance if the given name is a
1845   valid OS name.
1846
1847   @type base_dir: string
1848   @keyword base_dir: Base directory containing OS installations.
1849                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1850   @rtype: tuple
1851   @return: success and either the OS instance if we find a valid one,
1852       or error message
1853
1854   """
1855   if base_dir is None:
1856     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1857   else:
1858     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1859
1860   if os_dir is None:
1861     return False, "Directory for OS %s not found in search path" % name
1862
1863   status, api_versions = _OSOndiskAPIVersion(os_dir)
1864   if not status:
1865     # push the error up
1866     return status, api_versions
1867
1868   if not constants.OS_API_VERSIONS.intersection(api_versions):
1869     return False, ("API version mismatch for path '%s': found %s, want %s." %
1870                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1871
1872   # OS Files dictionary, we will populate it with the absolute path names
1873   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1874
1875   if max(api_versions) >= constants.OS_API_V15:
1876     os_files[constants.OS_VARIANTS_FILE] = ''
1877
1878   if max(api_versions) >= constants.OS_API_V20:
1879     os_files[constants.OS_PARAMETERS_FILE] = ''
1880   else:
1881     del os_files[constants.OS_SCRIPT_VERIFY]
1882
1883   for filename in os_files:
1884     os_files[filename] = utils.PathJoin(os_dir, filename)
1885
1886     try:
1887       st = os.stat(os_files[filename])
1888     except EnvironmentError, err:
1889       return False, ("File '%s' under path '%s' is missing (%s)" %
1890                      (filename, os_dir, _ErrnoOrStr(err)))
1891
1892     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1893       return False, ("File '%s' under path '%s' is not a regular file" %
1894                      (filename, os_dir))
1895
1896     if filename in constants.OS_SCRIPTS:
1897       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1898         return False, ("File '%s' under path '%s' is not executable" %
1899                        (filename, os_dir))
1900
1901   variants = []
1902   if constants.OS_VARIANTS_FILE in os_files:
1903     variants_file = os_files[constants.OS_VARIANTS_FILE]
1904     try:
1905       variants = utils.ReadFile(variants_file).splitlines()
1906     except EnvironmentError, err:
1907       return False, ("Error while reading the OS variants file at %s: %s" %
1908                      (variants_file, _ErrnoOrStr(err)))
1909     if not variants:
1910       return False, ("No supported os variant found")
1911
1912   parameters = []
1913   if constants.OS_PARAMETERS_FILE in os_files:
1914     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1915     try:
1916       parameters = utils.ReadFile(parameters_file).splitlines()
1917     except EnvironmentError, err:
1918       return False, ("Error while reading the OS parameters file at %s: %s" %
1919                      (parameters_file, _ErrnoOrStr(err)))
1920     parameters = [v.split(None, 1) for v in parameters]
1921
1922   os_obj = objects.OS(name=name, path=os_dir,
1923                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1924                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1925                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1926                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1927                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1928                                                  None),
1929                       supported_variants=variants,
1930                       supported_parameters=parameters,
1931                       api_versions=api_versions)
1932   return True, os_obj
1933
1934
1935 def OSFromDisk(name, base_dir=None):
1936   """Create an OS instance from disk.
1937
1938   This function will return an OS instance if the given name is a
1939   valid OS name. Otherwise, it will raise an appropriate
1940   L{RPCFail} exception, detailing why this is not a valid OS.
1941
1942   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1943   an exception but returns true/false status data.
1944
1945   @type base_dir: string
1946   @keyword base_dir: Base directory containing OS installations.
1947                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1948   @rtype: L{objects.OS}
1949   @return: the OS instance if we find a valid one
1950   @raise RPCFail: if we don't find a valid OS
1951
1952   """
1953   name_only = objects.OS.GetName(name)
1954   status, payload = _TryOSFromDisk(name_only, base_dir)
1955
1956   if not status:
1957     _Fail(payload)
1958
1959   return payload
1960
1961
1962 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
1963   """Calculate the basic environment for an os script.
1964
1965   @type os_name: str
1966   @param os_name: full operating system name (including variant)
1967   @type inst_os: L{objects.OS}
1968   @param inst_os: operating system for which the environment is being built
1969   @type os_params: dict
1970   @param os_params: the OS parameters
1971   @type debug: integer
1972   @param debug: debug level (0 or 1, for OS Api 10)
1973   @rtype: dict
1974   @return: dict of environment variables
1975   @raise errors.BlockDeviceError: if the block device
1976       cannot be found
1977
1978   """
1979   result = {}
1980   api_version = \
1981     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1982   result['OS_API_VERSION'] = '%d' % api_version
1983   result['OS_NAME'] = inst_os.name
1984   result['DEBUG_LEVEL'] = '%d' % debug
1985
1986   # OS variants
1987   if api_version >= constants.OS_API_V15:
1988     variant = objects.OS.GetVariant(os_name)
1989     if not variant:
1990       variant = inst_os.supported_variants[0]
1991     result['OS_VARIANT'] = variant
1992
1993   # OS params
1994   for pname, pvalue in os_params.items():
1995     result['OSP_%s' % pname.upper()] = pvalue
1996
1997   return result
1998
1999
2000 def OSEnvironment(instance, inst_os, debug=0):
2001   """Calculate the environment for an os script.
2002
2003   @type instance: L{objects.Instance}
2004   @param instance: target instance for the os script run
2005   @type inst_os: L{objects.OS}
2006   @param inst_os: operating system for which the environment is being built
2007   @type debug: integer
2008   @param debug: debug level (0 or 1, for OS Api 10)
2009   @rtype: dict
2010   @return: dict of environment variables
2011   @raise errors.BlockDeviceError: if the block device
2012       cannot be found
2013
2014   """
2015   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2016
2017   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2018     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2019
2020   result['HYPERVISOR'] = instance.hypervisor
2021   result['DISK_COUNT'] = '%d' % len(instance.disks)
2022   result['NIC_COUNT'] = '%d' % len(instance.nics)
2023
2024   # Disks
2025   for idx, disk in enumerate(instance.disks):
2026     real_disk = _OpenRealBD(disk)
2027     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2028     result['DISK_%d_ACCESS' % idx] = disk.mode
2029     if constants.HV_DISK_TYPE in instance.hvparams:
2030       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2031         instance.hvparams[constants.HV_DISK_TYPE]
2032     if disk.dev_type in constants.LDS_BLOCK:
2033       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2034     elif disk.dev_type == constants.LD_FILE:
2035       result['DISK_%d_BACKEND_TYPE' % idx] = \
2036         'file:%s' % disk.physical_id[0]
2037
2038   # NICs
2039   for idx, nic in enumerate(instance.nics):
2040     result['NIC_%d_MAC' % idx] = nic.mac
2041     if nic.ip:
2042       result['NIC_%d_IP' % idx] = nic.ip
2043     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2044     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2045       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2046     if nic.nicparams[constants.NIC_LINK]:
2047       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2048     if constants.HV_NIC_TYPE in instance.hvparams:
2049       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2050         instance.hvparams[constants.HV_NIC_TYPE]
2051
2052   # HV/BE params
2053   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2054     for key, value in source.items():
2055       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2056
2057   return result
2058
2059
2060 def BlockdevGrow(disk, amount):
2061   """Grow a stack of block devices.
2062
2063   This function is called recursively, with the childrens being the
2064   first ones to resize.
2065
2066   @type disk: L{objects.Disk}
2067   @param disk: the disk to be grown
2068   @rtype: (status, result)
2069   @return: a tuple with the status of the operation
2070       (True/False), and the errors message if status
2071       is False
2072
2073   """
2074   r_dev = _RecursiveFindBD(disk)
2075   if r_dev is None:
2076     _Fail("Cannot find block device %s", disk)
2077
2078   try:
2079     r_dev.Grow(amount)
2080   except errors.BlockDeviceError, err:
2081     _Fail("Failed to grow block device: %s", err, exc=True)
2082
2083
2084 def BlockdevSnapshot(disk):
2085   """Create a snapshot copy of a block device.
2086
2087   This function is called recursively, and the snapshot is actually created
2088   just for the leaf lvm backend device.
2089
2090   @type disk: L{objects.Disk}
2091   @param disk: the disk to be snapshotted
2092   @rtype: string
2093   @return: snapshot disk path
2094
2095   """
2096   if disk.dev_type == constants.LD_DRBD8:
2097     if not disk.children:
2098       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2099             disk.unique_id)
2100     return BlockdevSnapshot(disk.children[0])
2101   elif disk.dev_type == constants.LD_LV:
2102     r_dev = _RecursiveFindBD(disk)
2103     if r_dev is not None:
2104       # FIXME: choose a saner value for the snapshot size
2105       # let's stay on the safe side and ask for the full size, for now
2106       return r_dev.Snapshot(disk.size)
2107     else:
2108       _Fail("Cannot find block device %s", disk)
2109   else:
2110     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2111           disk.unique_id, disk.dev_type)
2112
2113
2114 def FinalizeExport(instance, snap_disks):
2115   """Write out the export configuration information.
2116
2117   @type instance: L{objects.Instance}
2118   @param instance: the instance which we export, used for
2119       saving configuration
2120   @type snap_disks: list of L{objects.Disk}
2121   @param snap_disks: list of snapshot block devices, which
2122       will be used to get the actual name of the dump file
2123
2124   @rtype: None
2125
2126   """
2127   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2128   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2129
2130   config = objects.SerializableConfigParser()
2131
2132   config.add_section(constants.INISECT_EXP)
2133   config.set(constants.INISECT_EXP, 'version', '0')
2134   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2135   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2136   config.set(constants.INISECT_EXP, 'os', instance.os)
2137   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2138
2139   config.add_section(constants.INISECT_INS)
2140   config.set(constants.INISECT_INS, 'name', instance.name)
2141   config.set(constants.INISECT_INS, 'memory', '%d' %
2142              instance.beparams[constants.BE_MEMORY])
2143   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2144              instance.beparams[constants.BE_VCPUS])
2145   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2146   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2147
2148   nic_total = 0
2149   for nic_count, nic in enumerate(instance.nics):
2150     nic_total += 1
2151     config.set(constants.INISECT_INS, 'nic%d_mac' %
2152                nic_count, '%s' % nic.mac)
2153     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2154     for param in constants.NICS_PARAMETER_TYPES:
2155       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2156                  '%s' % nic.nicparams.get(param, None))
2157   # TODO: redundant: on load can read nics until it doesn't exist
2158   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2159
2160   disk_total = 0
2161   for disk_count, disk in enumerate(snap_disks):
2162     if disk:
2163       disk_total += 1
2164       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2165                  ('%s' % disk.iv_name))
2166       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2167                  ('%s' % disk.physical_id[1]))
2168       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2169                  ('%d' % disk.size))
2170
2171   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2172
2173   # New-style hypervisor/backend parameters
2174
2175   config.add_section(constants.INISECT_HYP)
2176   for name, value in instance.hvparams.items():
2177     if name not in constants.HVC_GLOBALS:
2178       config.set(constants.INISECT_HYP, name, str(value))
2179
2180   config.add_section(constants.INISECT_BEP)
2181   for name, value in instance.beparams.items():
2182     config.set(constants.INISECT_BEP, name, str(value))
2183
2184   config.add_section(constants.INISECT_OSP)
2185   for name, value in instance.osparams.items():
2186     config.set(constants.INISECT_OSP, name, str(value))
2187
2188   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2189                   data=config.Dumps())
2190   shutil.rmtree(finaldestdir, ignore_errors=True)
2191   shutil.move(destdir, finaldestdir)
2192
2193
2194 def ExportInfo(dest):
2195   """Get export configuration information.
2196
2197   @type dest: str
2198   @param dest: directory containing the export
2199
2200   @rtype: L{objects.SerializableConfigParser}
2201   @return: a serializable config file containing the
2202       export info
2203
2204   """
2205   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2206
2207   config = objects.SerializableConfigParser()
2208   config.read(cff)
2209
2210   if (not config.has_section(constants.INISECT_EXP) or
2211       not config.has_section(constants.INISECT_INS)):
2212     _Fail("Export info file doesn't have the required fields")
2213
2214   return config.Dumps()
2215
2216
2217 def ListExports():
2218   """Return a list of exports currently available on this machine.
2219
2220   @rtype: list
2221   @return: list of the exports
2222
2223   """
2224   if os.path.isdir(constants.EXPORT_DIR):
2225     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2226   else:
2227     _Fail("No exports directory")
2228
2229
2230 def RemoveExport(export):
2231   """Remove an existing export from the node.
2232
2233   @type export: str
2234   @param export: the name of the export to remove
2235   @rtype: None
2236
2237   """
2238   target = utils.PathJoin(constants.EXPORT_DIR, export)
2239
2240   try:
2241     shutil.rmtree(target)
2242   except EnvironmentError, err:
2243     _Fail("Error while removing the export: %s", err, exc=True)
2244
2245
2246 def BlockdevRename(devlist):
2247   """Rename a list of block devices.
2248
2249   @type devlist: list of tuples
2250   @param devlist: list of tuples of the form  (disk,
2251       new_logical_id, new_physical_id); disk is an
2252       L{objects.Disk} object describing the current disk,
2253       and new logical_id/physical_id is the name we
2254       rename it to
2255   @rtype: boolean
2256   @return: True if all renames succeeded, False otherwise
2257
2258   """
2259   msgs = []
2260   result = True
2261   for disk, unique_id in devlist:
2262     dev = _RecursiveFindBD(disk)
2263     if dev is None:
2264       msgs.append("Can't find device %s in rename" % str(disk))
2265       result = False
2266       continue
2267     try:
2268       old_rpath = dev.dev_path
2269       dev.Rename(unique_id)
2270       new_rpath = dev.dev_path
2271       if old_rpath != new_rpath:
2272         DevCacheManager.RemoveCache(old_rpath)
2273         # FIXME: we should add the new cache information here, like:
2274         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2275         # but we don't have the owner here - maybe parse from existing
2276         # cache? for now, we only lose lvm data when we rename, which
2277         # is less critical than DRBD or MD
2278     except errors.BlockDeviceError, err:
2279       msgs.append("Can't rename device '%s' to '%s': %s" %
2280                   (dev, unique_id, err))
2281       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2282       result = False
2283   if not result:
2284     _Fail("; ".join(msgs))
2285
2286
2287 def _TransformFileStorageDir(file_storage_dir):
2288   """Checks whether given file_storage_dir is valid.
2289
2290   Checks wheter the given file_storage_dir is within the cluster-wide
2291   default file_storage_dir stored in SimpleStore. Only paths under that
2292   directory are allowed.
2293
2294   @type file_storage_dir: str
2295   @param file_storage_dir: the path to check
2296
2297   @return: the normalized path if valid, None otherwise
2298
2299   """
2300   if not constants.ENABLE_FILE_STORAGE:
2301     _Fail("File storage disabled at configure time")
2302   cfg = _GetConfig()
2303   file_storage_dir = os.path.normpath(file_storage_dir)
2304   base_file_storage_dir = cfg.GetFileStorageDir()
2305   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2306       base_file_storage_dir):
2307     _Fail("File storage directory '%s' is not under base file"
2308           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2309   return file_storage_dir
2310
2311
2312 def CreateFileStorageDir(file_storage_dir):
2313   """Create file storage directory.
2314
2315   @type file_storage_dir: str
2316   @param file_storage_dir: directory to create
2317
2318   @rtype: tuple
2319   @return: tuple with first element a boolean indicating wheter dir
2320       creation was successful or not
2321
2322   """
2323   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2324   if os.path.exists(file_storage_dir):
2325     if not os.path.isdir(file_storage_dir):
2326       _Fail("Specified storage dir '%s' is not a directory",
2327             file_storage_dir)
2328   else:
2329     try:
2330       os.makedirs(file_storage_dir, 0750)
2331     except OSError, err:
2332       _Fail("Cannot create file storage directory '%s': %s",
2333             file_storage_dir, err, exc=True)
2334
2335
2336 def RemoveFileStorageDir(file_storage_dir):
2337   """Remove file storage directory.
2338
2339   Remove it only if it's empty. If not log an error and return.
2340
2341   @type file_storage_dir: str
2342   @param file_storage_dir: the directory we should cleanup
2343   @rtype: tuple (success,)
2344   @return: tuple of one element, C{success}, denoting
2345       whether the operation was successful
2346
2347   """
2348   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2349   if os.path.exists(file_storage_dir):
2350     if not os.path.isdir(file_storage_dir):
2351       _Fail("Specified Storage directory '%s' is not a directory",
2352             file_storage_dir)
2353     # deletes dir only if empty, otherwise we want to fail the rpc call
2354     try:
2355       os.rmdir(file_storage_dir)
2356     except OSError, err:
2357       _Fail("Cannot remove file storage directory '%s': %s",
2358             file_storage_dir, err)
2359
2360
2361 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2362   """Rename the file storage directory.
2363
2364   @type old_file_storage_dir: str
2365   @param old_file_storage_dir: the current path
2366   @type new_file_storage_dir: str
2367   @param new_file_storage_dir: the name we should rename to
2368   @rtype: tuple (success,)
2369   @return: tuple of one element, C{success}, denoting
2370       whether the operation was successful
2371
2372   """
2373   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2374   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2375   if not os.path.exists(new_file_storage_dir):
2376     if os.path.isdir(old_file_storage_dir):
2377       try:
2378         os.rename(old_file_storage_dir, new_file_storage_dir)
2379       except OSError, err:
2380         _Fail("Cannot rename '%s' to '%s': %s",
2381               old_file_storage_dir, new_file_storage_dir, err)
2382     else:
2383       _Fail("Specified storage dir '%s' is not a directory",
2384             old_file_storage_dir)
2385   else:
2386     if os.path.exists(old_file_storage_dir):
2387       _Fail("Cannot rename '%s' to '%s': both locations exist",
2388             old_file_storage_dir, new_file_storage_dir)
2389
2390
2391 def _EnsureJobQueueFile(file_name):
2392   """Checks whether the given filename is in the queue directory.
2393
2394   @type file_name: str
2395   @param file_name: the file name we should check
2396   @rtype: None
2397   @raises RPCFail: if the file is not valid
2398
2399   """
2400   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2401   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2402
2403   if not result:
2404     _Fail("Passed job queue file '%s' does not belong to"
2405           " the queue directory '%s'", file_name, queue_dir)
2406
2407
2408 def JobQueueUpdate(file_name, content):
2409   """Updates a file in the queue directory.
2410
2411   This is just a wrapper over L{utils.WriteFile}, with proper
2412   checking.
2413
2414   @type file_name: str
2415   @param file_name: the job file name
2416   @type content: str
2417   @param content: the new job contents
2418   @rtype: boolean
2419   @return: the success of the operation
2420
2421   """
2422   _EnsureJobQueueFile(file_name)
2423   getents = runtime.GetEnts()
2424
2425   # Write and replace the file atomically
2426   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2427                   gid=getents.masterd_gid)
2428
2429
2430 def JobQueueRename(old, new):
2431   """Renames a job queue file.
2432
2433   This is just a wrapper over os.rename with proper checking.
2434
2435   @type old: str
2436   @param old: the old (actual) file name
2437   @type new: str
2438   @param new: the desired file name
2439   @rtype: tuple
2440   @return: the success of the operation and payload
2441
2442   """
2443   _EnsureJobQueueFile(old)
2444   _EnsureJobQueueFile(new)
2445
2446   utils.RenameFile(old, new, mkdir=True)
2447
2448
2449 def BlockdevClose(instance_name, disks):
2450   """Closes the given block devices.
2451
2452   This means they will be switched to secondary mode (in case of
2453   DRBD).
2454
2455   @param instance_name: if the argument is not empty, the symlinks
2456       of this instance will be removed
2457   @type disks: list of L{objects.Disk}
2458   @param disks: the list of disks to be closed
2459   @rtype: tuple (success, message)
2460   @return: a tuple of success and message, where success
2461       indicates the succes of the operation, and message
2462       which will contain the error details in case we
2463       failed
2464
2465   """
2466   bdevs = []
2467   for cf in disks:
2468     rd = _RecursiveFindBD(cf)
2469     if rd is None:
2470       _Fail("Can't find device %s", cf)
2471     bdevs.append(rd)
2472
2473   msg = []
2474   for rd in bdevs:
2475     try:
2476       rd.Close()
2477     except errors.BlockDeviceError, err:
2478       msg.append(str(err))
2479   if msg:
2480     _Fail("Can't make devices secondary: %s", ",".join(msg))
2481   else:
2482     if instance_name:
2483       _RemoveBlockDevLinks(instance_name, disks)
2484
2485
2486 def ValidateHVParams(hvname, hvparams):
2487   """Validates the given hypervisor parameters.
2488
2489   @type hvname: string
2490   @param hvname: the hypervisor name
2491   @type hvparams: dict
2492   @param hvparams: the hypervisor parameters to be validated
2493   @rtype: None
2494
2495   """
2496   try:
2497     hv_type = hypervisor.GetHypervisor(hvname)
2498     hv_type.ValidateParameters(hvparams)
2499   except errors.HypervisorError, err:
2500     _Fail(str(err), log=False)
2501
2502
2503 def _CheckOSPList(os_obj, parameters):
2504   """Check whether a list of parameters is supported by the OS.
2505
2506   @type os_obj: L{objects.OS}
2507   @param os_obj: OS object to check
2508   @type parameters: list
2509   @param parameters: the list of parameters to check
2510
2511   """
2512   supported = [v[0] for v in os_obj.supported_parameters]
2513   delta = frozenset(parameters).difference(supported)
2514   if delta:
2515     _Fail("The following parameters are not supported"
2516           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2517
2518
2519 def ValidateOS(required, osname, checks, osparams):
2520   """Validate the given OS' parameters.
2521
2522   @type required: boolean
2523   @param required: whether absence of the OS should translate into
2524       failure or not
2525   @type osname: string
2526   @param osname: the OS to be validated
2527   @type checks: list
2528   @param checks: list of the checks to run (currently only 'parameters')
2529   @type osparams: dict
2530   @param osparams: dictionary with OS parameters
2531   @rtype: boolean
2532   @return: True if the validation passed, or False if the OS was not
2533       found and L{required} was false
2534
2535   """
2536   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2537     _Fail("Unknown checks required for OS %s: %s", osname,
2538           set(checks).difference(constants.OS_VALIDATE_CALLS))
2539
2540   name_only = objects.OS.GetName(osname)
2541   status, tbv = _TryOSFromDisk(name_only, None)
2542
2543   if not status:
2544     if required:
2545       _Fail(tbv)
2546     else:
2547       return False
2548
2549   if max(tbv.api_versions) < constants.OS_API_V20:
2550     return True
2551
2552   if constants.OS_VALIDATE_PARAMETERS in checks:
2553     _CheckOSPList(tbv, osparams.keys())
2554
2555   validate_env = OSCoreEnv(osname, tbv, osparams)
2556   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2557                         cwd=tbv.path)
2558   if result.failed:
2559     logging.error("os validate command '%s' returned error: %s output: %s",
2560                   result.cmd, result.fail_reason, result.output)
2561     _Fail("OS validation script failed (%s), output: %s",
2562           result.fail_reason, result.output, log=False)
2563
2564   return True
2565
2566
2567 def DemoteFromMC():
2568   """Demotes the current node from master candidate role.
2569
2570   """
2571   # try to ensure we're not the master by mistake
2572   master, myself = ssconf.GetMasterAndMyself()
2573   if master == myself:
2574     _Fail("ssconf status shows I'm the master node, will not demote")
2575
2576   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2577   if not result.failed:
2578     _Fail("The master daemon is running, will not demote")
2579
2580   try:
2581     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2582       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2583   except EnvironmentError, err:
2584     if err.errno != errno.ENOENT:
2585       _Fail("Error while backing up cluster file: %s", err, exc=True)
2586
2587   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2588
2589
2590 def _GetX509Filenames(cryptodir, name):
2591   """Returns the full paths for the private key and certificate.
2592
2593   """
2594   return (utils.PathJoin(cryptodir, name),
2595           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2596           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2597
2598
2599 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2600   """Creates a new X509 certificate for SSL/TLS.
2601
2602   @type validity: int
2603   @param validity: Validity in seconds
2604   @rtype: tuple; (string, string)
2605   @return: Certificate name and public part
2606
2607   """
2608   (key_pem, cert_pem) = \
2609     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2610                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2611
2612   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2613                               prefix="x509-%s-" % utils.TimestampForFilename())
2614   try:
2615     name = os.path.basename(cert_dir)
2616     assert len(name) > 5
2617
2618     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2619
2620     utils.WriteFile(key_file, mode=0400, data=key_pem)
2621     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2622
2623     # Never return private key as it shouldn't leave the node
2624     return (name, cert_pem)
2625   except Exception:
2626     shutil.rmtree(cert_dir, ignore_errors=True)
2627     raise
2628
2629
2630 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2631   """Removes a X509 certificate.
2632
2633   @type name: string
2634   @param name: Certificate name
2635
2636   """
2637   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2638
2639   utils.RemoveFile(key_file)
2640   utils.RemoveFile(cert_file)
2641
2642   try:
2643     os.rmdir(cert_dir)
2644   except EnvironmentError, err:
2645     _Fail("Cannot remove certificate directory '%s': %s",
2646           cert_dir, err)
2647
2648
2649 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2650   """Returns the command for the requested input/output.
2651
2652   @type instance: L{objects.Instance}
2653   @param instance: The instance object
2654   @param mode: Import/export mode
2655   @param ieio: Input/output type
2656   @param ieargs: Input/output arguments
2657
2658   """
2659   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2660
2661   env = None
2662   prefix = None
2663   suffix = None
2664   exp_size = None
2665
2666   if ieio == constants.IEIO_FILE:
2667     (filename, ) = ieargs
2668
2669     if not utils.IsNormAbsPath(filename):
2670       _Fail("Path '%s' is not normalized or absolute", filename)
2671
2672     directory = os.path.normpath(os.path.dirname(filename))
2673
2674     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2675         constants.EXPORT_DIR):
2676       _Fail("File '%s' is not under exports directory '%s'",
2677             filename, constants.EXPORT_DIR)
2678
2679     # Create directory
2680     utils.Makedirs(directory, mode=0750)
2681
2682     quoted_filename = utils.ShellQuote(filename)
2683
2684     if mode == constants.IEM_IMPORT:
2685       suffix = "> %s" % quoted_filename
2686     elif mode == constants.IEM_EXPORT:
2687       suffix = "< %s" % quoted_filename
2688
2689       # Retrieve file size
2690       try:
2691         st = os.stat(filename)
2692       except EnvironmentError, err:
2693         logging.error("Can't stat(2) %s: %s", filename, err)
2694       else:
2695         exp_size = utils.BytesToMebibyte(st.st_size)
2696
2697   elif ieio == constants.IEIO_RAW_DISK:
2698     (disk, ) = ieargs
2699
2700     real_disk = _OpenRealBD(disk)
2701
2702     if mode == constants.IEM_IMPORT:
2703       # we set here a smaller block size as, due to transport buffering, more
2704       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2705       # is not already there or we pass a wrong path; we use notrunc to no
2706       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2707       # much memory; this means that at best, we flush every 64k, which will
2708       # not be very fast
2709       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2710                                     " bs=%s oflag=dsync"),
2711                                     real_disk.dev_path,
2712                                     str(64 * 1024))
2713
2714     elif mode == constants.IEM_EXPORT:
2715       # the block size on the read dd is 1MiB to match our units
2716       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2717                                    real_disk.dev_path,
2718                                    str(1024 * 1024), # 1 MB
2719                                    str(disk.size))
2720       exp_size = disk.size
2721
2722   elif ieio == constants.IEIO_SCRIPT:
2723     (disk, disk_index, ) = ieargs
2724
2725     assert isinstance(disk_index, (int, long))
2726
2727     real_disk = _OpenRealBD(disk)
2728
2729     inst_os = OSFromDisk(instance.os)
2730     env = OSEnvironment(instance, inst_os)
2731
2732     if mode == constants.IEM_IMPORT:
2733       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2734       env["IMPORT_INDEX"] = str(disk_index)
2735       script = inst_os.import_script
2736
2737     elif mode == constants.IEM_EXPORT:
2738       env["EXPORT_DEVICE"] = real_disk.dev_path
2739       env["EXPORT_INDEX"] = str(disk_index)
2740       script = inst_os.export_script
2741
2742     # TODO: Pass special environment only to script
2743     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2744
2745     if mode == constants.IEM_IMPORT:
2746       suffix = "| %s" % script_cmd
2747
2748     elif mode == constants.IEM_EXPORT:
2749       prefix = "%s |" % script_cmd
2750
2751     # Let script predict size
2752     exp_size = constants.IE_CUSTOM_SIZE
2753
2754   else:
2755     _Fail("Invalid %s I/O mode %r", mode, ieio)
2756
2757   return (env, prefix, suffix, exp_size)
2758
2759
2760 def _CreateImportExportStatusDir(prefix):
2761   """Creates status directory for import/export.
2762
2763   """
2764   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2765                           prefix=("%s-%s-" %
2766                                   (prefix, utils.TimestampForFilename())))
2767
2768
2769 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2770   """Starts an import or export daemon.
2771
2772   @param mode: Import/output mode
2773   @type opts: L{objects.ImportExportOptions}
2774   @param opts: Daemon options
2775   @type host: string
2776   @param host: Remote host for export (None for import)
2777   @type port: int
2778   @param port: Remote port for export (None for import)
2779   @type instance: L{objects.Instance}
2780   @param instance: Instance object
2781   @param ieio: Input/output type
2782   @param ieioargs: Input/output arguments
2783
2784   """
2785   if mode == constants.IEM_IMPORT:
2786     prefix = "import"
2787
2788     if not (host is None and port is None):
2789       _Fail("Can not specify host or port on import")
2790
2791   elif mode == constants.IEM_EXPORT:
2792     prefix = "export"
2793
2794     if host is None or port is None:
2795       _Fail("Host and port must be specified for an export")
2796
2797   else:
2798     _Fail("Invalid mode %r", mode)
2799
2800   if (opts.key_name is None) ^ (opts.ca_pem is None):
2801     _Fail("Cluster certificate can only be used for both key and CA")
2802
2803   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2804     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2805
2806   if opts.key_name is None:
2807     # Use server.pem
2808     key_path = constants.NODED_CERT_FILE
2809     cert_path = constants.NODED_CERT_FILE
2810     assert opts.ca_pem is None
2811   else:
2812     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2813                                                  opts.key_name)
2814     assert opts.ca_pem is not None
2815
2816   for i in [key_path, cert_path]:
2817     if not os.path.exists(i):
2818       _Fail("File '%s' does not exist" % i)
2819
2820   status_dir = _CreateImportExportStatusDir(prefix)
2821   try:
2822     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2823     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2824     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2825
2826     if opts.ca_pem is None:
2827       # Use server.pem
2828       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2829     else:
2830       ca = opts.ca_pem
2831
2832     # Write CA file
2833     utils.WriteFile(ca_file, data=ca, mode=0400)
2834
2835     cmd = [
2836       constants.IMPORT_EXPORT_DAEMON,
2837       status_file, mode,
2838       "--key=%s" % key_path,
2839       "--cert=%s" % cert_path,
2840       "--ca=%s" % ca_file,
2841       ]
2842
2843     if host:
2844       cmd.append("--host=%s" % host)
2845
2846     if port:
2847       cmd.append("--port=%s" % port)
2848
2849     if opts.compress:
2850       cmd.append("--compress=%s" % opts.compress)
2851
2852     if opts.magic:
2853       cmd.append("--magic=%s" % opts.magic)
2854
2855     if exp_size is not None:
2856       cmd.append("--expected-size=%s" % exp_size)
2857
2858     if cmd_prefix:
2859       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2860
2861     if cmd_suffix:
2862       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2863
2864     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2865
2866     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2867     # support for receiving a file descriptor for output
2868     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2869                       output=logfile)
2870
2871     # The import/export name is simply the status directory name
2872     return os.path.basename(status_dir)
2873
2874   except Exception:
2875     shutil.rmtree(status_dir, ignore_errors=True)
2876     raise
2877
2878
2879 def GetImportExportStatus(names):
2880   """Returns import/export daemon status.
2881
2882   @type names: sequence
2883   @param names: List of names
2884   @rtype: List of dicts
2885   @return: Returns a list of the state of each named import/export or None if a
2886            status couldn't be read
2887
2888   """
2889   result = []
2890
2891   for name in names:
2892     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2893                                  _IES_STATUS_FILE)
2894
2895     try:
2896       data = utils.ReadFile(status_file)
2897     except EnvironmentError, err:
2898       if err.errno != errno.ENOENT:
2899         raise
2900       data = None
2901
2902     if not data:
2903       result.append(None)
2904       continue
2905
2906     result.append(serializer.LoadJson(data))
2907
2908   return result
2909
2910
2911 def AbortImportExport(name):
2912   """Sends SIGTERM to a running import/export daemon.
2913
2914   """
2915   logging.info("Abort import/export %s", name)
2916
2917   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2918   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2919
2920   if pid:
2921     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2922                  name, pid)
2923     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2924
2925
2926 def CleanupImportExport(name):
2927   """Cleanup after an import or export.
2928
2929   If the import/export daemon is still running it's killed. Afterwards the
2930   whole status directory is removed.
2931
2932   """
2933   logging.info("Finalizing import/export %s", name)
2934
2935   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2936
2937   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2938
2939   if pid:
2940     logging.info("Import/export %s is still running with PID %s",
2941                  name, pid)
2942     utils.KillProcess(pid, waitpid=False)
2943
2944   shutil.rmtree(status_dir, ignore_errors=True)
2945
2946
2947 def _FindDisks(nodes_ip, disks):
2948   """Sets the physical ID on disks and returns the block devices.
2949
2950   """
2951   # set the correct physical ID
2952   my_name = netutils.Hostname.GetSysName()
2953   for cf in disks:
2954     cf.SetPhysicalID(my_name, nodes_ip)
2955
2956   bdevs = []
2957
2958   for cf in disks:
2959     rd = _RecursiveFindBD(cf)
2960     if rd is None:
2961       _Fail("Can't find device %s", cf)
2962     bdevs.append(rd)
2963   return bdevs
2964
2965
2966 def DrbdDisconnectNet(nodes_ip, disks):
2967   """Disconnects the network on a list of drbd devices.
2968
2969   """
2970   bdevs = _FindDisks(nodes_ip, disks)
2971
2972   # disconnect disks
2973   for rd in bdevs:
2974     try:
2975       rd.DisconnectNet()
2976     except errors.BlockDeviceError, err:
2977       _Fail("Can't change network configuration to standalone mode: %s",
2978             err, exc=True)
2979
2980
2981 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2982   """Attaches the network on a list of drbd devices.
2983
2984   """
2985   bdevs = _FindDisks(nodes_ip, disks)
2986
2987   if multimaster:
2988     for idx, rd in enumerate(bdevs):
2989       try:
2990         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2991       except EnvironmentError, err:
2992         _Fail("Can't create symlink: %s", err)
2993   # reconnect disks, switch to new master configuration and if
2994   # needed primary mode
2995   for rd in bdevs:
2996     try:
2997       rd.AttachNet(multimaster)
2998     except errors.BlockDeviceError, err:
2999       _Fail("Can't change network configuration: %s", err)
3000
3001   # wait until the disks are connected; we need to retry the re-attach
3002   # if the device becomes standalone, as this might happen if the one
3003   # node disconnects and reconnects in a different mode before the
3004   # other node reconnects; in this case, one or both of the nodes will
3005   # decide it has wrong configuration and switch to standalone
3006
3007   def _Attach():
3008     all_connected = True
3009
3010     for rd in bdevs:
3011       stats = rd.GetProcStatus()
3012
3013       all_connected = (all_connected and
3014                        (stats.is_connected or stats.is_in_resync))
3015
3016       if stats.is_standalone:
3017         # peer had different config info and this node became
3018         # standalone, even though this should not happen with the
3019         # new staged way of changing disk configs
3020         try:
3021           rd.AttachNet(multimaster)
3022         except errors.BlockDeviceError, err:
3023           _Fail("Can't change network configuration: %s", err)
3024
3025     if not all_connected:
3026       raise utils.RetryAgain()
3027
3028   try:
3029     # Start with a delay of 100 miliseconds and go up to 5 seconds
3030     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3031   except utils.RetryTimeout:
3032     _Fail("Timeout in disk reconnecting")
3033
3034   if multimaster:
3035     # change to primary mode
3036     for rd in bdevs:
3037       try:
3038         rd.Open()
3039       except errors.BlockDeviceError, err:
3040         _Fail("Can't change to primary mode: %s", err)
3041
3042
3043 def DrbdWaitSync(nodes_ip, disks):
3044   """Wait until DRBDs have synchronized.
3045
3046   """
3047   def _helper(rd):
3048     stats = rd.GetProcStatus()
3049     if not (stats.is_connected or stats.is_in_resync):
3050       raise utils.RetryAgain()
3051     return stats
3052
3053   bdevs = _FindDisks(nodes_ip, disks)
3054
3055   min_resync = 100
3056   alldone = True
3057   for rd in bdevs:
3058     try:
3059       # poll each second for 15 seconds
3060       stats = utils.Retry(_helper, 1, 15, args=[rd])
3061     except utils.RetryTimeout:
3062       stats = rd.GetProcStatus()
3063       # last check
3064       if not (stats.is_connected or stats.is_in_resync):
3065         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3066     alldone = alldone and (not stats.is_in_resync)
3067     if stats.sync_percent is not None:
3068       min_resync = min(min_resync, stats.sync_percent)
3069
3070   return (alldone, min_resync)
3071
3072
3073 def GetDrbdUsermodeHelper():
3074   """Returns DRBD usermode helper currently configured.
3075
3076   """
3077   try:
3078     return bdev.BaseDRBD.GetUsermodeHelper()
3079   except errors.BlockDeviceError, err:
3080     _Fail(str(err))
3081
3082
3083 def PowercycleNode(hypervisor_type):
3084   """Hard-powercycle the node.
3085
3086   Because we need to return first, and schedule the powercycle in the
3087   background, we won't be able to report failures nicely.
3088
3089   """
3090   hyper = hypervisor.GetHypervisor(hypervisor_type)
3091   try:
3092     pid = os.fork()
3093   except OSError:
3094     # if we can't fork, we'll pretend that we're in the child process
3095     pid = 0
3096   if pid > 0:
3097     return "Reboot scheduled in 5 seconds"
3098   # ensure the child is running on ram
3099   try:
3100     utils.Mlockall()
3101   except Exception: # pylint: disable-msg=W0703
3102     pass
3103   time.sleep(5)
3104   hyper.PowercycleNode()
3105
3106
3107 class HooksRunner(object):
3108   """Hook runner.
3109
3110   This class is instantiated on the node side (ganeti-noded) and not
3111   on the master side.
3112
3113   """
3114   def __init__(self, hooks_base_dir=None):
3115     """Constructor for hooks runner.
3116
3117     @type hooks_base_dir: str or None
3118     @param hooks_base_dir: if not None, this overrides the
3119         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3120
3121     """
3122     if hooks_base_dir is None:
3123       hooks_base_dir = constants.HOOKS_BASE_DIR
3124     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3125     # constant
3126     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3127
3128   def RunHooks(self, hpath, phase, env):
3129     """Run the scripts in the hooks directory.
3130
3131     @type hpath: str
3132     @param hpath: the path to the hooks directory which
3133         holds the scripts
3134     @type phase: str
3135     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3136         L{constants.HOOKS_PHASE_POST}
3137     @type env: dict
3138     @param env: dictionary with the environment for the hook
3139     @rtype: list
3140     @return: list of 3-element tuples:
3141       - script path
3142       - script result, either L{constants.HKR_SUCCESS} or
3143         L{constants.HKR_FAIL}
3144       - output of the script
3145
3146     @raise errors.ProgrammerError: for invalid input
3147         parameters
3148
3149     """
3150     if phase == constants.HOOKS_PHASE_PRE:
3151       suffix = "pre"
3152     elif phase == constants.HOOKS_PHASE_POST:
3153       suffix = "post"
3154     else:
3155       _Fail("Unknown hooks phase '%s'", phase)
3156
3157
3158     subdir = "%s-%s.d" % (hpath, suffix)
3159     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3160
3161     results = []
3162
3163     if not os.path.isdir(dir_name):
3164       # for non-existing/non-dirs, we simply exit instead of logging a
3165       # warning at every operation
3166       return results
3167
3168     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3169
3170     for (relname, relstatus, runresult)  in runparts_results:
3171       if relstatus == constants.RUNPARTS_SKIP:
3172         rrval = constants.HKR_SKIP
3173         output = ""
3174       elif relstatus == constants.RUNPARTS_ERR:
3175         rrval = constants.HKR_FAIL
3176         output = "Hook script execution error: %s" % runresult
3177       elif relstatus == constants.RUNPARTS_RUN:
3178         if runresult.failed:
3179           rrval = constants.HKR_FAIL
3180         else:
3181           rrval = constants.HKR_SUCCESS
3182         output = utils.SafeEncode(runresult.output.strip())
3183       results.append(("%s/%s" % (subdir, relname), rrval, output))
3184
3185     return results
3186
3187
3188 class IAllocatorRunner(object):
3189   """IAllocator runner.
3190
3191   This class is instantiated on the node side (ganeti-noded) and not on
3192   the master side.
3193
3194   """
3195   @staticmethod
3196   def Run(name, idata):
3197     """Run an iallocator script.
3198
3199     @type name: str
3200     @param name: the iallocator script name
3201     @type idata: str
3202     @param idata: the allocator input data
3203
3204     @rtype: tuple
3205     @return: two element tuple of:
3206        - status
3207        - either error message or stdout of allocator (for success)
3208
3209     """
3210     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3211                                   os.path.isfile)
3212     if alloc_script is None:
3213       _Fail("iallocator module '%s' not found in the search path", name)
3214
3215     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3216     try:
3217       os.write(fd, idata)
3218       os.close(fd)
3219       result = utils.RunCmd([alloc_script, fin_name])
3220       if result.failed:
3221         _Fail("iallocator module '%s' failed: %s, output '%s'",
3222               name, result.fail_reason, result.output)
3223     finally:
3224       os.unlink(fin_name)
3225
3226     return result.stdout
3227
3228
3229 class DevCacheManager(object):
3230   """Simple class for managing a cache of block device information.
3231
3232   """
3233   _DEV_PREFIX = "/dev/"
3234   _ROOT_DIR = constants.BDEV_CACHE_DIR
3235
3236   @classmethod
3237   def _ConvertPath(cls, dev_path):
3238     """Converts a /dev/name path to the cache file name.
3239
3240     This replaces slashes with underscores and strips the /dev
3241     prefix. It then returns the full path to the cache file.
3242
3243     @type dev_path: str
3244     @param dev_path: the C{/dev/} path name
3245     @rtype: str
3246     @return: the converted path name
3247
3248     """
3249     if dev_path.startswith(cls._DEV_PREFIX):
3250       dev_path = dev_path[len(cls._DEV_PREFIX):]
3251     dev_path = dev_path.replace("/", "_")
3252     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3253     return fpath
3254
3255   @classmethod
3256   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3257     """Updates the cache information for a given device.
3258
3259     @type dev_path: str
3260     @param dev_path: the pathname of the device
3261     @type owner: str
3262     @param owner: the owner (instance name) of the device
3263     @type on_primary: bool
3264     @param on_primary: whether this is the primary
3265         node nor not
3266     @type iv_name: str
3267     @param iv_name: the instance-visible name of the
3268         device, as in objects.Disk.iv_name
3269
3270     @rtype: None
3271
3272     """
3273     if dev_path is None:
3274       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3275       return
3276     fpath = cls._ConvertPath(dev_path)
3277     if on_primary:
3278       state = "primary"
3279     else:
3280       state = "secondary"
3281     if iv_name is None:
3282       iv_name = "not_visible"
3283     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3284     try:
3285       utils.WriteFile(fpath, data=fdata)
3286     except EnvironmentError, err:
3287       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3288
3289   @classmethod
3290   def RemoveCache(cls, dev_path):
3291     """Remove data for a dev_path.
3292
3293     This is just a wrapper over L{utils.RemoveFile} with a converted
3294     path name and logging.
3295
3296     @type dev_path: str
3297     @param dev_path: the pathname of the device
3298
3299     @rtype: None
3300
3301     """
3302     if dev_path is None:
3303       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3304       return
3305     fpath = cls._ConvertPath(dev_path)
3306     try:
3307       utils.RemoveFile(fpath)
3308     except EnvironmentError, err:
3309       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)