Second iteration over backend.BlockdevWipe
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79
80 class RPCFail(Exception):
81   """Class denoting RPC failure.
82
83   Its argument is the error message.
84
85   """
86
87
88 def _Fail(msg, *args, **kwargs):
89   """Log an error and the raise an RPCFail exception.
90
91   This exception is then handled specially in the ganeti daemon and
92   turned into a 'failed' return type. As such, this function is a
93   useful shortcut for logging the error and returning it to the master
94   daemon.
95
96   @type msg: string
97   @param msg: the text of the exception
98   @raise RPCFail
99
100   """
101   if args:
102     msg = msg % args
103   if "log" not in kwargs or kwargs["log"]: # if we should log this error
104     if "exc" in kwargs and kwargs["exc"]:
105       logging.exception(msg)
106     else:
107       logging.error(msg)
108   raise RPCFail(msg)
109
110
111 def _GetConfig():
112   """Simple wrapper to return a SimpleStore.
113
114   @rtype: L{ssconf.SimpleStore}
115   @return: a SimpleStore instance
116
117   """
118   return ssconf.SimpleStore()
119
120
121 def _GetSshRunner(cluster_name):
122   """Simple wrapper to return an SshRunner.
123
124   @type cluster_name: str
125   @param cluster_name: the cluster name, which is needed
126       by the SshRunner constructor
127   @rtype: L{ssh.SshRunner}
128   @return: an SshRunner instance
129
130   """
131   return ssh.SshRunner(cluster_name)
132
133
134 def _Decompress(data):
135   """Unpacks data compressed by the RPC client.
136
137   @type data: list or tuple
138   @param data: Data sent by RPC client
139   @rtype: str
140   @return: Decompressed data
141
142   """
143   assert isinstance(data, (list, tuple))
144   assert len(data) == 2
145   (encoding, content) = data
146   if encoding == constants.RPC_ENCODING_NONE:
147     return content
148   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
149     return zlib.decompress(base64.b64decode(content))
150   else:
151     raise AssertionError("Unknown data encoding")
152
153
154 def _CleanDirectory(path, exclude=None):
155   """Removes all regular files in a directory.
156
157   @type path: str
158   @param path: the directory to clean
159   @type exclude: list
160   @param exclude: list of files to be excluded, defaults
161       to the empty list
162
163   """
164   if path not in _ALLOWED_CLEAN_DIRS:
165     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
166           path)
167
168   if not os.path.isdir(path):
169     return
170   if exclude is None:
171     exclude = []
172   else:
173     # Normalize excluded paths
174     exclude = [os.path.normpath(i) for i in exclude]
175
176   for rel_name in utils.ListVisibleFiles(path):
177     full_name = utils.PathJoin(path, rel_name)
178     if full_name in exclude:
179       continue
180     if os.path.isfile(full_name) and not os.path.islink(full_name):
181       utils.RemoveFile(full_name)
182
183
184 def _BuildUploadFileList():
185   """Build the list of allowed upload files.
186
187   This is abstracted so that it's built only once at module import time.
188
189   """
190   allowed_files = set([
191     constants.CLUSTER_CONF_FILE,
192     constants.ETC_HOSTS,
193     constants.SSH_KNOWN_HOSTS_FILE,
194     constants.VNC_PASSWORD_FILE,
195     constants.RAPI_CERT_FILE,
196     constants.RAPI_USERS_FILE,
197     constants.CONFD_HMAC_KEY,
198     constants.CLUSTER_DOMAIN_SECRET_FILE,
199     ])
200
201   for hv_name in constants.HYPER_TYPES:
202     hv_class = hypervisor.GetHypervisorClass(hv_name)
203     allowed_files.update(hv_class.GetAncillaryFiles())
204
205   return frozenset(allowed_files)
206
207
208 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
209
210
211 def JobQueuePurge():
212   """Removes job queue files and archived jobs.
213
214   @rtype: tuple
215   @return: True, None
216
217   """
218   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
220
221
222 def GetMasterInfo():
223   """Returns master information.
224
225   This is an utility function to compute master information, either
226   for consumption here or from the node daemon.
227
228   @rtype: tuple
229   @return: master_netdev, master_ip, master_name, primary_ip_family
230   @raise RPCFail: in case of errors
231
232   """
233   try:
234     cfg = _GetConfig()
235     master_netdev = cfg.GetMasterNetdev()
236     master_ip = cfg.GetMasterIP()
237     master_node = cfg.GetMasterNode()
238     primary_ip_family = cfg.GetPrimaryIPFamily()
239   except errors.ConfigurationError, err:
240     _Fail("Cluster configuration incomplete: %s", err, exc=True)
241   return (master_netdev, master_ip, master_node, primary_ip_family)
242
243
244 def StartMaster(start_daemons, no_voting):
245   """Activate local node as master node.
246
247   The function will either try activate the IP address of the master
248   (unless someone else has it) or also start the master daemons, based
249   on the start_daemons parameter.
250
251   @type start_daemons: boolean
252   @param start_daemons: whether to start the master daemons
253       (ganeti-masterd and ganeti-rapi), or (if false) activate the
254       master ip
255   @type no_voting: boolean
256   @param no_voting: whether to start ganeti-masterd without a node vote
257       (if start_daemons is True), but still non-interactively
258   @rtype: None
259
260   """
261   # GetMasterInfo will raise an exception if not able to return data
262   master_netdev, master_ip, _, family = GetMasterInfo()
263
264   err_msgs = []
265   # either start the master and rapi daemons
266   if start_daemons:
267     if no_voting:
268       masterd_args = "--no-voting --yes-do-it"
269     else:
270       masterd_args = ""
271
272     env = {
273       "EXTRA_MASTERD_ARGS": masterd_args,
274       }
275
276     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
277     if result.failed:
278       msg = "Can't start Ganeti master: %s" % result.output
279       logging.error(msg)
280       err_msgs.append(msg)
281   # or activate the IP
282   else:
283     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
284       if netutils.IPAddress.Own(master_ip):
285         # we already have the ip:
286         logging.debug("Master IP already configured, doing nothing")
287       else:
288         msg = "Someone else has the master ip, not activating"
289         logging.error(msg)
290         err_msgs.append(msg)
291     else:
292       ipcls = netutils.IP4Address
293       if family == netutils.IP6Address.family:
294         ipcls = netutils.IP6Address
295
296       result = utils.RunCmd(["ip", "address", "add",
297                              "%s/%d" % (master_ip, ipcls.iplen),
298                              "dev", master_netdev, "label",
299                              "%s:0" % master_netdev])
300       if result.failed:
301         msg = "Can't activate master IP: %s" % result.output
302         logging.error(msg)
303         err_msgs.append(msg)
304
305       # we ignore the exit code of the following cmds
306       if ipcls == netutils.IP4Address:
307         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
308                       master_ip, master_ip])
309       elif ipcls == netutils.IP6Address:
310         try:
311           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
312         except errors.OpExecError:
313           # TODO: Better error reporting
314           logging.warning("Can't execute ndisc6, please install if missing")
315
316   if err_msgs:
317     _Fail("; ".join(err_msgs))
318
319
320 def StopMaster(stop_daemons):
321   """Deactivate this node as master.
322
323   The function will always try to deactivate the IP address of the
324   master. It will also stop the master daemons depending on the
325   stop_daemons parameter.
326
327   @type stop_daemons: boolean
328   @param stop_daemons: whether to also stop the master daemons
329       (ganeti-masterd and ganeti-rapi)
330   @rtype: None
331
332   """
333   # TODO: log and report back to the caller the error failures; we
334   # need to decide in which case we fail the RPC for this
335
336   # GetMasterInfo will raise an exception if not able to return data
337   master_netdev, master_ip, _, family = GetMasterInfo()
338
339   ipcls = netutils.IP4Address
340   if family == netutils.IP6Address.family:
341     ipcls = netutils.IP6Address
342
343   result = utils.RunCmd(["ip", "address", "del",
344                          "%s/%d" % (master_ip, ipcls.iplen),
345                          "dev", master_netdev])
346   if result.failed:
347     logging.error("Can't remove the master IP, error: %s", result.output)
348     # but otherwise ignore the failure
349
350   if stop_daemons:
351     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
352     if result.failed:
353       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
354                     " and error %s",
355                     result.cmd, result.exit_code, result.output)
356
357
358 def EtcHostsModify(mode, host, ip):
359   """Modify a host entry in /etc/hosts.
360
361   @param mode: The mode to operate. Either add or remove entry
362   @param host: The host to operate on
363   @param ip: The ip associated with the entry
364
365   """
366   if mode == constants.ETC_HOSTS_ADD:
367     if not ip:
368       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
369               " present")
370     utils.AddHostToEtcHosts(host, ip)
371   elif mode == constants.ETC_HOSTS_REMOVE:
372     if ip:
373       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
374               " parameter is present")
375     utils.RemoveHostFromEtcHosts(host)
376   else:
377     RPCFail("Mode not supported")
378
379
380 def LeaveCluster(modify_ssh_setup):
381   """Cleans up and remove the current node.
382
383   This function cleans up and prepares the current node to be removed
384   from the cluster.
385
386   If processing is successful, then it raises an
387   L{errors.QuitGanetiException} which is used as a special case to
388   shutdown the node daemon.
389
390   @param modify_ssh_setup: boolean
391
392   """
393   _CleanDirectory(constants.DATA_DIR)
394   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
395   JobQueuePurge()
396
397   if modify_ssh_setup:
398     try:
399       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
400
401       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
402
403       utils.RemoveFile(priv_key)
404       utils.RemoveFile(pub_key)
405     except errors.OpExecError:
406       logging.exception("Error while processing ssh files")
407
408   try:
409     utils.RemoveFile(constants.CONFD_HMAC_KEY)
410     utils.RemoveFile(constants.RAPI_CERT_FILE)
411     utils.RemoveFile(constants.NODED_CERT_FILE)
412   except: # pylint: disable-msg=W0702
413     logging.exception("Error while removing cluster secrets")
414
415   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
416   if result.failed:
417     logging.error("Command %s failed with exitcode %s and error %s",
418                   result.cmd, result.exit_code, result.output)
419
420   # Raise a custom exception (handled in ganeti-noded)
421   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
422
423
424 def GetNodeInfo(vgname, hypervisor_type):
425   """Gives back a hash with different information about the node.
426
427   @type vgname: C{string}
428   @param vgname: the name of the volume group to ask for disk space information
429   @type hypervisor_type: C{str}
430   @param hypervisor_type: the name of the hypervisor to ask for
431       memory information
432   @rtype: C{dict}
433   @return: dictionary with the following keys:
434       - vg_size is the size of the configured volume group in MiB
435       - vg_free is the free size of the volume group in MiB
436       - memory_dom0 is the memory allocated for domain0 in MiB
437       - memory_free is the currently available (free) ram in MiB
438       - memory_total is the total number of ram in MiB
439
440   """
441   outputarray = {}
442
443   vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
444   vg_free = vg_size = None
445   if vginfo:
446     vg_free = int(round(vginfo[0][0], 0))
447     vg_size = int(round(vginfo[0][1], 0))
448
449   outputarray['vg_size'] = vg_size
450   outputarray['vg_free'] = vg_free
451
452   hyper = hypervisor.GetHypervisor(hypervisor_type)
453   hyp_info = hyper.GetNodeInfo()
454   if hyp_info is not None:
455     outputarray.update(hyp_info)
456
457   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
458
459   return outputarray
460
461
462 def VerifyNode(what, cluster_name):
463   """Verify the status of the local node.
464
465   Based on the input L{what} parameter, various checks are done on the
466   local node.
467
468   If the I{filelist} key is present, this list of
469   files is checksummed and the file/checksum pairs are returned.
470
471   If the I{nodelist} key is present, we check that we have
472   connectivity via ssh with the target nodes (and check the hostname
473   report).
474
475   If the I{node-net-test} key is present, we check that we have
476   connectivity to the given nodes via both primary IP and, if
477   applicable, secondary IPs.
478
479   @type what: C{dict}
480   @param what: a dictionary of things to check:
481       - filelist: list of files for which to compute checksums
482       - nodelist: list of nodes we should check ssh communication with
483       - node-net-test: list of nodes we should check node daemon port
484         connectivity with
485       - hypervisor: list with hypervisors to run the verify for
486   @rtype: dict
487   @return: a dictionary with the same keys as the input dict, and
488       values representing the result of the checks
489
490   """
491   result = {}
492   my_name = netutils.Hostname.GetSysName()
493   port = netutils.GetDaemonPort(constants.NODED)
494
495   if constants.NV_HYPERVISOR in what:
496     result[constants.NV_HYPERVISOR] = tmp = {}
497     for hv_name in what[constants.NV_HYPERVISOR]:
498       try:
499         val = hypervisor.GetHypervisor(hv_name).Verify()
500       except errors.HypervisorError, err:
501         val = "Error while checking hypervisor: %s" % str(err)
502       tmp[hv_name] = val
503
504   if constants.NV_FILELIST in what:
505     result[constants.NV_FILELIST] = utils.FingerprintFiles(
506       what[constants.NV_FILELIST])
507
508   if constants.NV_NODELIST in what:
509     result[constants.NV_NODELIST] = tmp = {}
510     random.shuffle(what[constants.NV_NODELIST])
511     for node in what[constants.NV_NODELIST]:
512       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
513       if not success:
514         tmp[node] = message
515
516   if constants.NV_NODENETTEST in what:
517     result[constants.NV_NODENETTEST] = tmp = {}
518     my_pip = my_sip = None
519     for name, pip, sip in what[constants.NV_NODENETTEST]:
520       if name == my_name:
521         my_pip = pip
522         my_sip = sip
523         break
524     if not my_pip:
525       tmp[my_name] = ("Can't find my own primary/secondary IP"
526                       " in the node list")
527     else:
528       for name, pip, sip in what[constants.NV_NODENETTEST]:
529         fail = []
530         if not netutils.TcpPing(pip, port, source=my_pip):
531           fail.append("primary")
532         if sip != pip:
533           if not netutils.TcpPing(sip, port, source=my_sip):
534             fail.append("secondary")
535         if fail:
536           tmp[name] = ("failure using the %s interface(s)" %
537                        " and ".join(fail))
538
539   if constants.NV_MASTERIP in what:
540     # FIXME: add checks on incoming data structures (here and in the
541     # rest of the function)
542     master_name, master_ip = what[constants.NV_MASTERIP]
543     if master_name == my_name:
544       source = constants.IP4_ADDRESS_LOCALHOST
545     else:
546       source = None
547     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
548                                                   source=source)
549
550   if constants.NV_LVLIST in what:
551     try:
552       val = GetVolumeList(what[constants.NV_LVLIST])
553     except RPCFail, err:
554       val = str(err)
555     result[constants.NV_LVLIST] = val
556
557   if constants.NV_INSTANCELIST in what:
558     # GetInstanceList can fail
559     try:
560       val = GetInstanceList(what[constants.NV_INSTANCELIST])
561     except RPCFail, err:
562       val = str(err)
563     result[constants.NV_INSTANCELIST] = val
564
565   if constants.NV_VGLIST in what:
566     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
567
568   if constants.NV_PVLIST in what:
569     result[constants.NV_PVLIST] = \
570       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
571                                    filter_allocatable=False)
572
573   if constants.NV_VERSION in what:
574     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
575                                     constants.RELEASE_VERSION)
576
577   if constants.NV_HVINFO in what:
578     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
579     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
580
581   if constants.NV_DRBDLIST in what:
582     try:
583       used_minors = bdev.DRBD8.GetUsedDevs().keys()
584     except errors.BlockDeviceError, err:
585       logging.warning("Can't get used minors list", exc_info=True)
586       used_minors = str(err)
587     result[constants.NV_DRBDLIST] = used_minors
588
589   if constants.NV_DRBDHELPER in what:
590     status = True
591     try:
592       payload = bdev.BaseDRBD.GetUsermodeHelper()
593     except errors.BlockDeviceError, err:
594       logging.error("Can't get DRBD usermode helper: %s", str(err))
595       status = False
596       payload = str(err)
597     result[constants.NV_DRBDHELPER] = (status, payload)
598
599   if constants.NV_NODESETUP in what:
600     result[constants.NV_NODESETUP] = tmpr = []
601     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
602       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
603                   " under /sys, missing required directories /sys/block"
604                   " and /sys/class/net")
605     if (not os.path.isdir("/proc/sys") or
606         not os.path.isfile("/proc/sysrq-trigger")):
607       tmpr.append("The procfs filesystem doesn't seem to be mounted"
608                   " under /proc, missing required directory /proc/sys and"
609                   " the file /proc/sysrq-trigger")
610
611   if constants.NV_TIME in what:
612     result[constants.NV_TIME] = utils.SplitTime(time.time())
613
614   if constants.NV_OSLIST in what:
615     result[constants.NV_OSLIST] = DiagnoseOS()
616
617   return result
618
619
620 def GetVolumeList(vg_name):
621   """Compute list of logical volumes and their size.
622
623   @type vg_name: str
624   @param vg_name: the volume group whose LVs we should list
625   @rtype: dict
626   @return:
627       dictionary of all partions (key) with value being a tuple of
628       their size (in MiB), inactive and online status::
629
630         {'test1': ('20.06', True, True)}
631
632       in case of errors, a string is returned with the error
633       details.
634
635   """
636   lvs = {}
637   sep = '|'
638   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
639                          "--separator=%s" % sep,
640                          "-olv_name,lv_size,lv_attr", vg_name])
641   if result.failed:
642     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
643
644   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
645   for line in result.stdout.splitlines():
646     line = line.strip()
647     match = valid_line_re.match(line)
648     if not match:
649       logging.error("Invalid line returned from lvs output: '%s'", line)
650       continue
651     name, size, attr = match.groups()
652     inactive = attr[4] == '-'
653     online = attr[5] == 'o'
654     virtual = attr[0] == 'v'
655     if virtual:
656       # we don't want to report such volumes as existing, since they
657       # don't really hold data
658       continue
659     lvs[name] = (size, inactive, online)
660
661   return lvs
662
663
664 def ListVolumeGroups():
665   """List the volume groups and their size.
666
667   @rtype: dict
668   @return: dictionary with keys volume name and values the
669       size of the volume
670
671   """
672   return utils.ListVolumeGroups()
673
674
675 def NodeVolumes():
676   """List all volumes on this node.
677
678   @rtype: list
679   @return:
680     A list of dictionaries, each having four keys:
681       - name: the logical volume name,
682       - size: the size of the logical volume
683       - dev: the physical device on which the LV lives
684       - vg: the volume group to which it belongs
685
686     In case of errors, we return an empty list and log the
687     error.
688
689     Note that since a logical volume can live on multiple physical
690     volumes, the resulting list might include a logical volume
691     multiple times.
692
693   """
694   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
695                          "--separator=|",
696                          "--options=lv_name,lv_size,devices,vg_name"])
697   if result.failed:
698     _Fail("Failed to list logical volumes, lvs output: %s",
699           result.output)
700
701   def parse_dev(dev):
702     return dev.split('(')[0]
703
704   def handle_dev(dev):
705     return [parse_dev(x) for x in dev.split(",")]
706
707   def map_line(line):
708     line = [v.strip() for v in line]
709     return [{'name': line[0], 'size': line[1],
710              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
711
712   all_devs = []
713   for line in result.stdout.splitlines():
714     if line.count('|') >= 3:
715       all_devs.extend(map_line(line.split('|')))
716     else:
717       logging.warning("Strange line in the output from lvs: '%s'", line)
718   return all_devs
719
720
721 def BridgesExist(bridges_list):
722   """Check if a list of bridges exist on the current node.
723
724   @rtype: boolean
725   @return: C{True} if all of them exist, C{False} otherwise
726
727   """
728   missing = []
729   for bridge in bridges_list:
730     if not utils.BridgeExists(bridge):
731       missing.append(bridge)
732
733   if missing:
734     _Fail("Missing bridges %s", utils.CommaJoin(missing))
735
736
737 def GetInstanceList(hypervisor_list):
738   """Provides a list of instances.
739
740   @type hypervisor_list: list
741   @param hypervisor_list: the list of hypervisors to query information
742
743   @rtype: list
744   @return: a list of all running instances on the current node
745     - instance1.example.com
746     - instance2.example.com
747
748   """
749   results = []
750   for hname in hypervisor_list:
751     try:
752       names = hypervisor.GetHypervisor(hname).ListInstances()
753       results.extend(names)
754     except errors.HypervisorError, err:
755       _Fail("Error enumerating instances (hypervisor %s): %s",
756             hname, err, exc=True)
757
758   return results
759
760
761 def GetInstanceInfo(instance, hname):
762   """Gives back the information about an instance as a dictionary.
763
764   @type instance: string
765   @param instance: the instance name
766   @type hname: string
767   @param hname: the hypervisor type of the instance
768
769   @rtype: dict
770   @return: dictionary with the following keys:
771       - memory: memory size of instance (int)
772       - state: xen state of instance (string)
773       - time: cpu time of instance (float)
774
775   """
776   output = {}
777
778   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
779   if iinfo is not None:
780     output['memory'] = iinfo[2]
781     output['state'] = iinfo[4]
782     output['time'] = iinfo[5]
783
784   return output
785
786
787 def GetInstanceMigratable(instance):
788   """Gives whether an instance can be migrated.
789
790   @type instance: L{objects.Instance}
791   @param instance: object representing the instance to be checked.
792
793   @rtype: tuple
794   @return: tuple of (result, description) where:
795       - result: whether the instance can be migrated or not
796       - description: a description of the issue, if relevant
797
798   """
799   hyper = hypervisor.GetHypervisor(instance.hypervisor)
800   iname = instance.name
801   if iname not in hyper.ListInstances():
802     _Fail("Instance %s is not running", iname)
803
804   for idx in range(len(instance.disks)):
805     link_name = _GetBlockDevSymlinkPath(iname, idx)
806     if not os.path.islink(link_name):
807       logging.warning("Instance %s is missing symlink %s for disk %d",
808                       iname, link_name, idx)
809
810
811 def GetAllInstancesInfo(hypervisor_list):
812   """Gather data about all instances.
813
814   This is the equivalent of L{GetInstanceInfo}, except that it
815   computes data for all instances at once, thus being faster if one
816   needs data about more than one instance.
817
818   @type hypervisor_list: list
819   @param hypervisor_list: list of hypervisors to query for instance data
820
821   @rtype: dict
822   @return: dictionary of instance: data, with data having the following keys:
823       - memory: memory size of instance (int)
824       - state: xen state of instance (string)
825       - time: cpu time of instance (float)
826       - vcpus: the number of vcpus
827
828   """
829   output = {}
830
831   for hname in hypervisor_list:
832     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
833     if iinfo:
834       for name, _, memory, vcpus, state, times in iinfo:
835         value = {
836           'memory': memory,
837           'vcpus': vcpus,
838           'state': state,
839           'time': times,
840           }
841         if name in output:
842           # we only check static parameters, like memory and vcpus,
843           # and not state and time which can change between the
844           # invocations of the different hypervisors
845           for key in 'memory', 'vcpus':
846             if value[key] != output[name][key]:
847               _Fail("Instance %s is running twice"
848                     " with different parameters", name)
849         output[name] = value
850
851   return output
852
853
854 def _InstanceLogName(kind, os_name, instance):
855   """Compute the OS log filename for a given instance and operation.
856
857   The instance name and os name are passed in as strings since not all
858   operations have these as part of an instance object.
859
860   @type kind: string
861   @param kind: the operation type (e.g. add, import, etc.)
862   @type os_name: string
863   @param os_name: the os name
864   @type instance: string
865   @param instance: the name of the instance being imported/added/etc.
866
867   """
868   # TODO: Use tempfile.mkstemp to create unique filename
869   base = ("%s-%s-%s-%s.log" %
870           (kind, os_name, instance, utils.TimestampForFilename()))
871   return utils.PathJoin(constants.LOG_OS_DIR, base)
872
873
874 def InstanceOsAdd(instance, reinstall, debug):
875   """Add an OS to an instance.
876
877   @type instance: L{objects.Instance}
878   @param instance: Instance whose OS is to be installed
879   @type reinstall: boolean
880   @param reinstall: whether this is an instance reinstall
881   @type debug: integer
882   @param debug: debug level, passed to the OS scripts
883   @rtype: None
884
885   """
886   inst_os = OSFromDisk(instance.os)
887
888   create_env = OSEnvironment(instance, inst_os, debug)
889   if reinstall:
890     create_env['INSTANCE_REINSTALL'] = "1"
891
892   logfile = _InstanceLogName("add", instance.os, instance.name)
893
894   result = utils.RunCmd([inst_os.create_script], env=create_env,
895                         cwd=inst_os.path, output=logfile,)
896   if result.failed:
897     logging.error("os create command '%s' returned error: %s, logfile: %s,"
898                   " output: %s", result.cmd, result.fail_reason, logfile,
899                   result.output)
900     lines = [utils.SafeEncode(val)
901              for val in utils.TailFile(logfile, lines=20)]
902     _Fail("OS create script failed (%s), last lines in the"
903           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
904
905
906 def RunRenameInstance(instance, old_name, debug):
907   """Run the OS rename script for an instance.
908
909   @type instance: L{objects.Instance}
910   @param instance: Instance whose OS is to be installed
911   @type old_name: string
912   @param old_name: previous instance name
913   @type debug: integer
914   @param debug: debug level, passed to the OS scripts
915   @rtype: boolean
916   @return: the success of the operation
917
918   """
919   inst_os = OSFromDisk(instance.os)
920
921   rename_env = OSEnvironment(instance, inst_os, debug)
922   rename_env['OLD_INSTANCE_NAME'] = old_name
923
924   logfile = _InstanceLogName("rename", instance.os,
925                              "%s-%s" % (old_name, instance.name))
926
927   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
928                         cwd=inst_os.path, output=logfile)
929
930   if result.failed:
931     logging.error("os create command '%s' returned error: %s output: %s",
932                   result.cmd, result.fail_reason, result.output)
933     lines = [utils.SafeEncode(val)
934              for val in utils.TailFile(logfile, lines=20)]
935     _Fail("OS rename script failed (%s), last lines in the"
936           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
937
938
939 def _GetBlockDevSymlinkPath(instance_name, idx):
940   return utils.PathJoin(constants.DISK_LINKS_DIR,
941                         "%s:%d" % (instance_name, idx))
942
943
944 def _SymlinkBlockDev(instance_name, device_path, idx):
945   """Set up symlinks to a instance's block device.
946
947   This is an auxiliary function run when an instance is start (on the primary
948   node) or when an instance is migrated (on the target node).
949
950
951   @param instance_name: the name of the target instance
952   @param device_path: path of the physical block device, on the node
953   @param idx: the disk index
954   @return: absolute path to the disk's symlink
955
956   """
957   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
958   try:
959     os.symlink(device_path, link_name)
960   except OSError, err:
961     if err.errno == errno.EEXIST:
962       if (not os.path.islink(link_name) or
963           os.readlink(link_name) != device_path):
964         os.remove(link_name)
965         os.symlink(device_path, link_name)
966     else:
967       raise
968
969   return link_name
970
971
972 def _RemoveBlockDevLinks(instance_name, disks):
973   """Remove the block device symlinks belonging to the given instance.
974
975   """
976   for idx, _ in enumerate(disks):
977     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
978     if os.path.islink(link_name):
979       try:
980         os.remove(link_name)
981       except OSError:
982         logging.exception("Can't remove symlink '%s'", link_name)
983
984
985 def _GatherAndLinkBlockDevs(instance):
986   """Set up an instance's block device(s).
987
988   This is run on the primary node at instance startup. The block
989   devices must be already assembled.
990
991   @type instance: L{objects.Instance}
992   @param instance: the instance whose disks we shoul assemble
993   @rtype: list
994   @return: list of (disk_object, device_path)
995
996   """
997   block_devices = []
998   for idx, disk in enumerate(instance.disks):
999     device = _RecursiveFindBD(disk)
1000     if device is None:
1001       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1002                                     str(disk))
1003     device.Open()
1004     try:
1005       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1006     except OSError, e:
1007       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1008                                     e.strerror)
1009
1010     block_devices.append((disk, link_name))
1011
1012   return block_devices
1013
1014
1015 def StartInstance(instance):
1016   """Start an instance.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance object
1020   @rtype: None
1021
1022   """
1023   running_instances = GetInstanceList([instance.hypervisor])
1024
1025   if instance.name in running_instances:
1026     logging.info("Instance %s already running, not starting", instance.name)
1027     return
1028
1029   try:
1030     block_devices = _GatherAndLinkBlockDevs(instance)
1031     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1032     hyper.StartInstance(instance, block_devices)
1033   except errors.BlockDeviceError, err:
1034     _Fail("Block device error: %s", err, exc=True)
1035   except errors.HypervisorError, err:
1036     _RemoveBlockDevLinks(instance.name, instance.disks)
1037     _Fail("Hypervisor error: %s", err, exc=True)
1038
1039
1040 def InstanceShutdown(instance, timeout):
1041   """Shut an instance down.
1042
1043   @note: this functions uses polling with a hardcoded timeout.
1044
1045   @type instance: L{objects.Instance}
1046   @param instance: the instance object
1047   @type timeout: integer
1048   @param timeout: maximum timeout for soft shutdown
1049   @rtype: None
1050
1051   """
1052   hv_name = instance.hypervisor
1053   hyper = hypervisor.GetHypervisor(hv_name)
1054   iname = instance.name
1055
1056   if instance.name not in hyper.ListInstances():
1057     logging.info("Instance %s not running, doing nothing", iname)
1058     return
1059
1060   class _TryShutdown:
1061     def __init__(self):
1062       self.tried_once = False
1063
1064     def __call__(self):
1065       if iname not in hyper.ListInstances():
1066         return
1067
1068       try:
1069         hyper.StopInstance(instance, retry=self.tried_once)
1070       except errors.HypervisorError, err:
1071         if iname not in hyper.ListInstances():
1072           # if the instance is no longer existing, consider this a
1073           # success and go to cleanup
1074           return
1075
1076         _Fail("Failed to stop instance %s: %s", iname, err)
1077
1078       self.tried_once = True
1079
1080       raise utils.RetryAgain()
1081
1082   try:
1083     utils.Retry(_TryShutdown(), 5, timeout)
1084   except utils.RetryTimeout:
1085     # the shutdown did not succeed
1086     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1087
1088     try:
1089       hyper.StopInstance(instance, force=True)
1090     except errors.HypervisorError, err:
1091       if iname in hyper.ListInstances():
1092         # only raise an error if the instance still exists, otherwise
1093         # the error could simply be "instance ... unknown"!
1094         _Fail("Failed to force stop instance %s: %s", iname, err)
1095
1096     time.sleep(1)
1097
1098     if iname in hyper.ListInstances():
1099       _Fail("Could not shutdown instance %s even by destroy", iname)
1100
1101   try:
1102     hyper.CleanupInstance(instance.name)
1103   except errors.HypervisorError, err:
1104     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1105
1106   _RemoveBlockDevLinks(iname, instance.disks)
1107
1108
1109 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1110   """Reboot an instance.
1111
1112   @type instance: L{objects.Instance}
1113   @param instance: the instance object to reboot
1114   @type reboot_type: str
1115   @param reboot_type: the type of reboot, one the following
1116     constants:
1117       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1118         instance OS, do not recreate the VM
1119       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1120         restart the VM (at the hypervisor level)
1121       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1122         not accepted here, since that mode is handled differently, in
1123         cmdlib, and translates into full stop and start of the
1124         instance (instead of a call_instance_reboot RPC)
1125   @type shutdown_timeout: integer
1126   @param shutdown_timeout: maximum timeout for soft shutdown
1127   @rtype: None
1128
1129   """
1130   running_instances = GetInstanceList([instance.hypervisor])
1131
1132   if instance.name not in running_instances:
1133     _Fail("Cannot reboot instance %s that is not running", instance.name)
1134
1135   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1136   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1137     try:
1138       hyper.RebootInstance(instance)
1139     except errors.HypervisorError, err:
1140       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1141   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1142     try:
1143       InstanceShutdown(instance, shutdown_timeout)
1144       return StartInstance(instance)
1145     except errors.HypervisorError, err:
1146       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1147   else:
1148     _Fail("Invalid reboot_type received: %s", reboot_type)
1149
1150
1151 def MigrationInfo(instance):
1152   """Gather information about an instance to be migrated.
1153
1154   @type instance: L{objects.Instance}
1155   @param instance: the instance definition
1156
1157   """
1158   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1159   try:
1160     info = hyper.MigrationInfo(instance)
1161   except errors.HypervisorError, err:
1162     _Fail("Failed to fetch migration information: %s", err, exc=True)
1163   return info
1164
1165
1166 def AcceptInstance(instance, info, target):
1167   """Prepare the node to accept an instance.
1168
1169   @type instance: L{objects.Instance}
1170   @param instance: the instance definition
1171   @type info: string/data (opaque)
1172   @param info: migration information, from the source node
1173   @type target: string
1174   @param target: target host (usually ip), on this node
1175
1176   """
1177   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1178   try:
1179     hyper.AcceptInstance(instance, info, target)
1180   except errors.HypervisorError, err:
1181     _Fail("Failed to accept instance: %s", err, exc=True)
1182
1183
1184 def FinalizeMigration(instance, info, success):
1185   """Finalize any preparation to accept an instance.
1186
1187   @type instance: L{objects.Instance}
1188   @param instance: the instance definition
1189   @type info: string/data (opaque)
1190   @param info: migration information, from the source node
1191   @type success: boolean
1192   @param success: whether the migration was a success or a failure
1193
1194   """
1195   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1196   try:
1197     hyper.FinalizeMigration(instance, info, success)
1198   except errors.HypervisorError, err:
1199     _Fail("Failed to finalize migration: %s", err, exc=True)
1200
1201
1202 def MigrateInstance(instance, target, live):
1203   """Migrates an instance to another node.
1204
1205   @type instance: L{objects.Instance}
1206   @param instance: the instance definition
1207   @type target: string
1208   @param target: the target node name
1209   @type live: boolean
1210   @param live: whether the migration should be done live or not (the
1211       interpretation of this parameter is left to the hypervisor)
1212   @rtype: tuple
1213   @return: a tuple of (success, msg) where:
1214       - succes is a boolean denoting the success/failure of the operation
1215       - msg is a string with details in case of failure
1216
1217   """
1218   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1219
1220   try:
1221     hyper.MigrateInstance(instance, target, live)
1222   except errors.HypervisorError, err:
1223     _Fail("Failed to migrate instance: %s", err, exc=True)
1224
1225
1226 def BlockdevCreate(disk, size, owner, on_primary, info):
1227   """Creates a block device for an instance.
1228
1229   @type disk: L{objects.Disk}
1230   @param disk: the object describing the disk we should create
1231   @type size: int
1232   @param size: the size of the physical underlying device, in MiB
1233   @type owner: str
1234   @param owner: the name of the instance for which disk is created,
1235       used for device cache data
1236   @type on_primary: boolean
1237   @param on_primary:  indicates if it is the primary node or not
1238   @type info: string
1239   @param info: string that will be sent to the physical device
1240       creation, used for example to set (LVM) tags on LVs
1241
1242   @return: the new unique_id of the device (this can sometime be
1243       computed only after creation), or None. On secondary nodes,
1244       it's not required to return anything.
1245
1246   """
1247   # TODO: remove the obsolete 'size' argument
1248   # pylint: disable-msg=W0613
1249   clist = []
1250   if disk.children:
1251     for child in disk.children:
1252       try:
1253         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1254       except errors.BlockDeviceError, err:
1255         _Fail("Can't assemble device %s: %s", child, err)
1256       if on_primary or disk.AssembleOnSecondary():
1257         # we need the children open in case the device itself has to
1258         # be assembled
1259         try:
1260           # pylint: disable-msg=E1103
1261           crdev.Open()
1262         except errors.BlockDeviceError, err:
1263           _Fail("Can't make child '%s' read-write: %s", child, err)
1264       clist.append(crdev)
1265
1266   try:
1267     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1268   except errors.BlockDeviceError, err:
1269     _Fail("Can't create block device: %s", err)
1270
1271   if on_primary or disk.AssembleOnSecondary():
1272     try:
1273       device.Assemble()
1274     except errors.BlockDeviceError, err:
1275       _Fail("Can't assemble device after creation, unusual event: %s", err)
1276     device.SetSyncSpeed(constants.SYNC_SPEED)
1277     if on_primary or disk.OpenOnSecondary():
1278       try:
1279         device.Open(force=True)
1280       except errors.BlockDeviceError, err:
1281         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1282     DevCacheManager.UpdateCache(device.dev_path, owner,
1283                                 on_primary, disk.iv_name)
1284
1285   device.SetInfo(info)
1286
1287   return device.unique_id
1288
1289
1290 def _WipeDevice(path, offset, size):
1291   """This function actually wipes the device.
1292
1293   @param path: The path to the device to wipe
1294   @param offset: The offset in MiB in the file
1295   @param size: The size in MiB to write
1296
1297   """
1298   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1299          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1300          "count=%d" % size]
1301   result = utils.RunCmd(cmd)
1302
1303   if result.failed:
1304     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1305           result.fail_reason, result.output)
1306
1307
1308 def BlockdevWipe(disk, offset, size):
1309   """Wipes a block device.
1310
1311   @type disk: L{objects.Disk}
1312   @param disk: the disk object we want to wipe
1313   @type offset: int
1314   @param offset: The offset in MiB in the file
1315   @type size: int
1316   @param size: The size in MiB to write
1317
1318   """
1319   try:
1320     rdev = _RecursiveFindBD(disk)
1321   except errors.BlockDeviceError:
1322     rdev = None
1323
1324   if not rdev:
1325     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1326
1327   # Do cross verify some of the parameters
1328   if offset > rdev.size:
1329     _Fail("Offset is bigger than device size")
1330   if (offset + size) > rdev.size:
1331     _Fail("The provided offset and size to wipe is bigger than device size")
1332
1333   _WipeDevice(rdev.dev_path, offset, size)
1334
1335
1336 def BlockdevRemove(disk):
1337   """Remove a block device.
1338
1339   @note: This is intended to be called recursively.
1340
1341   @type disk: L{objects.Disk}
1342   @param disk: the disk object we should remove
1343   @rtype: boolean
1344   @return: the success of the operation
1345
1346   """
1347   msgs = []
1348   try:
1349     rdev = _RecursiveFindBD(disk)
1350   except errors.BlockDeviceError, err:
1351     # probably can't attach
1352     logging.info("Can't attach to device %s in remove", disk)
1353     rdev = None
1354   if rdev is not None:
1355     r_path = rdev.dev_path
1356     try:
1357       rdev.Remove()
1358     except errors.BlockDeviceError, err:
1359       msgs.append(str(err))
1360     if not msgs:
1361       DevCacheManager.RemoveCache(r_path)
1362
1363   if disk.children:
1364     for child in disk.children:
1365       try:
1366         BlockdevRemove(child)
1367       except RPCFail, err:
1368         msgs.append(str(err))
1369
1370   if msgs:
1371     _Fail("; ".join(msgs))
1372
1373
1374 def _RecursiveAssembleBD(disk, owner, as_primary):
1375   """Activate a block device for an instance.
1376
1377   This is run on the primary and secondary nodes for an instance.
1378
1379   @note: this function is called recursively.
1380
1381   @type disk: L{objects.Disk}
1382   @param disk: the disk we try to assemble
1383   @type owner: str
1384   @param owner: the name of the instance which owns the disk
1385   @type as_primary: boolean
1386   @param as_primary: if we should make the block device
1387       read/write
1388
1389   @return: the assembled device or None (in case no device
1390       was assembled)
1391   @raise errors.BlockDeviceError: in case there is an error
1392       during the activation of the children or the device
1393       itself
1394
1395   """
1396   children = []
1397   if disk.children:
1398     mcn = disk.ChildrenNeeded()
1399     if mcn == -1:
1400       mcn = 0 # max number of Nones allowed
1401     else:
1402       mcn = len(disk.children) - mcn # max number of Nones
1403     for chld_disk in disk.children:
1404       try:
1405         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1406       except errors.BlockDeviceError, err:
1407         if children.count(None) >= mcn:
1408           raise
1409         cdev = None
1410         logging.error("Error in child activation (but continuing): %s",
1411                       str(err))
1412       children.append(cdev)
1413
1414   if as_primary or disk.AssembleOnSecondary():
1415     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1416     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1417     result = r_dev
1418     if as_primary or disk.OpenOnSecondary():
1419       r_dev.Open()
1420     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1421                                 as_primary, disk.iv_name)
1422
1423   else:
1424     result = True
1425   return result
1426
1427
1428 def BlockdevAssemble(disk, owner, as_primary):
1429   """Activate a block device for an instance.
1430
1431   This is a wrapper over _RecursiveAssembleBD.
1432
1433   @rtype: str or boolean
1434   @return: a C{/dev/...} path for primary nodes, and
1435       C{True} for secondary nodes
1436
1437   """
1438   try:
1439     result = _RecursiveAssembleBD(disk, owner, as_primary)
1440     if isinstance(result, bdev.BlockDev):
1441       # pylint: disable-msg=E1103
1442       result = result.dev_path
1443   except errors.BlockDeviceError, err:
1444     _Fail("Error while assembling disk: %s", err, exc=True)
1445
1446   return result
1447
1448
1449 def BlockdevShutdown(disk):
1450   """Shut down a block device.
1451
1452   First, if the device is assembled (Attach() is successful), then
1453   the device is shutdown. Then the children of the device are
1454   shutdown.
1455
1456   This function is called recursively. Note that we don't cache the
1457   children or such, as oppossed to assemble, shutdown of different
1458   devices doesn't require that the upper device was active.
1459
1460   @type disk: L{objects.Disk}
1461   @param disk: the description of the disk we should
1462       shutdown
1463   @rtype: None
1464
1465   """
1466   msgs = []
1467   r_dev = _RecursiveFindBD(disk)
1468   if r_dev is not None:
1469     r_path = r_dev.dev_path
1470     try:
1471       r_dev.Shutdown()
1472       DevCacheManager.RemoveCache(r_path)
1473     except errors.BlockDeviceError, err:
1474       msgs.append(str(err))
1475
1476   if disk.children:
1477     for child in disk.children:
1478       try:
1479         BlockdevShutdown(child)
1480       except RPCFail, err:
1481         msgs.append(str(err))
1482
1483   if msgs:
1484     _Fail("; ".join(msgs))
1485
1486
1487 def BlockdevAddchildren(parent_cdev, new_cdevs):
1488   """Extend a mirrored block device.
1489
1490   @type parent_cdev: L{objects.Disk}
1491   @param parent_cdev: the disk to which we should add children
1492   @type new_cdevs: list of L{objects.Disk}
1493   @param new_cdevs: the list of children which we should add
1494   @rtype: None
1495
1496   """
1497   parent_bdev = _RecursiveFindBD(parent_cdev)
1498   if parent_bdev is None:
1499     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1500   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1501   if new_bdevs.count(None) > 0:
1502     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1503   parent_bdev.AddChildren(new_bdevs)
1504
1505
1506 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1507   """Shrink a mirrored block device.
1508
1509   @type parent_cdev: L{objects.Disk}
1510   @param parent_cdev: the disk from which we should remove children
1511   @type new_cdevs: list of L{objects.Disk}
1512   @param new_cdevs: the list of children which we should remove
1513   @rtype: None
1514
1515   """
1516   parent_bdev = _RecursiveFindBD(parent_cdev)
1517   if parent_bdev is None:
1518     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1519   devs = []
1520   for disk in new_cdevs:
1521     rpath = disk.StaticDevPath()
1522     if rpath is None:
1523       bd = _RecursiveFindBD(disk)
1524       if bd is None:
1525         _Fail("Can't find device %s while removing children", disk)
1526       else:
1527         devs.append(bd.dev_path)
1528     else:
1529       if not utils.IsNormAbsPath(rpath):
1530         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1531       devs.append(rpath)
1532   parent_bdev.RemoveChildren(devs)
1533
1534
1535 def BlockdevGetmirrorstatus(disks):
1536   """Get the mirroring status of a list of devices.
1537
1538   @type disks: list of L{objects.Disk}
1539   @param disks: the list of disks which we should query
1540   @rtype: disk
1541   @return:
1542       a list of (mirror_done, estimated_time) tuples, which
1543       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1544   @raise errors.BlockDeviceError: if any of the disks cannot be
1545       found
1546
1547   """
1548   stats = []
1549   for dsk in disks:
1550     rbd = _RecursiveFindBD(dsk)
1551     if rbd is None:
1552       _Fail("Can't find device %s", dsk)
1553
1554     stats.append(rbd.CombinedSyncStatus())
1555
1556   return stats
1557
1558
1559 def _RecursiveFindBD(disk):
1560   """Check if a device is activated.
1561
1562   If so, return information about the real device.
1563
1564   @type disk: L{objects.Disk}
1565   @param disk: the disk object we need to find
1566
1567   @return: None if the device can't be found,
1568       otherwise the device instance
1569
1570   """
1571   children = []
1572   if disk.children:
1573     for chdisk in disk.children:
1574       children.append(_RecursiveFindBD(chdisk))
1575
1576   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1577
1578
1579 def _OpenRealBD(disk):
1580   """Opens the underlying block device of a disk.
1581
1582   @type disk: L{objects.Disk}
1583   @param disk: the disk object we want to open
1584
1585   """
1586   real_disk = _RecursiveFindBD(disk)
1587   if real_disk is None:
1588     _Fail("Block device '%s' is not set up", disk)
1589
1590   real_disk.Open()
1591
1592   return real_disk
1593
1594
1595 def BlockdevFind(disk):
1596   """Check if a device is activated.
1597
1598   If it is, return information about the real device.
1599
1600   @type disk: L{objects.Disk}
1601   @param disk: the disk to find
1602   @rtype: None or objects.BlockDevStatus
1603   @return: None if the disk cannot be found, otherwise a the current
1604            information
1605
1606   """
1607   try:
1608     rbd = _RecursiveFindBD(disk)
1609   except errors.BlockDeviceError, err:
1610     _Fail("Failed to find device: %s", err, exc=True)
1611
1612   if rbd is None:
1613     return None
1614
1615   return rbd.GetSyncStatus()
1616
1617
1618 def BlockdevGetsize(disks):
1619   """Computes the size of the given disks.
1620
1621   If a disk is not found, returns None instead.
1622
1623   @type disks: list of L{objects.Disk}
1624   @param disks: the list of disk to compute the size for
1625   @rtype: list
1626   @return: list with elements None if the disk cannot be found,
1627       otherwise the size
1628
1629   """
1630   result = []
1631   for cf in disks:
1632     try:
1633       rbd = _RecursiveFindBD(cf)
1634     except errors.BlockDeviceError:
1635       result.append(None)
1636       continue
1637     if rbd is None:
1638       result.append(None)
1639     else:
1640       result.append(rbd.GetActualSize())
1641   return result
1642
1643
1644 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1645   """Export a block device to a remote node.
1646
1647   @type disk: L{objects.Disk}
1648   @param disk: the description of the disk to export
1649   @type dest_node: str
1650   @param dest_node: the destination node to export to
1651   @type dest_path: str
1652   @param dest_path: the destination path on the target node
1653   @type cluster_name: str
1654   @param cluster_name: the cluster name, needed for SSH hostalias
1655   @rtype: None
1656
1657   """
1658   real_disk = _OpenRealBD(disk)
1659
1660   # the block size on the read dd is 1MiB to match our units
1661   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1662                                "dd if=%s bs=1048576 count=%s",
1663                                real_disk.dev_path, str(disk.size))
1664
1665   # we set here a smaller block size as, due to ssh buffering, more
1666   # than 64-128k will mostly ignored; we use nocreat to fail if the
1667   # device is not already there or we pass a wrong path; we use
1668   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1669   # to not buffer too much memory; this means that at best, we flush
1670   # every 64k, which will not be very fast
1671   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1672                                 " oflag=dsync", dest_path)
1673
1674   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1675                                                    constants.GANETI_RUNAS,
1676                                                    destcmd)
1677
1678   # all commands have been checked, so we're safe to combine them
1679   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1680
1681   result = utils.RunCmd(["bash", "-c", command])
1682
1683   if result.failed:
1684     _Fail("Disk copy command '%s' returned error: %s"
1685           " output: %s", command, result.fail_reason, result.output)
1686
1687
1688 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1689   """Write a file to the filesystem.
1690
1691   This allows the master to overwrite(!) a file. It will only perform
1692   the operation if the file belongs to a list of configuration files.
1693
1694   @type file_name: str
1695   @param file_name: the target file name
1696   @type data: str
1697   @param data: the new contents of the file
1698   @type mode: int
1699   @param mode: the mode to give the file (can be None)
1700   @type uid: int
1701   @param uid: the owner of the file (can be -1 for default)
1702   @type gid: int
1703   @param gid: the group of the file (can be -1 for default)
1704   @type atime: float
1705   @param atime: the atime to set on the file (can be None)
1706   @type mtime: float
1707   @param mtime: the mtime to set on the file (can be None)
1708   @rtype: None
1709
1710   """
1711   if not os.path.isabs(file_name):
1712     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1713
1714   if file_name not in _ALLOWED_UPLOAD_FILES:
1715     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1716           file_name)
1717
1718   raw_data = _Decompress(data)
1719
1720   utils.SafeWriteFile(file_name, None,
1721                       data=raw_data, mode=mode, uid=uid, gid=gid,
1722                       atime=atime, mtime=mtime)
1723
1724
1725 def WriteSsconfFiles(values):
1726   """Update all ssconf files.
1727
1728   Wrapper around the SimpleStore.WriteFiles.
1729
1730   """
1731   ssconf.SimpleStore().WriteFiles(values)
1732
1733
1734 def _ErrnoOrStr(err):
1735   """Format an EnvironmentError exception.
1736
1737   If the L{err} argument has an errno attribute, it will be looked up
1738   and converted into a textual C{E...} description. Otherwise the
1739   string representation of the error will be returned.
1740
1741   @type err: L{EnvironmentError}
1742   @param err: the exception to format
1743
1744   """
1745   if hasattr(err, 'errno'):
1746     detail = errno.errorcode[err.errno]
1747   else:
1748     detail = str(err)
1749   return detail
1750
1751
1752 def _OSOndiskAPIVersion(os_dir):
1753   """Compute and return the API version of a given OS.
1754
1755   This function will try to read the API version of the OS residing in
1756   the 'os_dir' directory.
1757
1758   @type os_dir: str
1759   @param os_dir: the directory in which we should look for the OS
1760   @rtype: tuple
1761   @return: tuple (status, data) with status denoting the validity and
1762       data holding either the vaid versions or an error message
1763
1764   """
1765   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1766
1767   try:
1768     st = os.stat(api_file)
1769   except EnvironmentError, err:
1770     return False, ("Required file '%s' not found under path %s: %s" %
1771                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1772
1773   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1774     return False, ("File '%s' in %s is not a regular file" %
1775                    (constants.OS_API_FILE, os_dir))
1776
1777   try:
1778     api_versions = utils.ReadFile(api_file).splitlines()
1779   except EnvironmentError, err:
1780     return False, ("Error while reading the API version file at %s: %s" %
1781                    (api_file, _ErrnoOrStr(err)))
1782
1783   try:
1784     api_versions = [int(version.strip()) for version in api_versions]
1785   except (TypeError, ValueError), err:
1786     return False, ("API version(s) can't be converted to integer: %s" %
1787                    str(err))
1788
1789   return True, api_versions
1790
1791
1792 def DiagnoseOS(top_dirs=None):
1793   """Compute the validity for all OSes.
1794
1795   @type top_dirs: list
1796   @param top_dirs: the list of directories in which to
1797       search (if not given defaults to
1798       L{constants.OS_SEARCH_PATH})
1799   @rtype: list of L{objects.OS}
1800   @return: a list of tuples (name, path, status, diagnose, variants,
1801       parameters, api_version) for all (potential) OSes under all
1802       search paths, where:
1803           - name is the (potential) OS name
1804           - path is the full path to the OS
1805           - status True/False is the validity of the OS
1806           - diagnose is the error message for an invalid OS, otherwise empty
1807           - variants is a list of supported OS variants, if any
1808           - parameters is a list of (name, help) parameters, if any
1809           - api_version is a list of support OS API versions
1810
1811   """
1812   if top_dirs is None:
1813     top_dirs = constants.OS_SEARCH_PATH
1814
1815   result = []
1816   for dir_name in top_dirs:
1817     if os.path.isdir(dir_name):
1818       try:
1819         f_names = utils.ListVisibleFiles(dir_name)
1820       except EnvironmentError, err:
1821         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1822         break
1823       for name in f_names:
1824         os_path = utils.PathJoin(dir_name, name)
1825         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1826         if status:
1827           diagnose = ""
1828           variants = os_inst.supported_variants
1829           parameters = os_inst.supported_parameters
1830           api_versions = os_inst.api_versions
1831         else:
1832           diagnose = os_inst
1833           variants = parameters = api_versions = []
1834         result.append((name, os_path, status, diagnose, variants,
1835                        parameters, api_versions))
1836
1837   return result
1838
1839
1840 def _TryOSFromDisk(name, base_dir=None):
1841   """Create an OS instance from disk.
1842
1843   This function will return an OS instance if the given name is a
1844   valid OS name.
1845
1846   @type base_dir: string
1847   @keyword base_dir: Base directory containing OS installations.
1848                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1849   @rtype: tuple
1850   @return: success and either the OS instance if we find a valid one,
1851       or error message
1852
1853   """
1854   if base_dir is None:
1855     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1856   else:
1857     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1858
1859   if os_dir is None:
1860     return False, "Directory for OS %s not found in search path" % name
1861
1862   status, api_versions = _OSOndiskAPIVersion(os_dir)
1863   if not status:
1864     # push the error up
1865     return status, api_versions
1866
1867   if not constants.OS_API_VERSIONS.intersection(api_versions):
1868     return False, ("API version mismatch for path '%s': found %s, want %s." %
1869                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1870
1871   # OS Files dictionary, we will populate it with the absolute path names
1872   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1873
1874   if max(api_versions) >= constants.OS_API_V15:
1875     os_files[constants.OS_VARIANTS_FILE] = ''
1876
1877   if max(api_versions) >= constants.OS_API_V20:
1878     os_files[constants.OS_PARAMETERS_FILE] = ''
1879   else:
1880     del os_files[constants.OS_SCRIPT_VERIFY]
1881
1882   for filename in os_files:
1883     os_files[filename] = utils.PathJoin(os_dir, filename)
1884
1885     try:
1886       st = os.stat(os_files[filename])
1887     except EnvironmentError, err:
1888       return False, ("File '%s' under path '%s' is missing (%s)" %
1889                      (filename, os_dir, _ErrnoOrStr(err)))
1890
1891     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1892       return False, ("File '%s' under path '%s' is not a regular file" %
1893                      (filename, os_dir))
1894
1895     if filename in constants.OS_SCRIPTS:
1896       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1897         return False, ("File '%s' under path '%s' is not executable" %
1898                        (filename, os_dir))
1899
1900   variants = []
1901   if constants.OS_VARIANTS_FILE in os_files:
1902     variants_file = os_files[constants.OS_VARIANTS_FILE]
1903     try:
1904       variants = utils.ReadFile(variants_file).splitlines()
1905     except EnvironmentError, err:
1906       return False, ("Error while reading the OS variants file at %s: %s" %
1907                      (variants_file, _ErrnoOrStr(err)))
1908     if not variants:
1909       return False, ("No supported os variant found")
1910
1911   parameters = []
1912   if constants.OS_PARAMETERS_FILE in os_files:
1913     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1914     try:
1915       parameters = utils.ReadFile(parameters_file).splitlines()
1916     except EnvironmentError, err:
1917       return False, ("Error while reading the OS parameters file at %s: %s" %
1918                      (parameters_file, _ErrnoOrStr(err)))
1919     parameters = [v.split(None, 1) for v in parameters]
1920
1921   os_obj = objects.OS(name=name, path=os_dir,
1922                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1923                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1924                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1925                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1926                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1927                                                  None),
1928                       supported_variants=variants,
1929                       supported_parameters=parameters,
1930                       api_versions=api_versions)
1931   return True, os_obj
1932
1933
1934 def OSFromDisk(name, base_dir=None):
1935   """Create an OS instance from disk.
1936
1937   This function will return an OS instance if the given name is a
1938   valid OS name. Otherwise, it will raise an appropriate
1939   L{RPCFail} exception, detailing why this is not a valid OS.
1940
1941   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1942   an exception but returns true/false status data.
1943
1944   @type base_dir: string
1945   @keyword base_dir: Base directory containing OS installations.
1946                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1947   @rtype: L{objects.OS}
1948   @return: the OS instance if we find a valid one
1949   @raise RPCFail: if we don't find a valid OS
1950
1951   """
1952   name_only = objects.OS.GetName(name)
1953   status, payload = _TryOSFromDisk(name_only, base_dir)
1954
1955   if not status:
1956     _Fail(payload)
1957
1958   return payload
1959
1960
1961 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
1962   """Calculate the basic environment for an os script.
1963
1964   @type os_name: str
1965   @param os_name: full operating system name (including variant)
1966   @type inst_os: L{objects.OS}
1967   @param inst_os: operating system for which the environment is being built
1968   @type os_params: dict
1969   @param os_params: the OS parameters
1970   @type debug: integer
1971   @param debug: debug level (0 or 1, for OS Api 10)
1972   @rtype: dict
1973   @return: dict of environment variables
1974   @raise errors.BlockDeviceError: if the block device
1975       cannot be found
1976
1977   """
1978   result = {}
1979   api_version = \
1980     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1981   result['OS_API_VERSION'] = '%d' % api_version
1982   result['OS_NAME'] = inst_os.name
1983   result['DEBUG_LEVEL'] = '%d' % debug
1984
1985   # OS variants
1986   if api_version >= constants.OS_API_V15:
1987     variant = objects.OS.GetVariant(os_name)
1988     if not variant:
1989       variant = inst_os.supported_variants[0]
1990     result['OS_VARIANT'] = variant
1991
1992   # OS params
1993   for pname, pvalue in os_params.items():
1994     result['OSP_%s' % pname.upper()] = pvalue
1995
1996   return result
1997
1998
1999 def OSEnvironment(instance, inst_os, debug=0):
2000   """Calculate the environment for an os script.
2001
2002   @type instance: L{objects.Instance}
2003   @param instance: target instance for the os script run
2004   @type inst_os: L{objects.OS}
2005   @param inst_os: operating system for which the environment is being built
2006   @type debug: integer
2007   @param debug: debug level (0 or 1, for OS Api 10)
2008   @rtype: dict
2009   @return: dict of environment variables
2010   @raise errors.BlockDeviceError: if the block device
2011       cannot be found
2012
2013   """
2014   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2015
2016   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2017     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2018
2019   result['HYPERVISOR'] = instance.hypervisor
2020   result['DISK_COUNT'] = '%d' % len(instance.disks)
2021   result['NIC_COUNT'] = '%d' % len(instance.nics)
2022
2023   # Disks
2024   for idx, disk in enumerate(instance.disks):
2025     real_disk = _OpenRealBD(disk)
2026     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2027     result['DISK_%d_ACCESS' % idx] = disk.mode
2028     if constants.HV_DISK_TYPE in instance.hvparams:
2029       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2030         instance.hvparams[constants.HV_DISK_TYPE]
2031     if disk.dev_type in constants.LDS_BLOCK:
2032       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2033     elif disk.dev_type == constants.LD_FILE:
2034       result['DISK_%d_BACKEND_TYPE' % idx] = \
2035         'file:%s' % disk.physical_id[0]
2036
2037   # NICs
2038   for idx, nic in enumerate(instance.nics):
2039     result['NIC_%d_MAC' % idx] = nic.mac
2040     if nic.ip:
2041       result['NIC_%d_IP' % idx] = nic.ip
2042     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2043     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2044       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2045     if nic.nicparams[constants.NIC_LINK]:
2046       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2047     if constants.HV_NIC_TYPE in instance.hvparams:
2048       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2049         instance.hvparams[constants.HV_NIC_TYPE]
2050
2051   # HV/BE params
2052   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2053     for key, value in source.items():
2054       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2055
2056   return result
2057
2058
2059 def BlockdevGrow(disk, amount):
2060   """Grow a stack of block devices.
2061
2062   This function is called recursively, with the childrens being the
2063   first ones to resize.
2064
2065   @type disk: L{objects.Disk}
2066   @param disk: the disk to be grown
2067   @rtype: (status, result)
2068   @return: a tuple with the status of the operation
2069       (True/False), and the errors message if status
2070       is False
2071
2072   """
2073   r_dev = _RecursiveFindBD(disk)
2074   if r_dev is None:
2075     _Fail("Cannot find block device %s", disk)
2076
2077   try:
2078     r_dev.Grow(amount)
2079   except errors.BlockDeviceError, err:
2080     _Fail("Failed to grow block device: %s", err, exc=True)
2081
2082
2083 def BlockdevSnapshot(disk):
2084   """Create a snapshot copy of a block device.
2085
2086   This function is called recursively, and the snapshot is actually created
2087   just for the leaf lvm backend device.
2088
2089   @type disk: L{objects.Disk}
2090   @param disk: the disk to be snapshotted
2091   @rtype: string
2092   @return: snapshot disk path
2093
2094   """
2095   if disk.dev_type == constants.LD_DRBD8:
2096     if not disk.children:
2097       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2098             disk.unique_id)
2099     return BlockdevSnapshot(disk.children[0])
2100   elif disk.dev_type == constants.LD_LV:
2101     r_dev = _RecursiveFindBD(disk)
2102     if r_dev is not None:
2103       # FIXME: choose a saner value for the snapshot size
2104       # let's stay on the safe side and ask for the full size, for now
2105       return r_dev.Snapshot(disk.size)
2106     else:
2107       _Fail("Cannot find block device %s", disk)
2108   else:
2109     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2110           disk.unique_id, disk.dev_type)
2111
2112
2113 def FinalizeExport(instance, snap_disks):
2114   """Write out the export configuration information.
2115
2116   @type instance: L{objects.Instance}
2117   @param instance: the instance which we export, used for
2118       saving configuration
2119   @type snap_disks: list of L{objects.Disk}
2120   @param snap_disks: list of snapshot block devices, which
2121       will be used to get the actual name of the dump file
2122
2123   @rtype: None
2124
2125   """
2126   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2127   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2128
2129   config = objects.SerializableConfigParser()
2130
2131   config.add_section(constants.INISECT_EXP)
2132   config.set(constants.INISECT_EXP, 'version', '0')
2133   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2134   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2135   config.set(constants.INISECT_EXP, 'os', instance.os)
2136   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2137
2138   config.add_section(constants.INISECT_INS)
2139   config.set(constants.INISECT_INS, 'name', instance.name)
2140   config.set(constants.INISECT_INS, 'memory', '%d' %
2141              instance.beparams[constants.BE_MEMORY])
2142   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2143              instance.beparams[constants.BE_VCPUS])
2144   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2145   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2146
2147   nic_total = 0
2148   for nic_count, nic in enumerate(instance.nics):
2149     nic_total += 1
2150     config.set(constants.INISECT_INS, 'nic%d_mac' %
2151                nic_count, '%s' % nic.mac)
2152     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2153     for param in constants.NICS_PARAMETER_TYPES:
2154       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2155                  '%s' % nic.nicparams.get(param, None))
2156   # TODO: redundant: on load can read nics until it doesn't exist
2157   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2158
2159   disk_total = 0
2160   for disk_count, disk in enumerate(snap_disks):
2161     if disk:
2162       disk_total += 1
2163       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2164                  ('%s' % disk.iv_name))
2165       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2166                  ('%s' % disk.physical_id[1]))
2167       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2168                  ('%d' % disk.size))
2169
2170   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2171
2172   # New-style hypervisor/backend parameters
2173
2174   config.add_section(constants.INISECT_HYP)
2175   for name, value in instance.hvparams.items():
2176     if name not in constants.HVC_GLOBALS:
2177       config.set(constants.INISECT_HYP, name, str(value))
2178
2179   config.add_section(constants.INISECT_BEP)
2180   for name, value in instance.beparams.items():
2181     config.set(constants.INISECT_BEP, name, str(value))
2182
2183   config.add_section(constants.INISECT_OSP)
2184   for name, value in instance.osparams.items():
2185     config.set(constants.INISECT_OSP, name, str(value))
2186
2187   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2188                   data=config.Dumps())
2189   shutil.rmtree(finaldestdir, ignore_errors=True)
2190   shutil.move(destdir, finaldestdir)
2191
2192
2193 def ExportInfo(dest):
2194   """Get export configuration information.
2195
2196   @type dest: str
2197   @param dest: directory containing the export
2198
2199   @rtype: L{objects.SerializableConfigParser}
2200   @return: a serializable config file containing the
2201       export info
2202
2203   """
2204   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2205
2206   config = objects.SerializableConfigParser()
2207   config.read(cff)
2208
2209   if (not config.has_section(constants.INISECT_EXP) or
2210       not config.has_section(constants.INISECT_INS)):
2211     _Fail("Export info file doesn't have the required fields")
2212
2213   return config.Dumps()
2214
2215
2216 def ListExports():
2217   """Return a list of exports currently available on this machine.
2218
2219   @rtype: list
2220   @return: list of the exports
2221
2222   """
2223   if os.path.isdir(constants.EXPORT_DIR):
2224     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2225   else:
2226     _Fail("No exports directory")
2227
2228
2229 def RemoveExport(export):
2230   """Remove an existing export from the node.
2231
2232   @type export: str
2233   @param export: the name of the export to remove
2234   @rtype: None
2235
2236   """
2237   target = utils.PathJoin(constants.EXPORT_DIR, export)
2238
2239   try:
2240     shutil.rmtree(target)
2241   except EnvironmentError, err:
2242     _Fail("Error while removing the export: %s", err, exc=True)
2243
2244
2245 def BlockdevRename(devlist):
2246   """Rename a list of block devices.
2247
2248   @type devlist: list of tuples
2249   @param devlist: list of tuples of the form  (disk,
2250       new_logical_id, new_physical_id); disk is an
2251       L{objects.Disk} object describing the current disk,
2252       and new logical_id/physical_id is the name we
2253       rename it to
2254   @rtype: boolean
2255   @return: True if all renames succeeded, False otherwise
2256
2257   """
2258   msgs = []
2259   result = True
2260   for disk, unique_id in devlist:
2261     dev = _RecursiveFindBD(disk)
2262     if dev is None:
2263       msgs.append("Can't find device %s in rename" % str(disk))
2264       result = False
2265       continue
2266     try:
2267       old_rpath = dev.dev_path
2268       dev.Rename(unique_id)
2269       new_rpath = dev.dev_path
2270       if old_rpath != new_rpath:
2271         DevCacheManager.RemoveCache(old_rpath)
2272         # FIXME: we should add the new cache information here, like:
2273         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2274         # but we don't have the owner here - maybe parse from existing
2275         # cache? for now, we only lose lvm data when we rename, which
2276         # is less critical than DRBD or MD
2277     except errors.BlockDeviceError, err:
2278       msgs.append("Can't rename device '%s' to '%s': %s" %
2279                   (dev, unique_id, err))
2280       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2281       result = False
2282   if not result:
2283     _Fail("; ".join(msgs))
2284
2285
2286 def _TransformFileStorageDir(file_storage_dir):
2287   """Checks whether given file_storage_dir is valid.
2288
2289   Checks wheter the given file_storage_dir is within the cluster-wide
2290   default file_storage_dir stored in SimpleStore. Only paths under that
2291   directory are allowed.
2292
2293   @type file_storage_dir: str
2294   @param file_storage_dir: the path to check
2295
2296   @return: the normalized path if valid, None otherwise
2297
2298   """
2299   if not constants.ENABLE_FILE_STORAGE:
2300     _Fail("File storage disabled at configure time")
2301   cfg = _GetConfig()
2302   file_storage_dir = os.path.normpath(file_storage_dir)
2303   base_file_storage_dir = cfg.GetFileStorageDir()
2304   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2305       base_file_storage_dir):
2306     _Fail("File storage directory '%s' is not under base file"
2307           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2308   return file_storage_dir
2309
2310
2311 def CreateFileStorageDir(file_storage_dir):
2312   """Create file storage directory.
2313
2314   @type file_storage_dir: str
2315   @param file_storage_dir: directory to create
2316
2317   @rtype: tuple
2318   @return: tuple with first element a boolean indicating wheter dir
2319       creation was successful or not
2320
2321   """
2322   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2323   if os.path.exists(file_storage_dir):
2324     if not os.path.isdir(file_storage_dir):
2325       _Fail("Specified storage dir '%s' is not a directory",
2326             file_storage_dir)
2327   else:
2328     try:
2329       os.makedirs(file_storage_dir, 0750)
2330     except OSError, err:
2331       _Fail("Cannot create file storage directory '%s': %s",
2332             file_storage_dir, err, exc=True)
2333
2334
2335 def RemoveFileStorageDir(file_storage_dir):
2336   """Remove file storage directory.
2337
2338   Remove it only if it's empty. If not log an error and return.
2339
2340   @type file_storage_dir: str
2341   @param file_storage_dir: the directory we should cleanup
2342   @rtype: tuple (success,)
2343   @return: tuple of one element, C{success}, denoting
2344       whether the operation was successful
2345
2346   """
2347   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2348   if os.path.exists(file_storage_dir):
2349     if not os.path.isdir(file_storage_dir):
2350       _Fail("Specified Storage directory '%s' is not a directory",
2351             file_storage_dir)
2352     # deletes dir only if empty, otherwise we want to fail the rpc call
2353     try:
2354       os.rmdir(file_storage_dir)
2355     except OSError, err:
2356       _Fail("Cannot remove file storage directory '%s': %s",
2357             file_storage_dir, err)
2358
2359
2360 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2361   """Rename the file storage directory.
2362
2363   @type old_file_storage_dir: str
2364   @param old_file_storage_dir: the current path
2365   @type new_file_storage_dir: str
2366   @param new_file_storage_dir: the name we should rename to
2367   @rtype: tuple (success,)
2368   @return: tuple of one element, C{success}, denoting
2369       whether the operation was successful
2370
2371   """
2372   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2373   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2374   if not os.path.exists(new_file_storage_dir):
2375     if os.path.isdir(old_file_storage_dir):
2376       try:
2377         os.rename(old_file_storage_dir, new_file_storage_dir)
2378       except OSError, err:
2379         _Fail("Cannot rename '%s' to '%s': %s",
2380               old_file_storage_dir, new_file_storage_dir, err)
2381     else:
2382       _Fail("Specified storage dir '%s' is not a directory",
2383             old_file_storage_dir)
2384   else:
2385     if os.path.exists(old_file_storage_dir):
2386       _Fail("Cannot rename '%s' to '%s': both locations exist",
2387             old_file_storage_dir, new_file_storage_dir)
2388
2389
2390 def _EnsureJobQueueFile(file_name):
2391   """Checks whether the given filename is in the queue directory.
2392
2393   @type file_name: str
2394   @param file_name: the file name we should check
2395   @rtype: None
2396   @raises RPCFail: if the file is not valid
2397
2398   """
2399   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2400   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2401
2402   if not result:
2403     _Fail("Passed job queue file '%s' does not belong to"
2404           " the queue directory '%s'", file_name, queue_dir)
2405
2406
2407 def JobQueueUpdate(file_name, content):
2408   """Updates a file in the queue directory.
2409
2410   This is just a wrapper over L{utils.WriteFile}, with proper
2411   checking.
2412
2413   @type file_name: str
2414   @param file_name: the job file name
2415   @type content: str
2416   @param content: the new job contents
2417   @rtype: boolean
2418   @return: the success of the operation
2419
2420   """
2421   _EnsureJobQueueFile(file_name)
2422   getents = runtime.GetEnts()
2423
2424   # Write and replace the file atomically
2425   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2426                   gid=getents.masterd_gid)
2427
2428
2429 def JobQueueRename(old, new):
2430   """Renames a job queue file.
2431
2432   This is just a wrapper over os.rename with proper checking.
2433
2434   @type old: str
2435   @param old: the old (actual) file name
2436   @type new: str
2437   @param new: the desired file name
2438   @rtype: tuple
2439   @return: the success of the operation and payload
2440
2441   """
2442   _EnsureJobQueueFile(old)
2443   _EnsureJobQueueFile(new)
2444
2445   utils.RenameFile(old, new, mkdir=True)
2446
2447
2448 def BlockdevClose(instance_name, disks):
2449   """Closes the given block devices.
2450
2451   This means they will be switched to secondary mode (in case of
2452   DRBD).
2453
2454   @param instance_name: if the argument is not empty, the symlinks
2455       of this instance will be removed
2456   @type disks: list of L{objects.Disk}
2457   @param disks: the list of disks to be closed
2458   @rtype: tuple (success, message)
2459   @return: a tuple of success and message, where success
2460       indicates the succes of the operation, and message
2461       which will contain the error details in case we
2462       failed
2463
2464   """
2465   bdevs = []
2466   for cf in disks:
2467     rd = _RecursiveFindBD(cf)
2468     if rd is None:
2469       _Fail("Can't find device %s", cf)
2470     bdevs.append(rd)
2471
2472   msg = []
2473   for rd in bdevs:
2474     try:
2475       rd.Close()
2476     except errors.BlockDeviceError, err:
2477       msg.append(str(err))
2478   if msg:
2479     _Fail("Can't make devices secondary: %s", ",".join(msg))
2480   else:
2481     if instance_name:
2482       _RemoveBlockDevLinks(instance_name, disks)
2483
2484
2485 def ValidateHVParams(hvname, hvparams):
2486   """Validates the given hypervisor parameters.
2487
2488   @type hvname: string
2489   @param hvname: the hypervisor name
2490   @type hvparams: dict
2491   @param hvparams: the hypervisor parameters to be validated
2492   @rtype: None
2493
2494   """
2495   try:
2496     hv_type = hypervisor.GetHypervisor(hvname)
2497     hv_type.ValidateParameters(hvparams)
2498   except errors.HypervisorError, err:
2499     _Fail(str(err), log=False)
2500
2501
2502 def _CheckOSPList(os_obj, parameters):
2503   """Check whether a list of parameters is supported by the OS.
2504
2505   @type os_obj: L{objects.OS}
2506   @param os_obj: OS object to check
2507   @type parameters: list
2508   @param parameters: the list of parameters to check
2509
2510   """
2511   supported = [v[0] for v in os_obj.supported_parameters]
2512   delta = frozenset(parameters).difference(supported)
2513   if delta:
2514     _Fail("The following parameters are not supported"
2515           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2516
2517
2518 def ValidateOS(required, osname, checks, osparams):
2519   """Validate the given OS' parameters.
2520
2521   @type required: boolean
2522   @param required: whether absence of the OS should translate into
2523       failure or not
2524   @type osname: string
2525   @param osname: the OS to be validated
2526   @type checks: list
2527   @param checks: list of the checks to run (currently only 'parameters')
2528   @type osparams: dict
2529   @param osparams: dictionary with OS parameters
2530   @rtype: boolean
2531   @return: True if the validation passed, or False if the OS was not
2532       found and L{required} was false
2533
2534   """
2535   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2536     _Fail("Unknown checks required for OS %s: %s", osname,
2537           set(checks).difference(constants.OS_VALIDATE_CALLS))
2538
2539   name_only = objects.OS.GetName(osname)
2540   status, tbv = _TryOSFromDisk(name_only, None)
2541
2542   if not status:
2543     if required:
2544       _Fail(tbv)
2545     else:
2546       return False
2547
2548   if max(tbv.api_versions) < constants.OS_API_V20:
2549     return True
2550
2551   if constants.OS_VALIDATE_PARAMETERS in checks:
2552     _CheckOSPList(tbv, osparams.keys())
2553
2554   validate_env = OSCoreEnv(osname, tbv, osparams)
2555   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2556                         cwd=tbv.path)
2557   if result.failed:
2558     logging.error("os validate command '%s' returned error: %s output: %s",
2559                   result.cmd, result.fail_reason, result.output)
2560     _Fail("OS validation script failed (%s), output: %s",
2561           result.fail_reason, result.output, log=False)
2562
2563   return True
2564
2565
2566 def DemoteFromMC():
2567   """Demotes the current node from master candidate role.
2568
2569   """
2570   # try to ensure we're not the master by mistake
2571   master, myself = ssconf.GetMasterAndMyself()
2572   if master == myself:
2573     _Fail("ssconf status shows I'm the master node, will not demote")
2574
2575   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2576   if not result.failed:
2577     _Fail("The master daemon is running, will not demote")
2578
2579   try:
2580     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2581       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2582   except EnvironmentError, err:
2583     if err.errno != errno.ENOENT:
2584       _Fail("Error while backing up cluster file: %s", err, exc=True)
2585
2586   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2587
2588
2589 def _GetX509Filenames(cryptodir, name):
2590   """Returns the full paths for the private key and certificate.
2591
2592   """
2593   return (utils.PathJoin(cryptodir, name),
2594           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2595           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2596
2597
2598 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2599   """Creates a new X509 certificate for SSL/TLS.
2600
2601   @type validity: int
2602   @param validity: Validity in seconds
2603   @rtype: tuple; (string, string)
2604   @return: Certificate name and public part
2605
2606   """
2607   (key_pem, cert_pem) = \
2608     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2609                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2610
2611   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2612                               prefix="x509-%s-" % utils.TimestampForFilename())
2613   try:
2614     name = os.path.basename(cert_dir)
2615     assert len(name) > 5
2616
2617     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2618
2619     utils.WriteFile(key_file, mode=0400, data=key_pem)
2620     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2621
2622     # Never return private key as it shouldn't leave the node
2623     return (name, cert_pem)
2624   except Exception:
2625     shutil.rmtree(cert_dir, ignore_errors=True)
2626     raise
2627
2628
2629 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2630   """Removes a X509 certificate.
2631
2632   @type name: string
2633   @param name: Certificate name
2634
2635   """
2636   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2637
2638   utils.RemoveFile(key_file)
2639   utils.RemoveFile(cert_file)
2640
2641   try:
2642     os.rmdir(cert_dir)
2643   except EnvironmentError, err:
2644     _Fail("Cannot remove certificate directory '%s': %s",
2645           cert_dir, err)
2646
2647
2648 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2649   """Returns the command for the requested input/output.
2650
2651   @type instance: L{objects.Instance}
2652   @param instance: The instance object
2653   @param mode: Import/export mode
2654   @param ieio: Input/output type
2655   @param ieargs: Input/output arguments
2656
2657   """
2658   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2659
2660   env = None
2661   prefix = None
2662   suffix = None
2663   exp_size = None
2664
2665   if ieio == constants.IEIO_FILE:
2666     (filename, ) = ieargs
2667
2668     if not utils.IsNormAbsPath(filename):
2669       _Fail("Path '%s' is not normalized or absolute", filename)
2670
2671     directory = os.path.normpath(os.path.dirname(filename))
2672
2673     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2674         constants.EXPORT_DIR):
2675       _Fail("File '%s' is not under exports directory '%s'",
2676             filename, constants.EXPORT_DIR)
2677
2678     # Create directory
2679     utils.Makedirs(directory, mode=0750)
2680
2681     quoted_filename = utils.ShellQuote(filename)
2682
2683     if mode == constants.IEM_IMPORT:
2684       suffix = "> %s" % quoted_filename
2685     elif mode == constants.IEM_EXPORT:
2686       suffix = "< %s" % quoted_filename
2687
2688       # Retrieve file size
2689       try:
2690         st = os.stat(filename)
2691       except EnvironmentError, err:
2692         logging.error("Can't stat(2) %s: %s", filename, err)
2693       else:
2694         exp_size = utils.BytesToMebibyte(st.st_size)
2695
2696   elif ieio == constants.IEIO_RAW_DISK:
2697     (disk, ) = ieargs
2698
2699     real_disk = _OpenRealBD(disk)
2700
2701     if mode == constants.IEM_IMPORT:
2702       # we set here a smaller block size as, due to transport buffering, more
2703       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2704       # is not already there or we pass a wrong path; we use notrunc to no
2705       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2706       # much memory; this means that at best, we flush every 64k, which will
2707       # not be very fast
2708       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2709                                     " bs=%s oflag=dsync"),
2710                                     real_disk.dev_path,
2711                                     str(64 * 1024))
2712
2713     elif mode == constants.IEM_EXPORT:
2714       # the block size on the read dd is 1MiB to match our units
2715       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2716                                    real_disk.dev_path,
2717                                    str(1024 * 1024), # 1 MB
2718                                    str(disk.size))
2719       exp_size = disk.size
2720
2721   elif ieio == constants.IEIO_SCRIPT:
2722     (disk, disk_index, ) = ieargs
2723
2724     assert isinstance(disk_index, (int, long))
2725
2726     real_disk = _OpenRealBD(disk)
2727
2728     inst_os = OSFromDisk(instance.os)
2729     env = OSEnvironment(instance, inst_os)
2730
2731     if mode == constants.IEM_IMPORT:
2732       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2733       env["IMPORT_INDEX"] = str(disk_index)
2734       script = inst_os.import_script
2735
2736     elif mode == constants.IEM_EXPORT:
2737       env["EXPORT_DEVICE"] = real_disk.dev_path
2738       env["EXPORT_INDEX"] = str(disk_index)
2739       script = inst_os.export_script
2740
2741     # TODO: Pass special environment only to script
2742     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2743
2744     if mode == constants.IEM_IMPORT:
2745       suffix = "| %s" % script_cmd
2746
2747     elif mode == constants.IEM_EXPORT:
2748       prefix = "%s |" % script_cmd
2749
2750     # Let script predict size
2751     exp_size = constants.IE_CUSTOM_SIZE
2752
2753   else:
2754     _Fail("Invalid %s I/O mode %r", mode, ieio)
2755
2756   return (env, prefix, suffix, exp_size)
2757
2758
2759 def _CreateImportExportStatusDir(prefix):
2760   """Creates status directory for import/export.
2761
2762   """
2763   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2764                           prefix=("%s-%s-" %
2765                                   (prefix, utils.TimestampForFilename())))
2766
2767
2768 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2769   """Starts an import or export daemon.
2770
2771   @param mode: Import/output mode
2772   @type opts: L{objects.ImportExportOptions}
2773   @param opts: Daemon options
2774   @type host: string
2775   @param host: Remote host for export (None for import)
2776   @type port: int
2777   @param port: Remote port for export (None for import)
2778   @type instance: L{objects.Instance}
2779   @param instance: Instance object
2780   @param ieio: Input/output type
2781   @param ieioargs: Input/output arguments
2782
2783   """
2784   if mode == constants.IEM_IMPORT:
2785     prefix = "import"
2786
2787     if not (host is None and port is None):
2788       _Fail("Can not specify host or port on import")
2789
2790   elif mode == constants.IEM_EXPORT:
2791     prefix = "export"
2792
2793     if host is None or port is None:
2794       _Fail("Host and port must be specified for an export")
2795
2796   else:
2797     _Fail("Invalid mode %r", mode)
2798
2799   if (opts.key_name is None) ^ (opts.ca_pem is None):
2800     _Fail("Cluster certificate can only be used for both key and CA")
2801
2802   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2803     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2804
2805   if opts.key_name is None:
2806     # Use server.pem
2807     key_path = constants.NODED_CERT_FILE
2808     cert_path = constants.NODED_CERT_FILE
2809     assert opts.ca_pem is None
2810   else:
2811     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2812                                                  opts.key_name)
2813     assert opts.ca_pem is not None
2814
2815   for i in [key_path, cert_path]:
2816     if not os.path.exists(i):
2817       _Fail("File '%s' does not exist" % i)
2818
2819   status_dir = _CreateImportExportStatusDir(prefix)
2820   try:
2821     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2822     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2823     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2824
2825     if opts.ca_pem is None:
2826       # Use server.pem
2827       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2828     else:
2829       ca = opts.ca_pem
2830
2831     # Write CA file
2832     utils.WriteFile(ca_file, data=ca, mode=0400)
2833
2834     cmd = [
2835       constants.IMPORT_EXPORT_DAEMON,
2836       status_file, mode,
2837       "--key=%s" % key_path,
2838       "--cert=%s" % cert_path,
2839       "--ca=%s" % ca_file,
2840       ]
2841
2842     if host:
2843       cmd.append("--host=%s" % host)
2844
2845     if port:
2846       cmd.append("--port=%s" % port)
2847
2848     if opts.compress:
2849       cmd.append("--compress=%s" % opts.compress)
2850
2851     if opts.magic:
2852       cmd.append("--magic=%s" % opts.magic)
2853
2854     if exp_size is not None:
2855       cmd.append("--expected-size=%s" % exp_size)
2856
2857     if cmd_prefix:
2858       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2859
2860     if cmd_suffix:
2861       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2862
2863     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2864
2865     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2866     # support for receiving a file descriptor for output
2867     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2868                       output=logfile)
2869
2870     # The import/export name is simply the status directory name
2871     return os.path.basename(status_dir)
2872
2873   except Exception:
2874     shutil.rmtree(status_dir, ignore_errors=True)
2875     raise
2876
2877
2878 def GetImportExportStatus(names):
2879   """Returns import/export daemon status.
2880
2881   @type names: sequence
2882   @param names: List of names
2883   @rtype: List of dicts
2884   @return: Returns a list of the state of each named import/export or None if a
2885            status couldn't be read
2886
2887   """
2888   result = []
2889
2890   for name in names:
2891     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2892                                  _IES_STATUS_FILE)
2893
2894     try:
2895       data = utils.ReadFile(status_file)
2896     except EnvironmentError, err:
2897       if err.errno != errno.ENOENT:
2898         raise
2899       data = None
2900
2901     if not data:
2902       result.append(None)
2903       continue
2904
2905     result.append(serializer.LoadJson(data))
2906
2907   return result
2908
2909
2910 def AbortImportExport(name):
2911   """Sends SIGTERM to a running import/export daemon.
2912
2913   """
2914   logging.info("Abort import/export %s", name)
2915
2916   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2917   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2918
2919   if pid:
2920     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2921                  name, pid)
2922     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2923
2924
2925 def CleanupImportExport(name):
2926   """Cleanup after an import or export.
2927
2928   If the import/export daemon is still running it's killed. Afterwards the
2929   whole status directory is removed.
2930
2931   """
2932   logging.info("Finalizing import/export %s", name)
2933
2934   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2935
2936   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2937
2938   if pid:
2939     logging.info("Import/export %s is still running with PID %s",
2940                  name, pid)
2941     utils.KillProcess(pid, waitpid=False)
2942
2943   shutil.rmtree(status_dir, ignore_errors=True)
2944
2945
2946 def _FindDisks(nodes_ip, disks):
2947   """Sets the physical ID on disks and returns the block devices.
2948
2949   """
2950   # set the correct physical ID
2951   my_name = netutils.Hostname.GetSysName()
2952   for cf in disks:
2953     cf.SetPhysicalID(my_name, nodes_ip)
2954
2955   bdevs = []
2956
2957   for cf in disks:
2958     rd = _RecursiveFindBD(cf)
2959     if rd is None:
2960       _Fail("Can't find device %s", cf)
2961     bdevs.append(rd)
2962   return bdevs
2963
2964
2965 def DrbdDisconnectNet(nodes_ip, disks):
2966   """Disconnects the network on a list of drbd devices.
2967
2968   """
2969   bdevs = _FindDisks(nodes_ip, disks)
2970
2971   # disconnect disks
2972   for rd in bdevs:
2973     try:
2974       rd.DisconnectNet()
2975     except errors.BlockDeviceError, err:
2976       _Fail("Can't change network configuration to standalone mode: %s",
2977             err, exc=True)
2978
2979
2980 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2981   """Attaches the network on a list of drbd devices.
2982
2983   """
2984   bdevs = _FindDisks(nodes_ip, disks)
2985
2986   if multimaster:
2987     for idx, rd in enumerate(bdevs):
2988       try:
2989         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2990       except EnvironmentError, err:
2991         _Fail("Can't create symlink: %s", err)
2992   # reconnect disks, switch to new master configuration and if
2993   # needed primary mode
2994   for rd in bdevs:
2995     try:
2996       rd.AttachNet(multimaster)
2997     except errors.BlockDeviceError, err:
2998       _Fail("Can't change network configuration: %s", err)
2999
3000   # wait until the disks are connected; we need to retry the re-attach
3001   # if the device becomes standalone, as this might happen if the one
3002   # node disconnects and reconnects in a different mode before the
3003   # other node reconnects; in this case, one or both of the nodes will
3004   # decide it has wrong configuration and switch to standalone
3005
3006   def _Attach():
3007     all_connected = True
3008
3009     for rd in bdevs:
3010       stats = rd.GetProcStatus()
3011
3012       all_connected = (all_connected and
3013                        (stats.is_connected or stats.is_in_resync))
3014
3015       if stats.is_standalone:
3016         # peer had different config info and this node became
3017         # standalone, even though this should not happen with the
3018         # new staged way of changing disk configs
3019         try:
3020           rd.AttachNet(multimaster)
3021         except errors.BlockDeviceError, err:
3022           _Fail("Can't change network configuration: %s", err)
3023
3024     if not all_connected:
3025       raise utils.RetryAgain()
3026
3027   try:
3028     # Start with a delay of 100 miliseconds and go up to 5 seconds
3029     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3030   except utils.RetryTimeout:
3031     _Fail("Timeout in disk reconnecting")
3032
3033   if multimaster:
3034     # change to primary mode
3035     for rd in bdevs:
3036       try:
3037         rd.Open()
3038       except errors.BlockDeviceError, err:
3039         _Fail("Can't change to primary mode: %s", err)
3040
3041
3042 def DrbdWaitSync(nodes_ip, disks):
3043   """Wait until DRBDs have synchronized.
3044
3045   """
3046   def _helper(rd):
3047     stats = rd.GetProcStatus()
3048     if not (stats.is_connected or stats.is_in_resync):
3049       raise utils.RetryAgain()
3050     return stats
3051
3052   bdevs = _FindDisks(nodes_ip, disks)
3053
3054   min_resync = 100
3055   alldone = True
3056   for rd in bdevs:
3057     try:
3058       # poll each second for 15 seconds
3059       stats = utils.Retry(_helper, 1, 15, args=[rd])
3060     except utils.RetryTimeout:
3061       stats = rd.GetProcStatus()
3062       # last check
3063       if not (stats.is_connected or stats.is_in_resync):
3064         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3065     alldone = alldone and (not stats.is_in_resync)
3066     if stats.sync_percent is not None:
3067       min_resync = min(min_resync, stats.sync_percent)
3068
3069   return (alldone, min_resync)
3070
3071
3072 def GetDrbdUsermodeHelper():
3073   """Returns DRBD usermode helper currently configured.
3074
3075   """
3076   try:
3077     return bdev.BaseDRBD.GetUsermodeHelper()
3078   except errors.BlockDeviceError, err:
3079     _Fail(str(err))
3080
3081
3082 def PowercycleNode(hypervisor_type):
3083   """Hard-powercycle the node.
3084
3085   Because we need to return first, and schedule the powercycle in the
3086   background, we won't be able to report failures nicely.
3087
3088   """
3089   hyper = hypervisor.GetHypervisor(hypervisor_type)
3090   try:
3091     pid = os.fork()
3092   except OSError:
3093     # if we can't fork, we'll pretend that we're in the child process
3094     pid = 0
3095   if pid > 0:
3096     return "Reboot scheduled in 5 seconds"
3097   # ensure the child is running on ram
3098   try:
3099     utils.Mlockall()
3100   except Exception: # pylint: disable-msg=W0703
3101     pass
3102   time.sleep(5)
3103   hyper.PowercycleNode()
3104
3105
3106 class HooksRunner(object):
3107   """Hook runner.
3108
3109   This class is instantiated on the node side (ganeti-noded) and not
3110   on the master side.
3111
3112   """
3113   def __init__(self, hooks_base_dir=None):
3114     """Constructor for hooks runner.
3115
3116     @type hooks_base_dir: str or None
3117     @param hooks_base_dir: if not None, this overrides the
3118         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3119
3120     """
3121     if hooks_base_dir is None:
3122       hooks_base_dir = constants.HOOKS_BASE_DIR
3123     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3124     # constant
3125     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3126
3127   def RunHooks(self, hpath, phase, env):
3128     """Run the scripts in the hooks directory.
3129
3130     @type hpath: str
3131     @param hpath: the path to the hooks directory which
3132         holds the scripts
3133     @type phase: str
3134     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3135         L{constants.HOOKS_PHASE_POST}
3136     @type env: dict
3137     @param env: dictionary with the environment for the hook
3138     @rtype: list
3139     @return: list of 3-element tuples:
3140       - script path
3141       - script result, either L{constants.HKR_SUCCESS} or
3142         L{constants.HKR_FAIL}
3143       - output of the script
3144
3145     @raise errors.ProgrammerError: for invalid input
3146         parameters
3147
3148     """
3149     if phase == constants.HOOKS_PHASE_PRE:
3150       suffix = "pre"
3151     elif phase == constants.HOOKS_PHASE_POST:
3152       suffix = "post"
3153     else:
3154       _Fail("Unknown hooks phase '%s'", phase)
3155
3156
3157     subdir = "%s-%s.d" % (hpath, suffix)
3158     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3159
3160     results = []
3161
3162     if not os.path.isdir(dir_name):
3163       # for non-existing/non-dirs, we simply exit instead of logging a
3164       # warning at every operation
3165       return results
3166
3167     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3168
3169     for (relname, relstatus, runresult)  in runparts_results:
3170       if relstatus == constants.RUNPARTS_SKIP:
3171         rrval = constants.HKR_SKIP
3172         output = ""
3173       elif relstatus == constants.RUNPARTS_ERR:
3174         rrval = constants.HKR_FAIL
3175         output = "Hook script execution error: %s" % runresult
3176       elif relstatus == constants.RUNPARTS_RUN:
3177         if runresult.failed:
3178           rrval = constants.HKR_FAIL
3179         else:
3180           rrval = constants.HKR_SUCCESS
3181         output = utils.SafeEncode(runresult.output.strip())
3182       results.append(("%s/%s" % (subdir, relname), rrval, output))
3183
3184     return results
3185
3186
3187 class IAllocatorRunner(object):
3188   """IAllocator runner.
3189
3190   This class is instantiated on the node side (ganeti-noded) and not on
3191   the master side.
3192
3193   """
3194   @staticmethod
3195   def Run(name, idata):
3196     """Run an iallocator script.
3197
3198     @type name: str
3199     @param name: the iallocator script name
3200     @type idata: str
3201     @param idata: the allocator input data
3202
3203     @rtype: tuple
3204     @return: two element tuple of:
3205        - status
3206        - either error message or stdout of allocator (for success)
3207
3208     """
3209     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3210                                   os.path.isfile)
3211     if alloc_script is None:
3212       _Fail("iallocator module '%s' not found in the search path", name)
3213
3214     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3215     try:
3216       os.write(fd, idata)
3217       os.close(fd)
3218       result = utils.RunCmd([alloc_script, fin_name])
3219       if result.failed:
3220         _Fail("iallocator module '%s' failed: %s, output '%s'",
3221               name, result.fail_reason, result.output)
3222     finally:
3223       os.unlink(fin_name)
3224
3225     return result.stdout
3226
3227
3228 class DevCacheManager(object):
3229   """Simple class for managing a cache of block device information.
3230
3231   """
3232   _DEV_PREFIX = "/dev/"
3233   _ROOT_DIR = constants.BDEV_CACHE_DIR
3234
3235   @classmethod
3236   def _ConvertPath(cls, dev_path):
3237     """Converts a /dev/name path to the cache file name.
3238
3239     This replaces slashes with underscores and strips the /dev
3240     prefix. It then returns the full path to the cache file.
3241
3242     @type dev_path: str
3243     @param dev_path: the C{/dev/} path name
3244     @rtype: str
3245     @return: the converted path name
3246
3247     """
3248     if dev_path.startswith(cls._DEV_PREFIX):
3249       dev_path = dev_path[len(cls._DEV_PREFIX):]
3250     dev_path = dev_path.replace("/", "_")
3251     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3252     return fpath
3253
3254   @classmethod
3255   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3256     """Updates the cache information for a given device.
3257
3258     @type dev_path: str
3259     @param dev_path: the pathname of the device
3260     @type owner: str
3261     @param owner: the owner (instance name) of the device
3262     @type on_primary: bool
3263     @param on_primary: whether this is the primary
3264         node nor not
3265     @type iv_name: str
3266     @param iv_name: the instance-visible name of the
3267         device, as in objects.Disk.iv_name
3268
3269     @rtype: None
3270
3271     """
3272     if dev_path is None:
3273       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3274       return
3275     fpath = cls._ConvertPath(dev_path)
3276     if on_primary:
3277       state = "primary"
3278     else:
3279       state = "secondary"
3280     if iv_name is None:
3281       iv_name = "not_visible"
3282     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3283     try:
3284       utils.WriteFile(fpath, data=fdata)
3285     except EnvironmentError, err:
3286       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3287
3288   @classmethod
3289   def RemoveCache(cls, dev_path):
3290     """Remove data for a dev_path.
3291
3292     This is just a wrapper over L{utils.RemoveFile} with a converted
3293     path name and logging.
3294
3295     @type dev_path: str
3296     @param dev_path: the pathname of the device
3297
3298     @rtype: None
3299
3300     """
3301     if dev_path is None:
3302       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3303       return
3304     fpath = cls._ConvertPath(dev_path)
3305     try:
3306       utils.RemoveFile(fpath)
3307     except EnvironmentError, err:
3308       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)