Shared storage instance migration
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles())
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def StartMaster(start_daemons, no_voting):
248   """Activate local node as master node.
249
250   The function will either try activate the IP address of the master
251   (unless someone else has it) or also start the master daemons, based
252   on the start_daemons parameter.
253
254   @type start_daemons: boolean
255   @param start_daemons: whether to start the master daemons
256       (ganeti-masterd and ganeti-rapi), or (if false) activate the
257       master ip
258   @type no_voting: boolean
259   @param no_voting: whether to start ganeti-masterd without a node vote
260       (if start_daemons is True), but still non-interactively
261   @rtype: None
262
263   """
264   # GetMasterInfo will raise an exception if not able to return data
265   master_netdev, master_ip, _, family = GetMasterInfo()
266
267   err_msgs = []
268   # either start the master and rapi daemons
269   if start_daemons:
270     if no_voting:
271       masterd_args = "--no-voting --yes-do-it"
272     else:
273       masterd_args = ""
274
275     env = {
276       "EXTRA_MASTERD_ARGS": masterd_args,
277       }
278
279     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
280     if result.failed:
281       msg = "Can't start Ganeti master: %s" % result.output
282       logging.error(msg)
283       err_msgs.append(msg)
284   # or activate the IP
285   else:
286     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
287       if netutils.IPAddress.Own(master_ip):
288         # we already have the ip:
289         logging.debug("Master IP already configured, doing nothing")
290       else:
291         msg = "Someone else has the master ip, not activating"
292         logging.error(msg)
293         err_msgs.append(msg)
294     else:
295       ipcls = netutils.IP4Address
296       if family == netutils.IP6Address.family:
297         ipcls = netutils.IP6Address
298
299       result = utils.RunCmd(["ip", "address", "add",
300                              "%s/%d" % (master_ip, ipcls.iplen),
301                              "dev", master_netdev, "label",
302                              "%s:0" % master_netdev])
303       if result.failed:
304         msg = "Can't activate master IP: %s" % result.output
305         logging.error(msg)
306         err_msgs.append(msg)
307
308       # we ignore the exit code of the following cmds
309       if ipcls == netutils.IP4Address:
310         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
311                       master_ip, master_ip])
312       elif ipcls == netutils.IP6Address:
313         try:
314           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
315         except errors.OpExecError:
316           # TODO: Better error reporting
317           logging.warning("Can't execute ndisc6, please install if missing")
318
319   if err_msgs:
320     _Fail("; ".join(err_msgs))
321
322
323 def StopMaster(stop_daemons):
324   """Deactivate this node as master.
325
326   The function will always try to deactivate the IP address of the
327   master. It will also stop the master daemons depending on the
328   stop_daemons parameter.
329
330   @type stop_daemons: boolean
331   @param stop_daemons: whether to also stop the master daemons
332       (ganeti-masterd and ganeti-rapi)
333   @rtype: None
334
335   """
336   # TODO: log and report back to the caller the error failures; we
337   # need to decide in which case we fail the RPC for this
338
339   # GetMasterInfo will raise an exception if not able to return data
340   master_netdev, master_ip, _, family = GetMasterInfo()
341
342   ipcls = netutils.IP4Address
343   if family == netutils.IP6Address.family:
344     ipcls = netutils.IP6Address
345
346   result = utils.RunCmd(["ip", "address", "del",
347                          "%s/%d" % (master_ip, ipcls.iplen),
348                          "dev", master_netdev])
349   if result.failed:
350     logging.error("Can't remove the master IP, error: %s", result.output)
351     # but otherwise ignore the failure
352
353   if stop_daemons:
354     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
355     if result.failed:
356       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
357                     " and error %s",
358                     result.cmd, result.exit_code, result.output)
359
360
361 def EtcHostsModify(mode, host, ip):
362   """Modify a host entry in /etc/hosts.
363
364   @param mode: The mode to operate. Either add or remove entry
365   @param host: The host to operate on
366   @param ip: The ip associated with the entry
367
368   """
369   if mode == constants.ETC_HOSTS_ADD:
370     if not ip:
371       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
372               " present")
373     utils.AddHostToEtcHosts(host, ip)
374   elif mode == constants.ETC_HOSTS_REMOVE:
375     if ip:
376       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
377               " parameter is present")
378     utils.RemoveHostFromEtcHosts(host)
379   else:
380     RPCFail("Mode not supported")
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445
446   if vgname is not None:
447     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
448     vg_free = vg_size = None
449     if vginfo:
450       vg_free = int(round(vginfo[0][0], 0))
451       vg_size = int(round(vginfo[0][1], 0))
452     outputarray['vg_size'] = vg_size
453     outputarray['vg_free'] = vg_free
454
455   if hypervisor_type is not None:
456     hyper = hypervisor.GetHypervisor(hypervisor_type)
457     hyp_info = hyper.GetNodeInfo()
458     if hyp_info is not None:
459       outputarray.update(hyp_info)
460
461   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
462
463   return outputarray
464
465
466 def VerifyNode(what, cluster_name):
467   """Verify the status of the local node.
468
469   Based on the input L{what} parameter, various checks are done on the
470   local node.
471
472   If the I{filelist} key is present, this list of
473   files is checksummed and the file/checksum pairs are returned.
474
475   If the I{nodelist} key is present, we check that we have
476   connectivity via ssh with the target nodes (and check the hostname
477   report).
478
479   If the I{node-net-test} key is present, we check that we have
480   connectivity to the given nodes via both primary IP and, if
481   applicable, secondary IPs.
482
483   @type what: C{dict}
484   @param what: a dictionary of things to check:
485       - filelist: list of files for which to compute checksums
486       - nodelist: list of nodes we should check ssh communication with
487       - node-net-test: list of nodes we should check node daemon port
488         connectivity with
489       - hypervisor: list with hypervisors to run the verify for
490   @rtype: dict
491   @return: a dictionary with the same keys as the input dict, and
492       values representing the result of the checks
493
494   """
495   result = {}
496   my_name = netutils.Hostname.GetSysName()
497   port = netutils.GetDaemonPort(constants.NODED)
498   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
499
500   if constants.NV_HYPERVISOR in what and vm_capable:
501     result[constants.NV_HYPERVISOR] = tmp = {}
502     for hv_name in what[constants.NV_HYPERVISOR]:
503       try:
504         val = hypervisor.GetHypervisor(hv_name).Verify()
505       except errors.HypervisorError, err:
506         val = "Error while checking hypervisor: %s" % str(err)
507       tmp[hv_name] = val
508
509   if constants.NV_HVPARAMS in what and vm_capable:
510     result[constants.NV_HVPARAMS] = tmp = []
511     for source, hv_name, hvparms in what[constants.NV_HVPARAMS]:
512       try:
513         logging.info("Validating hv %s, %s", hv_name, hvparms)
514         hypervisor.GetHypervisor(hv_name).ValidateParameters(hvparms)
515       except errors.HypervisorError, err:
516         tmp.append((source, hv_name, str(err)))
517
518   if constants.NV_FILELIST in what:
519     result[constants.NV_FILELIST] = utils.FingerprintFiles(
520       what[constants.NV_FILELIST])
521
522   if constants.NV_NODELIST in what:
523     result[constants.NV_NODELIST] = tmp = {}
524     random.shuffle(what[constants.NV_NODELIST])
525     for node in what[constants.NV_NODELIST]:
526       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
527       if not success:
528         tmp[node] = message
529
530   if constants.NV_NODENETTEST in what:
531     result[constants.NV_NODENETTEST] = tmp = {}
532     my_pip = my_sip = None
533     for name, pip, sip in what[constants.NV_NODENETTEST]:
534       if name == my_name:
535         my_pip = pip
536         my_sip = sip
537         break
538     if not my_pip:
539       tmp[my_name] = ("Can't find my own primary/secondary IP"
540                       " in the node list")
541     else:
542       for name, pip, sip in what[constants.NV_NODENETTEST]:
543         fail = []
544         if not netutils.TcpPing(pip, port, source=my_pip):
545           fail.append("primary")
546         if sip != pip:
547           if not netutils.TcpPing(sip, port, source=my_sip):
548             fail.append("secondary")
549         if fail:
550           tmp[name] = ("failure using the %s interface(s)" %
551                        " and ".join(fail))
552
553   if constants.NV_MASTERIP in what:
554     # FIXME: add checks on incoming data structures (here and in the
555     # rest of the function)
556     master_name, master_ip = what[constants.NV_MASTERIP]
557     if master_name == my_name:
558       source = constants.IP4_ADDRESS_LOCALHOST
559     else:
560       source = None
561     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
562                                                   source=source)
563
564   if constants.NV_OOB_PATHS in what:
565     result[constants.NV_OOB_PATHS] = tmp = []
566     for path in what[constants.NV_OOB_PATHS]:
567       try:
568         st = os.stat(path)
569       except OSError, err:
570         tmp.append("error stating out of band helper: %s" % err)
571       else:
572         if stat.S_ISREG(st.st_mode):
573           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
574             tmp.append(None)
575           else:
576             tmp.append("out of band helper %s is not executable" % path)
577         else:
578           tmp.append("out of band helper %s is not a file" % path)
579
580   if constants.NV_LVLIST in what and vm_capable:
581     try:
582       val = GetVolumeList(utils.ListVolumeGroups().keys())
583     except RPCFail, err:
584       val = str(err)
585     result[constants.NV_LVLIST] = val
586
587   if constants.NV_INSTANCELIST in what and vm_capable:
588     # GetInstanceList can fail
589     try:
590       val = GetInstanceList(what[constants.NV_INSTANCELIST])
591     except RPCFail, err:
592       val = str(err)
593     result[constants.NV_INSTANCELIST] = val
594
595   if constants.NV_VGLIST in what and vm_capable:
596     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
597
598   if constants.NV_PVLIST in what and vm_capable:
599     result[constants.NV_PVLIST] = \
600       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
601                                    filter_allocatable=False)
602
603   if constants.NV_VERSION in what:
604     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
605                                     constants.RELEASE_VERSION)
606
607   if constants.NV_HVINFO in what and vm_capable:
608     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
609     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
610
611   if constants.NV_DRBDLIST in what and vm_capable:
612     try:
613       used_minors = bdev.DRBD8.GetUsedDevs().keys()
614     except errors.BlockDeviceError, err:
615       logging.warning("Can't get used minors list", exc_info=True)
616       used_minors = str(err)
617     result[constants.NV_DRBDLIST] = used_minors
618
619   if constants.NV_DRBDHELPER in what and vm_capable:
620     status = True
621     try:
622       payload = bdev.BaseDRBD.GetUsermodeHelper()
623     except errors.BlockDeviceError, err:
624       logging.error("Can't get DRBD usermode helper: %s", str(err))
625       status = False
626       payload = str(err)
627     result[constants.NV_DRBDHELPER] = (status, payload)
628
629   if constants.NV_NODESETUP in what:
630     result[constants.NV_NODESETUP] = tmpr = []
631     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
632       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
633                   " under /sys, missing required directories /sys/block"
634                   " and /sys/class/net")
635     if (not os.path.isdir("/proc/sys") or
636         not os.path.isfile("/proc/sysrq-trigger")):
637       tmpr.append("The procfs filesystem doesn't seem to be mounted"
638                   " under /proc, missing required directory /proc/sys and"
639                   " the file /proc/sysrq-trigger")
640
641   if constants.NV_TIME in what:
642     result[constants.NV_TIME] = utils.SplitTime(time.time())
643
644   if constants.NV_OSLIST in what and vm_capable:
645     result[constants.NV_OSLIST] = DiagnoseOS()
646
647   return result
648
649
650 def GetBlockDevSizes(devices):
651   """Return the size of the given block devices
652
653   @type devices: list
654   @param devices: list of block device nodes to query
655   @rtype: dict
656   @return:
657     dictionary of all block devices under /dev (key). The value is their
658     size in MiB.
659
660     {'/dev/disk/by-uuid/123456-12321231-312312-312': 124}
661
662   """
663   DEV_PREFIX = "/dev/"
664   blockdevs = {}
665
666   for devpath in devices:
667     if os.path.commonprefix([DEV_PREFIX, devpath]) != DEV_PREFIX:
668       continue
669
670     try:
671       st = os.stat(devpath)
672     except EnvironmentError, err:
673       logging.warning("Error stat()'ing device %s: %s", devpath, str(err))
674       continue
675
676     if stat.S_ISBLK(st.st_mode):
677       result = utils.RunCmd(["blockdev", "--getsize64", devpath])
678       if result.failed:
679         # We don't want to fail, just do not list this device as available
680         logging.warning("Cannot get size for block device %s", devpath)
681         continue
682
683       size = int(result.stdout) / (1024 * 1024)
684       blockdevs[devpath] = size
685   return blockdevs
686
687
688 def GetVolumeList(vg_names):
689   """Compute list of logical volumes and their size.
690
691   @type vg_names: list
692   @param vg_names: the volume groups whose LVs we should list, or
693       empty for all volume groups
694   @rtype: dict
695   @return:
696       dictionary of all partions (key) with value being a tuple of
697       their size (in MiB), inactive and online status::
698
699         {'xenvg/test1': ('20.06', True, True)}
700
701       in case of errors, a string is returned with the error
702       details.
703
704   """
705   lvs = {}
706   sep = '|'
707   if not vg_names:
708     vg_names = []
709   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
710                          "--separator=%s" % sep,
711                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
712   if result.failed:
713     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
714
715   for line in result.stdout.splitlines():
716     line = line.strip()
717     match = _LVSLINE_REGEX.match(line)
718     if not match:
719       logging.error("Invalid line returned from lvs output: '%s'", line)
720       continue
721     vg_name, name, size, attr = match.groups()
722     inactive = attr[4] == '-'
723     online = attr[5] == 'o'
724     virtual = attr[0] == 'v'
725     if virtual:
726       # we don't want to report such volumes as existing, since they
727       # don't really hold data
728       continue
729     lvs[vg_name+"/"+name] = (size, inactive, online)
730
731   return lvs
732
733
734 def ListVolumeGroups():
735   """List the volume groups and their size.
736
737   @rtype: dict
738   @return: dictionary with keys volume name and values the
739       size of the volume
740
741   """
742   return utils.ListVolumeGroups()
743
744
745 def NodeVolumes():
746   """List all volumes on this node.
747
748   @rtype: list
749   @return:
750     A list of dictionaries, each having four keys:
751       - name: the logical volume name,
752       - size: the size of the logical volume
753       - dev: the physical device on which the LV lives
754       - vg: the volume group to which it belongs
755
756     In case of errors, we return an empty list and log the
757     error.
758
759     Note that since a logical volume can live on multiple physical
760     volumes, the resulting list might include a logical volume
761     multiple times.
762
763   """
764   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
765                          "--separator=|",
766                          "--options=lv_name,lv_size,devices,vg_name"])
767   if result.failed:
768     _Fail("Failed to list logical volumes, lvs output: %s",
769           result.output)
770
771   def parse_dev(dev):
772     return dev.split('(')[0]
773
774   def handle_dev(dev):
775     return [parse_dev(x) for x in dev.split(",")]
776
777   def map_line(line):
778     line = [v.strip() for v in line]
779     return [{'name': line[0], 'size': line[1],
780              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
781
782   all_devs = []
783   for line in result.stdout.splitlines():
784     if line.count('|') >= 3:
785       all_devs.extend(map_line(line.split('|')))
786     else:
787       logging.warning("Strange line in the output from lvs: '%s'", line)
788   return all_devs
789
790
791 def BridgesExist(bridges_list):
792   """Check if a list of bridges exist on the current node.
793
794   @rtype: boolean
795   @return: C{True} if all of them exist, C{False} otherwise
796
797   """
798   missing = []
799   for bridge in bridges_list:
800     if not utils.BridgeExists(bridge):
801       missing.append(bridge)
802
803   if missing:
804     _Fail("Missing bridges %s", utils.CommaJoin(missing))
805
806
807 def GetInstanceList(hypervisor_list):
808   """Provides a list of instances.
809
810   @type hypervisor_list: list
811   @param hypervisor_list: the list of hypervisors to query information
812
813   @rtype: list
814   @return: a list of all running instances on the current node
815     - instance1.example.com
816     - instance2.example.com
817
818   """
819   results = []
820   for hname in hypervisor_list:
821     try:
822       names = hypervisor.GetHypervisor(hname).ListInstances()
823       results.extend(names)
824     except errors.HypervisorError, err:
825       _Fail("Error enumerating instances (hypervisor %s): %s",
826             hname, err, exc=True)
827
828   return results
829
830
831 def GetInstanceInfo(instance, hname):
832   """Gives back the information about an instance as a dictionary.
833
834   @type instance: string
835   @param instance: the instance name
836   @type hname: string
837   @param hname: the hypervisor type of the instance
838
839   @rtype: dict
840   @return: dictionary with the following keys:
841       - memory: memory size of instance (int)
842       - state: xen state of instance (string)
843       - time: cpu time of instance (float)
844
845   """
846   output = {}
847
848   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
849   if iinfo is not None:
850     output['memory'] = iinfo[2]
851     output['state'] = iinfo[4]
852     output['time'] = iinfo[5]
853
854   return output
855
856
857 def GetInstanceMigratable(instance):
858   """Gives whether an instance can be migrated.
859
860   @type instance: L{objects.Instance}
861   @param instance: object representing the instance to be checked.
862
863   @rtype: tuple
864   @return: tuple of (result, description) where:
865       - result: whether the instance can be migrated or not
866       - description: a description of the issue, if relevant
867
868   """
869   hyper = hypervisor.GetHypervisor(instance.hypervisor)
870   iname = instance.name
871   if iname not in hyper.ListInstances():
872     _Fail("Instance %s is not running", iname)
873
874   for idx in range(len(instance.disks)):
875     link_name = _GetBlockDevSymlinkPath(iname, idx)
876     if not os.path.islink(link_name):
877       logging.warning("Instance %s is missing symlink %s for disk %d",
878                       iname, link_name, idx)
879
880
881 def GetAllInstancesInfo(hypervisor_list):
882   """Gather data about all instances.
883
884   This is the equivalent of L{GetInstanceInfo}, except that it
885   computes data for all instances at once, thus being faster if one
886   needs data about more than one instance.
887
888   @type hypervisor_list: list
889   @param hypervisor_list: list of hypervisors to query for instance data
890
891   @rtype: dict
892   @return: dictionary of instance: data, with data having the following keys:
893       - memory: memory size of instance (int)
894       - state: xen state of instance (string)
895       - time: cpu time of instance (float)
896       - vcpus: the number of vcpus
897
898   """
899   output = {}
900
901   for hname in hypervisor_list:
902     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
903     if iinfo:
904       for name, _, memory, vcpus, state, times in iinfo:
905         value = {
906           'memory': memory,
907           'vcpus': vcpus,
908           'state': state,
909           'time': times,
910           }
911         if name in output:
912           # we only check static parameters, like memory and vcpus,
913           # and not state and time which can change between the
914           # invocations of the different hypervisors
915           for key in 'memory', 'vcpus':
916             if value[key] != output[name][key]:
917               _Fail("Instance %s is running twice"
918                     " with different parameters", name)
919         output[name] = value
920
921   return output
922
923
924 def _InstanceLogName(kind, os_name, instance):
925   """Compute the OS log filename for a given instance and operation.
926
927   The instance name and os name are passed in as strings since not all
928   operations have these as part of an instance object.
929
930   @type kind: string
931   @param kind: the operation type (e.g. add, import, etc.)
932   @type os_name: string
933   @param os_name: the os name
934   @type instance: string
935   @param instance: the name of the instance being imported/added/etc.
936
937   """
938   # TODO: Use tempfile.mkstemp to create unique filename
939   base = ("%s-%s-%s-%s.log" %
940           (kind, os_name, instance, utils.TimestampForFilename()))
941   return utils.PathJoin(constants.LOG_OS_DIR, base)
942
943
944 def InstanceOsAdd(instance, reinstall, debug):
945   """Add an OS to an instance.
946
947   @type instance: L{objects.Instance}
948   @param instance: Instance whose OS is to be installed
949   @type reinstall: boolean
950   @param reinstall: whether this is an instance reinstall
951   @type debug: integer
952   @param debug: debug level, passed to the OS scripts
953   @rtype: None
954
955   """
956   inst_os = OSFromDisk(instance.os)
957
958   create_env = OSEnvironment(instance, inst_os, debug)
959   if reinstall:
960     create_env['INSTANCE_REINSTALL'] = "1"
961
962   logfile = _InstanceLogName("add", instance.os, instance.name)
963
964   result = utils.RunCmd([inst_os.create_script], env=create_env,
965                         cwd=inst_os.path, output=logfile,)
966   if result.failed:
967     logging.error("os create command '%s' returned error: %s, logfile: %s,"
968                   " output: %s", result.cmd, result.fail_reason, logfile,
969                   result.output)
970     lines = [utils.SafeEncode(val)
971              for val in utils.TailFile(logfile, lines=20)]
972     _Fail("OS create script failed (%s), last lines in the"
973           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
974
975
976 def RunRenameInstance(instance, old_name, debug):
977   """Run the OS rename script for an instance.
978
979   @type instance: L{objects.Instance}
980   @param instance: Instance whose OS is to be installed
981   @type old_name: string
982   @param old_name: previous instance name
983   @type debug: integer
984   @param debug: debug level, passed to the OS scripts
985   @rtype: boolean
986   @return: the success of the operation
987
988   """
989   inst_os = OSFromDisk(instance.os)
990
991   rename_env = OSEnvironment(instance, inst_os, debug)
992   rename_env['OLD_INSTANCE_NAME'] = old_name
993
994   logfile = _InstanceLogName("rename", instance.os,
995                              "%s-%s" % (old_name, instance.name))
996
997   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
998                         cwd=inst_os.path, output=logfile)
999
1000   if result.failed:
1001     logging.error("os create command '%s' returned error: %s output: %s",
1002                   result.cmd, result.fail_reason, result.output)
1003     lines = [utils.SafeEncode(val)
1004              for val in utils.TailFile(logfile, lines=20)]
1005     _Fail("OS rename script failed (%s), last lines in the"
1006           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
1007
1008
1009 def _GetBlockDevSymlinkPath(instance_name, idx):
1010   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
1011                         (instance_name, constants.DISK_SEPARATOR, idx))
1012
1013
1014 def _SymlinkBlockDev(instance_name, device_path, idx):
1015   """Set up symlinks to a instance's block device.
1016
1017   This is an auxiliary function run when an instance is start (on the primary
1018   node) or when an instance is migrated (on the target node).
1019
1020
1021   @param instance_name: the name of the target instance
1022   @param device_path: path of the physical block device, on the node
1023   @param idx: the disk index
1024   @return: absolute path to the disk's symlink
1025
1026   """
1027   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1028   try:
1029     os.symlink(device_path, link_name)
1030   except OSError, err:
1031     if err.errno == errno.EEXIST:
1032       if (not os.path.islink(link_name) or
1033           os.readlink(link_name) != device_path):
1034         os.remove(link_name)
1035         os.symlink(device_path, link_name)
1036     else:
1037       raise
1038
1039   return link_name
1040
1041
1042 def _RemoveBlockDevLinks(instance_name, disks):
1043   """Remove the block device symlinks belonging to the given instance.
1044
1045   """
1046   for idx, _ in enumerate(disks):
1047     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1048     if os.path.islink(link_name):
1049       try:
1050         os.remove(link_name)
1051       except OSError:
1052         logging.exception("Can't remove symlink '%s'", link_name)
1053
1054
1055 def _GatherAndLinkBlockDevs(instance):
1056   """Set up an instance's block device(s).
1057
1058   This is run on the primary node at instance startup. The block
1059   devices must be already assembled.
1060
1061   @type instance: L{objects.Instance}
1062   @param instance: the instance whose disks we shoul assemble
1063   @rtype: list
1064   @return: list of (disk_object, device_path)
1065
1066   """
1067   block_devices = []
1068   for idx, disk in enumerate(instance.disks):
1069     device = _RecursiveFindBD(disk)
1070     if device is None:
1071       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1072                                     str(disk))
1073     device.Open()
1074     try:
1075       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1076     except OSError, e:
1077       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1078                                     e.strerror)
1079
1080     block_devices.append((disk, link_name))
1081
1082   return block_devices
1083
1084
1085 def StartInstance(instance):
1086   """Start an instance.
1087
1088   @type instance: L{objects.Instance}
1089   @param instance: the instance object
1090   @rtype: None
1091
1092   """
1093   running_instances = GetInstanceList([instance.hypervisor])
1094
1095   if instance.name in running_instances:
1096     logging.info("Instance %s already running, not starting", instance.name)
1097     return
1098
1099   try:
1100     block_devices = _GatherAndLinkBlockDevs(instance)
1101     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1102     hyper.StartInstance(instance, block_devices)
1103   except errors.BlockDeviceError, err:
1104     _Fail("Block device error: %s", err, exc=True)
1105   except errors.HypervisorError, err:
1106     _RemoveBlockDevLinks(instance.name, instance.disks)
1107     _Fail("Hypervisor error: %s", err, exc=True)
1108
1109
1110 def InstanceShutdown(instance, timeout):
1111   """Shut an instance down.
1112
1113   @note: this functions uses polling with a hardcoded timeout.
1114
1115   @type instance: L{objects.Instance}
1116   @param instance: the instance object
1117   @type timeout: integer
1118   @param timeout: maximum timeout for soft shutdown
1119   @rtype: None
1120
1121   """
1122   hv_name = instance.hypervisor
1123   hyper = hypervisor.GetHypervisor(hv_name)
1124   iname = instance.name
1125
1126   if instance.name not in hyper.ListInstances():
1127     logging.info("Instance %s not running, doing nothing", iname)
1128     return
1129
1130   class _TryShutdown:
1131     def __init__(self):
1132       self.tried_once = False
1133
1134     def __call__(self):
1135       if iname not in hyper.ListInstances():
1136         return
1137
1138       try:
1139         hyper.StopInstance(instance, retry=self.tried_once)
1140       except errors.HypervisorError, err:
1141         if iname not in hyper.ListInstances():
1142           # if the instance is no longer existing, consider this a
1143           # success and go to cleanup
1144           return
1145
1146         _Fail("Failed to stop instance %s: %s", iname, err)
1147
1148       self.tried_once = True
1149
1150       raise utils.RetryAgain()
1151
1152   try:
1153     utils.Retry(_TryShutdown(), 5, timeout)
1154   except utils.RetryTimeout:
1155     # the shutdown did not succeed
1156     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1157
1158     try:
1159       hyper.StopInstance(instance, force=True)
1160     except errors.HypervisorError, err:
1161       if iname in hyper.ListInstances():
1162         # only raise an error if the instance still exists, otherwise
1163         # the error could simply be "instance ... unknown"!
1164         _Fail("Failed to force stop instance %s: %s", iname, err)
1165
1166     time.sleep(1)
1167
1168     if iname in hyper.ListInstances():
1169       _Fail("Could not shutdown instance %s even by destroy", iname)
1170
1171   try:
1172     hyper.CleanupInstance(instance.name)
1173   except errors.HypervisorError, err:
1174     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1175
1176   _RemoveBlockDevLinks(iname, instance.disks)
1177
1178
1179 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1180   """Reboot an instance.
1181
1182   @type instance: L{objects.Instance}
1183   @param instance: the instance object to reboot
1184   @type reboot_type: str
1185   @param reboot_type: the type of reboot, one the following
1186     constants:
1187       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1188         instance OS, do not recreate the VM
1189       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1190         restart the VM (at the hypervisor level)
1191       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1192         not accepted here, since that mode is handled differently, in
1193         cmdlib, and translates into full stop and start of the
1194         instance (instead of a call_instance_reboot RPC)
1195   @type shutdown_timeout: integer
1196   @param shutdown_timeout: maximum timeout for soft shutdown
1197   @rtype: None
1198
1199   """
1200   running_instances = GetInstanceList([instance.hypervisor])
1201
1202   if instance.name not in running_instances:
1203     _Fail("Cannot reboot instance %s that is not running", instance.name)
1204
1205   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1206   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1207     try:
1208       hyper.RebootInstance(instance)
1209     except errors.HypervisorError, err:
1210       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1211   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1212     try:
1213       InstanceShutdown(instance, shutdown_timeout)
1214       return StartInstance(instance)
1215     except errors.HypervisorError, err:
1216       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1217   else:
1218     _Fail("Invalid reboot_type received: %s", reboot_type)
1219
1220
1221 def MigrationInfo(instance):
1222   """Gather information about an instance to be migrated.
1223
1224   @type instance: L{objects.Instance}
1225   @param instance: the instance definition
1226
1227   """
1228   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1229   try:
1230     info = hyper.MigrationInfo(instance)
1231   except errors.HypervisorError, err:
1232     _Fail("Failed to fetch migration information: %s", err, exc=True)
1233   return info
1234
1235
1236 def AcceptInstance(instance, info, target):
1237   """Prepare the node to accept an instance.
1238
1239   @type instance: L{objects.Instance}
1240   @param instance: the instance definition
1241   @type info: string/data (opaque)
1242   @param info: migration information, from the source node
1243   @type target: string
1244   @param target: target host (usually ip), on this node
1245
1246   """
1247   # TODO: why is this required only for DTS_EXT_MIRROR?
1248   if instance.disk_template in constants.DTS_EXT_MIRROR:
1249     # Create the symlinks, as the disks are not active
1250     # in any way
1251     try:
1252       _GatherAndLinkBlockDevs(instance)
1253     except errors.BlockDeviceError, err:
1254       _Fail("Block device error: %s", err, exc=True)
1255
1256   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1257   try:
1258     hyper.AcceptInstance(instance, info, target)
1259   except errors.HypervisorError, err:
1260     if instance.disk_template in constants.DTS_EXT_MIRROR:
1261       _RemoveBlockDevLinks(instance.name, instance.disks)
1262     _Fail("Failed to accept instance: %s", err, exc=True)
1263
1264
1265 def FinalizeMigration(instance, info, success):
1266   """Finalize any preparation to accept an instance.
1267
1268   @type instance: L{objects.Instance}
1269   @param instance: the instance definition
1270   @type info: string/data (opaque)
1271   @param info: migration information, from the source node
1272   @type success: boolean
1273   @param success: whether the migration was a success or a failure
1274
1275   """
1276   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1277   try:
1278     hyper.FinalizeMigration(instance, info, success)
1279   except errors.HypervisorError, err:
1280     _Fail("Failed to finalize migration: %s", err, exc=True)
1281
1282
1283 def MigrateInstance(instance, target, live):
1284   """Migrates an instance to another node.
1285
1286   @type instance: L{objects.Instance}
1287   @param instance: the instance definition
1288   @type target: string
1289   @param target: the target node name
1290   @type live: boolean
1291   @param live: whether the migration should be done live or not (the
1292       interpretation of this parameter is left to the hypervisor)
1293   @rtype: tuple
1294   @return: a tuple of (success, msg) where:
1295       - succes is a boolean denoting the success/failure of the operation
1296       - msg is a string with details in case of failure
1297
1298   """
1299   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1300
1301   try:
1302     hyper.MigrateInstance(instance, target, live)
1303   except errors.HypervisorError, err:
1304     _Fail("Failed to migrate instance: %s", err, exc=True)
1305
1306
1307 def BlockdevCreate(disk, size, owner, on_primary, info):
1308   """Creates a block device for an instance.
1309
1310   @type disk: L{objects.Disk}
1311   @param disk: the object describing the disk we should create
1312   @type size: int
1313   @param size: the size of the physical underlying device, in MiB
1314   @type owner: str
1315   @param owner: the name of the instance for which disk is created,
1316       used for device cache data
1317   @type on_primary: boolean
1318   @param on_primary:  indicates if it is the primary node or not
1319   @type info: string
1320   @param info: string that will be sent to the physical device
1321       creation, used for example to set (LVM) tags on LVs
1322
1323   @return: the new unique_id of the device (this can sometime be
1324       computed only after creation), or None. On secondary nodes,
1325       it's not required to return anything.
1326
1327   """
1328   # TODO: remove the obsolete 'size' argument
1329   # pylint: disable-msg=W0613
1330   clist = []
1331   if disk.children:
1332     for child in disk.children:
1333       try:
1334         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1335       except errors.BlockDeviceError, err:
1336         _Fail("Can't assemble device %s: %s", child, err)
1337       if on_primary or disk.AssembleOnSecondary():
1338         # we need the children open in case the device itself has to
1339         # be assembled
1340         try:
1341           # pylint: disable-msg=E1103
1342           crdev.Open()
1343         except errors.BlockDeviceError, err:
1344           _Fail("Can't make child '%s' read-write: %s", child, err)
1345       clist.append(crdev)
1346
1347   try:
1348     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1349   except errors.BlockDeviceError, err:
1350     _Fail("Can't create block device: %s", err)
1351
1352   if on_primary or disk.AssembleOnSecondary():
1353     try:
1354       device.Assemble()
1355     except errors.BlockDeviceError, err:
1356       _Fail("Can't assemble device after creation, unusual event: %s", err)
1357     device.SetSyncSpeed(constants.SYNC_SPEED)
1358     if on_primary or disk.OpenOnSecondary():
1359       try:
1360         device.Open(force=True)
1361       except errors.BlockDeviceError, err:
1362         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1363     DevCacheManager.UpdateCache(device.dev_path, owner,
1364                                 on_primary, disk.iv_name)
1365
1366   device.SetInfo(info)
1367
1368   return device.unique_id
1369
1370
1371 def _WipeDevice(path, offset, size):
1372   """This function actually wipes the device.
1373
1374   @param path: The path to the device to wipe
1375   @param offset: The offset in MiB in the file
1376   @param size: The size in MiB to write
1377
1378   """
1379   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1380          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1381          "count=%d" % size]
1382   result = utils.RunCmd(cmd)
1383
1384   if result.failed:
1385     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1386           result.fail_reason, result.output)
1387
1388
1389 def BlockdevWipe(disk, offset, size):
1390   """Wipes a block device.
1391
1392   @type disk: L{objects.Disk}
1393   @param disk: the disk object we want to wipe
1394   @type offset: int
1395   @param offset: The offset in MiB in the file
1396   @type size: int
1397   @param size: The size in MiB to write
1398
1399   """
1400   try:
1401     rdev = _RecursiveFindBD(disk)
1402   except errors.BlockDeviceError:
1403     rdev = None
1404
1405   if not rdev:
1406     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1407
1408   # Do cross verify some of the parameters
1409   if offset > rdev.size:
1410     _Fail("Offset is bigger than device size")
1411   if (offset + size) > rdev.size:
1412     _Fail("The provided offset and size to wipe is bigger than device size")
1413
1414   _WipeDevice(rdev.dev_path, offset, size)
1415
1416
1417 def BlockdevPauseResumeSync(disks, pause):
1418   """Pause or resume the sync of the block device.
1419
1420   @type disks: list of L{objects.Disk}
1421   @param disks: the disks object we want to pause/resume
1422   @type pause: bool
1423   @param pause: Wheater to pause or resume
1424
1425   """
1426   success = []
1427   for disk in disks:
1428     try:
1429       rdev = _RecursiveFindBD(disk)
1430     except errors.BlockDeviceError:
1431       rdev = None
1432
1433     if not rdev:
1434       success.append((False, ("Cannot change sync for device %s:"
1435                               " device not found" % disk.iv_name)))
1436       continue
1437
1438     result = rdev.PauseResumeSync(pause)
1439
1440     if result:
1441       success.append((result, None))
1442     else:
1443       if pause:
1444         msg = "Pause"
1445       else:
1446         msg = "Resume"
1447       success.append((result, "%s for device %s failed" % (msg, disk.iv_name)))
1448
1449   return success
1450
1451
1452 def BlockdevRemove(disk):
1453   """Remove a block device.
1454
1455   @note: This is intended to be called recursively.
1456
1457   @type disk: L{objects.Disk}
1458   @param disk: the disk object we should remove
1459   @rtype: boolean
1460   @return: the success of the operation
1461
1462   """
1463   msgs = []
1464   try:
1465     rdev = _RecursiveFindBD(disk)
1466   except errors.BlockDeviceError, err:
1467     # probably can't attach
1468     logging.info("Can't attach to device %s in remove", disk)
1469     rdev = None
1470   if rdev is not None:
1471     r_path = rdev.dev_path
1472     try:
1473       rdev.Remove()
1474     except errors.BlockDeviceError, err:
1475       msgs.append(str(err))
1476     if not msgs:
1477       DevCacheManager.RemoveCache(r_path)
1478
1479   if disk.children:
1480     for child in disk.children:
1481       try:
1482         BlockdevRemove(child)
1483       except RPCFail, err:
1484         msgs.append(str(err))
1485
1486   if msgs:
1487     _Fail("; ".join(msgs))
1488
1489
1490 def _RecursiveAssembleBD(disk, owner, as_primary):
1491   """Activate a block device for an instance.
1492
1493   This is run on the primary and secondary nodes for an instance.
1494
1495   @note: this function is called recursively.
1496
1497   @type disk: L{objects.Disk}
1498   @param disk: the disk we try to assemble
1499   @type owner: str
1500   @param owner: the name of the instance which owns the disk
1501   @type as_primary: boolean
1502   @param as_primary: if we should make the block device
1503       read/write
1504
1505   @return: the assembled device or None (in case no device
1506       was assembled)
1507   @raise errors.BlockDeviceError: in case there is an error
1508       during the activation of the children or the device
1509       itself
1510
1511   """
1512   children = []
1513   if disk.children:
1514     mcn = disk.ChildrenNeeded()
1515     if mcn == -1:
1516       mcn = 0 # max number of Nones allowed
1517     else:
1518       mcn = len(disk.children) - mcn # max number of Nones
1519     for chld_disk in disk.children:
1520       try:
1521         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1522       except errors.BlockDeviceError, err:
1523         if children.count(None) >= mcn:
1524           raise
1525         cdev = None
1526         logging.error("Error in child activation (but continuing): %s",
1527                       str(err))
1528       children.append(cdev)
1529
1530   if as_primary or disk.AssembleOnSecondary():
1531     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1532     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1533     result = r_dev
1534     if as_primary or disk.OpenOnSecondary():
1535       r_dev.Open()
1536     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1537                                 as_primary, disk.iv_name)
1538
1539   else:
1540     result = True
1541   return result
1542
1543
1544 def BlockdevAssemble(disk, owner, as_primary, idx):
1545   """Activate a block device for an instance.
1546
1547   This is a wrapper over _RecursiveAssembleBD.
1548
1549   @rtype: str or boolean
1550   @return: a C{/dev/...} path for primary nodes, and
1551       C{True} for secondary nodes
1552
1553   """
1554   try:
1555     result = _RecursiveAssembleBD(disk, owner, as_primary)
1556     if isinstance(result, bdev.BlockDev):
1557       # pylint: disable-msg=E1103
1558       result = result.dev_path
1559       if as_primary:
1560         _SymlinkBlockDev(owner, result, idx)
1561   except errors.BlockDeviceError, err:
1562     _Fail("Error while assembling disk: %s", err, exc=True)
1563   except OSError, err:
1564     _Fail("Error while symlinking disk: %s", err, exc=True)
1565
1566   return result
1567
1568
1569 def BlockdevShutdown(disk):
1570   """Shut down a block device.
1571
1572   First, if the device is assembled (Attach() is successful), then
1573   the device is shutdown. Then the children of the device are
1574   shutdown.
1575
1576   This function is called recursively. Note that we don't cache the
1577   children or such, as oppossed to assemble, shutdown of different
1578   devices doesn't require that the upper device was active.
1579
1580   @type disk: L{objects.Disk}
1581   @param disk: the description of the disk we should
1582       shutdown
1583   @rtype: None
1584
1585   """
1586   msgs = []
1587   r_dev = _RecursiveFindBD(disk)
1588   if r_dev is not None:
1589     r_path = r_dev.dev_path
1590     try:
1591       r_dev.Shutdown()
1592       DevCacheManager.RemoveCache(r_path)
1593     except errors.BlockDeviceError, err:
1594       msgs.append(str(err))
1595
1596   if disk.children:
1597     for child in disk.children:
1598       try:
1599         BlockdevShutdown(child)
1600       except RPCFail, err:
1601         msgs.append(str(err))
1602
1603   if msgs:
1604     _Fail("; ".join(msgs))
1605
1606
1607 def BlockdevAddchildren(parent_cdev, new_cdevs):
1608   """Extend a mirrored block device.
1609
1610   @type parent_cdev: L{objects.Disk}
1611   @param parent_cdev: the disk to which we should add children
1612   @type new_cdevs: list of L{objects.Disk}
1613   @param new_cdevs: the list of children which we should add
1614   @rtype: None
1615
1616   """
1617   parent_bdev = _RecursiveFindBD(parent_cdev)
1618   if parent_bdev is None:
1619     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1620   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1621   if new_bdevs.count(None) > 0:
1622     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1623   parent_bdev.AddChildren(new_bdevs)
1624
1625
1626 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1627   """Shrink a mirrored block device.
1628
1629   @type parent_cdev: L{objects.Disk}
1630   @param parent_cdev: the disk from which we should remove children
1631   @type new_cdevs: list of L{objects.Disk}
1632   @param new_cdevs: the list of children which we should remove
1633   @rtype: None
1634
1635   """
1636   parent_bdev = _RecursiveFindBD(parent_cdev)
1637   if parent_bdev is None:
1638     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1639   devs = []
1640   for disk in new_cdevs:
1641     rpath = disk.StaticDevPath()
1642     if rpath is None:
1643       bd = _RecursiveFindBD(disk)
1644       if bd is None:
1645         _Fail("Can't find device %s while removing children", disk)
1646       else:
1647         devs.append(bd.dev_path)
1648     else:
1649       if not utils.IsNormAbsPath(rpath):
1650         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1651       devs.append(rpath)
1652   parent_bdev.RemoveChildren(devs)
1653
1654
1655 def BlockdevGetmirrorstatus(disks):
1656   """Get the mirroring status of a list of devices.
1657
1658   @type disks: list of L{objects.Disk}
1659   @param disks: the list of disks which we should query
1660   @rtype: disk
1661   @return: List of L{objects.BlockDevStatus}, one for each disk
1662   @raise errors.BlockDeviceError: if any of the disks cannot be
1663       found
1664
1665   """
1666   stats = []
1667   for dsk in disks:
1668     rbd = _RecursiveFindBD(dsk)
1669     if rbd is None:
1670       _Fail("Can't find device %s", dsk)
1671
1672     stats.append(rbd.CombinedSyncStatus())
1673
1674   return stats
1675
1676
1677 def BlockdevGetmirrorstatusMulti(disks):
1678   """Get the mirroring status of a list of devices.
1679
1680   @type disks: list of L{objects.Disk}
1681   @param disks: the list of disks which we should query
1682   @rtype: disk
1683   @return: List of tuples, (bool, status), one for each disk; bool denotes
1684     success/failure, status is L{objects.BlockDevStatus} on success, string
1685     otherwise
1686
1687   """
1688   result = []
1689   for disk in disks:
1690     try:
1691       rbd = _RecursiveFindBD(disk)
1692       if rbd is None:
1693         result.append((False, "Can't find device %s" % disk))
1694         continue
1695
1696       status = rbd.CombinedSyncStatus()
1697     except errors.BlockDeviceError, err:
1698       logging.exception("Error while getting disk status")
1699       result.append((False, str(err)))
1700     else:
1701       result.append((True, status))
1702
1703   assert len(disks) == len(result)
1704
1705   return result
1706
1707
1708 def _RecursiveFindBD(disk):
1709   """Check if a device is activated.
1710
1711   If so, return information about the real device.
1712
1713   @type disk: L{objects.Disk}
1714   @param disk: the disk object we need to find
1715
1716   @return: None if the device can't be found,
1717       otherwise the device instance
1718
1719   """
1720   children = []
1721   if disk.children:
1722     for chdisk in disk.children:
1723       children.append(_RecursiveFindBD(chdisk))
1724
1725   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1726
1727
1728 def _OpenRealBD(disk):
1729   """Opens the underlying block device of a disk.
1730
1731   @type disk: L{objects.Disk}
1732   @param disk: the disk object we want to open
1733
1734   """
1735   real_disk = _RecursiveFindBD(disk)
1736   if real_disk is None:
1737     _Fail("Block device '%s' is not set up", disk)
1738
1739   real_disk.Open()
1740
1741   return real_disk
1742
1743
1744 def BlockdevFind(disk):
1745   """Check if a device is activated.
1746
1747   If it is, return information about the real device.
1748
1749   @type disk: L{objects.Disk}
1750   @param disk: the disk to find
1751   @rtype: None or objects.BlockDevStatus
1752   @return: None if the disk cannot be found, otherwise a the current
1753            information
1754
1755   """
1756   try:
1757     rbd = _RecursiveFindBD(disk)
1758   except errors.BlockDeviceError, err:
1759     _Fail("Failed to find device: %s", err, exc=True)
1760
1761   if rbd is None:
1762     return None
1763
1764   return rbd.GetSyncStatus()
1765
1766
1767 def BlockdevGetsize(disks):
1768   """Computes the size of the given disks.
1769
1770   If a disk is not found, returns None instead.
1771
1772   @type disks: list of L{objects.Disk}
1773   @param disks: the list of disk to compute the size for
1774   @rtype: list
1775   @return: list with elements None if the disk cannot be found,
1776       otherwise the size
1777
1778   """
1779   result = []
1780   for cf in disks:
1781     try:
1782       rbd = _RecursiveFindBD(cf)
1783     except errors.BlockDeviceError:
1784       result.append(None)
1785       continue
1786     if rbd is None:
1787       result.append(None)
1788     else:
1789       result.append(rbd.GetActualSize())
1790   return result
1791
1792
1793 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1794   """Export a block device to a remote node.
1795
1796   @type disk: L{objects.Disk}
1797   @param disk: the description of the disk to export
1798   @type dest_node: str
1799   @param dest_node: the destination node to export to
1800   @type dest_path: str
1801   @param dest_path: the destination path on the target node
1802   @type cluster_name: str
1803   @param cluster_name: the cluster name, needed for SSH hostalias
1804   @rtype: None
1805
1806   """
1807   real_disk = _OpenRealBD(disk)
1808
1809   # the block size on the read dd is 1MiB to match our units
1810   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1811                                "dd if=%s bs=1048576 count=%s",
1812                                real_disk.dev_path, str(disk.size))
1813
1814   # we set here a smaller block size as, due to ssh buffering, more
1815   # than 64-128k will mostly ignored; we use nocreat to fail if the
1816   # device is not already there or we pass a wrong path; we use
1817   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1818   # to not buffer too much memory; this means that at best, we flush
1819   # every 64k, which will not be very fast
1820   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1821                                 " oflag=dsync", dest_path)
1822
1823   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1824                                                    constants.GANETI_RUNAS,
1825                                                    destcmd)
1826
1827   # all commands have been checked, so we're safe to combine them
1828   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1829
1830   result = utils.RunCmd(["bash", "-c", command])
1831
1832   if result.failed:
1833     _Fail("Disk copy command '%s' returned error: %s"
1834           " output: %s", command, result.fail_reason, result.output)
1835
1836
1837 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1838   """Write a file to the filesystem.
1839
1840   This allows the master to overwrite(!) a file. It will only perform
1841   the operation if the file belongs to a list of configuration files.
1842
1843   @type file_name: str
1844   @param file_name: the target file name
1845   @type data: str
1846   @param data: the new contents of the file
1847   @type mode: int
1848   @param mode: the mode to give the file (can be None)
1849   @type uid: int
1850   @param uid: the owner of the file (can be -1 for default)
1851   @type gid: int
1852   @param gid: the group of the file (can be -1 for default)
1853   @type atime: float
1854   @param atime: the atime to set on the file (can be None)
1855   @type mtime: float
1856   @param mtime: the mtime to set on the file (can be None)
1857   @rtype: None
1858
1859   """
1860   if not os.path.isabs(file_name):
1861     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1862
1863   if file_name not in _ALLOWED_UPLOAD_FILES:
1864     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1865           file_name)
1866
1867   raw_data = _Decompress(data)
1868
1869   utils.SafeWriteFile(file_name, None,
1870                       data=raw_data, mode=mode, uid=uid, gid=gid,
1871                       atime=atime, mtime=mtime)
1872
1873
1874 def RunOob(oob_program, command, node, timeout):
1875   """Executes oob_program with given command on given node.
1876
1877   @param oob_program: The path to the executable oob_program
1878   @param command: The command to invoke on oob_program
1879   @param node: The node given as an argument to the program
1880   @param timeout: Timeout after which we kill the oob program
1881
1882   @return: stdout
1883   @raise RPCFail: If execution fails for some reason
1884
1885   """
1886   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1887
1888   if result.failed:
1889     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1890           result.fail_reason, result.output)
1891
1892   return result.stdout
1893
1894
1895 def WriteSsconfFiles(values):
1896   """Update all ssconf files.
1897
1898   Wrapper around the SimpleStore.WriteFiles.
1899
1900   """
1901   ssconf.SimpleStore().WriteFiles(values)
1902
1903
1904 def _ErrnoOrStr(err):
1905   """Format an EnvironmentError exception.
1906
1907   If the L{err} argument has an errno attribute, it will be looked up
1908   and converted into a textual C{E...} description. Otherwise the
1909   string representation of the error will be returned.
1910
1911   @type err: L{EnvironmentError}
1912   @param err: the exception to format
1913
1914   """
1915   if hasattr(err, 'errno'):
1916     detail = errno.errorcode[err.errno]
1917   else:
1918     detail = str(err)
1919   return detail
1920
1921
1922 def _OSOndiskAPIVersion(os_dir):
1923   """Compute and return the API version of a given OS.
1924
1925   This function will try to read the API version of the OS residing in
1926   the 'os_dir' directory.
1927
1928   @type os_dir: str
1929   @param os_dir: the directory in which we should look for the OS
1930   @rtype: tuple
1931   @return: tuple (status, data) with status denoting the validity and
1932       data holding either the vaid versions or an error message
1933
1934   """
1935   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1936
1937   try:
1938     st = os.stat(api_file)
1939   except EnvironmentError, err:
1940     return False, ("Required file '%s' not found under path %s: %s" %
1941                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1942
1943   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1944     return False, ("File '%s' in %s is not a regular file" %
1945                    (constants.OS_API_FILE, os_dir))
1946
1947   try:
1948     api_versions = utils.ReadFile(api_file).splitlines()
1949   except EnvironmentError, err:
1950     return False, ("Error while reading the API version file at %s: %s" %
1951                    (api_file, _ErrnoOrStr(err)))
1952
1953   try:
1954     api_versions = [int(version.strip()) for version in api_versions]
1955   except (TypeError, ValueError), err:
1956     return False, ("API version(s) can't be converted to integer: %s" %
1957                    str(err))
1958
1959   return True, api_versions
1960
1961
1962 def DiagnoseOS(top_dirs=None):
1963   """Compute the validity for all OSes.
1964
1965   @type top_dirs: list
1966   @param top_dirs: the list of directories in which to
1967       search (if not given defaults to
1968       L{constants.OS_SEARCH_PATH})
1969   @rtype: list of L{objects.OS}
1970   @return: a list of tuples (name, path, status, diagnose, variants,
1971       parameters, api_version) for all (potential) OSes under all
1972       search paths, where:
1973           - name is the (potential) OS name
1974           - path is the full path to the OS
1975           - status True/False is the validity of the OS
1976           - diagnose is the error message for an invalid OS, otherwise empty
1977           - variants is a list of supported OS variants, if any
1978           - parameters is a list of (name, help) parameters, if any
1979           - api_version is a list of support OS API versions
1980
1981   """
1982   if top_dirs is None:
1983     top_dirs = constants.OS_SEARCH_PATH
1984
1985   result = []
1986   for dir_name in top_dirs:
1987     if os.path.isdir(dir_name):
1988       try:
1989         f_names = utils.ListVisibleFiles(dir_name)
1990       except EnvironmentError, err:
1991         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1992         break
1993       for name in f_names:
1994         os_path = utils.PathJoin(dir_name, name)
1995         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1996         if status:
1997           diagnose = ""
1998           variants = os_inst.supported_variants
1999           parameters = os_inst.supported_parameters
2000           api_versions = os_inst.api_versions
2001         else:
2002           diagnose = os_inst
2003           variants = parameters = api_versions = []
2004         result.append((name, os_path, status, diagnose, variants,
2005                        parameters, api_versions))
2006
2007   return result
2008
2009
2010 def _TryOSFromDisk(name, base_dir=None):
2011   """Create an OS instance from disk.
2012
2013   This function will return an OS instance if the given name is a
2014   valid OS name.
2015
2016   @type base_dir: string
2017   @keyword base_dir: Base directory containing OS installations.
2018                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2019   @rtype: tuple
2020   @return: success and either the OS instance if we find a valid one,
2021       or error message
2022
2023   """
2024   if base_dir is None:
2025     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
2026   else:
2027     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
2028
2029   if os_dir is None:
2030     return False, "Directory for OS %s not found in search path" % name
2031
2032   status, api_versions = _OSOndiskAPIVersion(os_dir)
2033   if not status:
2034     # push the error up
2035     return status, api_versions
2036
2037   if not constants.OS_API_VERSIONS.intersection(api_versions):
2038     return False, ("API version mismatch for path '%s': found %s, want %s." %
2039                    (os_dir, api_versions, constants.OS_API_VERSIONS))
2040
2041   # OS Files dictionary, we will populate it with the absolute path names
2042   os_files = dict.fromkeys(constants.OS_SCRIPTS)
2043
2044   if max(api_versions) >= constants.OS_API_V15:
2045     os_files[constants.OS_VARIANTS_FILE] = ''
2046
2047   if max(api_versions) >= constants.OS_API_V20:
2048     os_files[constants.OS_PARAMETERS_FILE] = ''
2049   else:
2050     del os_files[constants.OS_SCRIPT_VERIFY]
2051
2052   for filename in os_files:
2053     os_files[filename] = utils.PathJoin(os_dir, filename)
2054
2055     try:
2056       st = os.stat(os_files[filename])
2057     except EnvironmentError, err:
2058       return False, ("File '%s' under path '%s' is missing (%s)" %
2059                      (filename, os_dir, _ErrnoOrStr(err)))
2060
2061     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
2062       return False, ("File '%s' under path '%s' is not a regular file" %
2063                      (filename, os_dir))
2064
2065     if filename in constants.OS_SCRIPTS:
2066       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
2067         return False, ("File '%s' under path '%s' is not executable" %
2068                        (filename, os_dir))
2069
2070   variants = []
2071   if constants.OS_VARIANTS_FILE in os_files:
2072     variants_file = os_files[constants.OS_VARIANTS_FILE]
2073     try:
2074       variants = utils.ReadFile(variants_file).splitlines()
2075     except EnvironmentError, err:
2076       return False, ("Error while reading the OS variants file at %s: %s" %
2077                      (variants_file, _ErrnoOrStr(err)))
2078     if not variants:
2079       return False, ("No supported os variant found")
2080
2081   parameters = []
2082   if constants.OS_PARAMETERS_FILE in os_files:
2083     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
2084     try:
2085       parameters = utils.ReadFile(parameters_file).splitlines()
2086     except EnvironmentError, err:
2087       return False, ("Error while reading the OS parameters file at %s: %s" %
2088                      (parameters_file, _ErrnoOrStr(err)))
2089     parameters = [v.split(None, 1) for v in parameters]
2090
2091   os_obj = objects.OS(name=name, path=os_dir,
2092                       create_script=os_files[constants.OS_SCRIPT_CREATE],
2093                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
2094                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
2095                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
2096                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
2097                                                  None),
2098                       supported_variants=variants,
2099                       supported_parameters=parameters,
2100                       api_versions=api_versions)
2101   return True, os_obj
2102
2103
2104 def OSFromDisk(name, base_dir=None):
2105   """Create an OS instance from disk.
2106
2107   This function will return an OS instance if the given name is a
2108   valid OS name. Otherwise, it will raise an appropriate
2109   L{RPCFail} exception, detailing why this is not a valid OS.
2110
2111   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2112   an exception but returns true/false status data.
2113
2114   @type base_dir: string
2115   @keyword base_dir: Base directory containing OS installations.
2116                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2117   @rtype: L{objects.OS}
2118   @return: the OS instance if we find a valid one
2119   @raise RPCFail: if we don't find a valid OS
2120
2121   """
2122   name_only = objects.OS.GetName(name)
2123   status, payload = _TryOSFromDisk(name_only, base_dir)
2124
2125   if not status:
2126     _Fail(payload)
2127
2128   return payload
2129
2130
2131 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2132   """Calculate the basic environment for an os script.
2133
2134   @type os_name: str
2135   @param os_name: full operating system name (including variant)
2136   @type inst_os: L{objects.OS}
2137   @param inst_os: operating system for which the environment is being built
2138   @type os_params: dict
2139   @param os_params: the OS parameters
2140   @type debug: integer
2141   @param debug: debug level (0 or 1, for OS Api 10)
2142   @rtype: dict
2143   @return: dict of environment variables
2144   @raise errors.BlockDeviceError: if the block device
2145       cannot be found
2146
2147   """
2148   result = {}
2149   api_version = \
2150     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2151   result['OS_API_VERSION'] = '%d' % api_version
2152   result['OS_NAME'] = inst_os.name
2153   result['DEBUG_LEVEL'] = '%d' % debug
2154
2155   # OS variants
2156   if api_version >= constants.OS_API_V15:
2157     variant = objects.OS.GetVariant(os_name)
2158     if not variant:
2159       variant = inst_os.supported_variants[0]
2160     result['OS_VARIANT'] = variant
2161
2162   # OS params
2163   for pname, pvalue in os_params.items():
2164     result['OSP_%s' % pname.upper()] = pvalue
2165
2166   return result
2167
2168
2169 def OSEnvironment(instance, inst_os, debug=0):
2170   """Calculate the environment for an os script.
2171
2172   @type instance: L{objects.Instance}
2173   @param instance: target instance for the os script run
2174   @type inst_os: L{objects.OS}
2175   @param inst_os: operating system for which the environment is being built
2176   @type debug: integer
2177   @param debug: debug level (0 or 1, for OS Api 10)
2178   @rtype: dict
2179   @return: dict of environment variables
2180   @raise errors.BlockDeviceError: if the block device
2181       cannot be found
2182
2183   """
2184   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2185
2186   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2187     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2188
2189   result['HYPERVISOR'] = instance.hypervisor
2190   result['DISK_COUNT'] = '%d' % len(instance.disks)
2191   result['NIC_COUNT'] = '%d' % len(instance.nics)
2192
2193   # Disks
2194   for idx, disk in enumerate(instance.disks):
2195     real_disk = _OpenRealBD(disk)
2196     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2197     result['DISK_%d_ACCESS' % idx] = disk.mode
2198     if constants.HV_DISK_TYPE in instance.hvparams:
2199       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2200         instance.hvparams[constants.HV_DISK_TYPE]
2201     if disk.dev_type in constants.LDS_BLOCK:
2202       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2203     elif disk.dev_type == constants.LD_FILE:
2204       result['DISK_%d_BACKEND_TYPE' % idx] = \
2205         'file:%s' % disk.physical_id[0]
2206
2207   # NICs
2208   for idx, nic in enumerate(instance.nics):
2209     result['NIC_%d_MAC' % idx] = nic.mac
2210     if nic.ip:
2211       result['NIC_%d_IP' % idx] = nic.ip
2212     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2213     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2214       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2215     if nic.nicparams[constants.NIC_LINK]:
2216       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2217     if constants.HV_NIC_TYPE in instance.hvparams:
2218       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2219         instance.hvparams[constants.HV_NIC_TYPE]
2220
2221   # HV/BE params
2222   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2223     for key, value in source.items():
2224       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2225
2226   return result
2227
2228
2229 def BlockdevGrow(disk, amount):
2230   """Grow a stack of block devices.
2231
2232   This function is called recursively, with the childrens being the
2233   first ones to resize.
2234
2235   @type disk: L{objects.Disk}
2236   @param disk: the disk to be grown
2237   @rtype: (status, result)
2238   @return: a tuple with the status of the operation
2239       (True/False), and the errors message if status
2240       is False
2241
2242   """
2243   r_dev = _RecursiveFindBD(disk)
2244   if r_dev is None:
2245     _Fail("Cannot find block device %s", disk)
2246
2247   try:
2248     r_dev.Grow(amount)
2249   except errors.BlockDeviceError, err:
2250     _Fail("Failed to grow block device: %s", err, exc=True)
2251
2252
2253 def BlockdevSnapshot(disk):
2254   """Create a snapshot copy of a block device.
2255
2256   This function is called recursively, and the snapshot is actually created
2257   just for the leaf lvm backend device.
2258
2259   @type disk: L{objects.Disk}
2260   @param disk: the disk to be snapshotted
2261   @rtype: string
2262   @return: snapshot disk ID as (vg, lv)
2263
2264   """
2265   if disk.dev_type == constants.LD_DRBD8:
2266     if not disk.children:
2267       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2268             disk.unique_id)
2269     return BlockdevSnapshot(disk.children[0])
2270   elif disk.dev_type == constants.LD_LV:
2271     r_dev = _RecursiveFindBD(disk)
2272     if r_dev is not None:
2273       # FIXME: choose a saner value for the snapshot size
2274       # let's stay on the safe side and ask for the full size, for now
2275       return r_dev.Snapshot(disk.size)
2276     else:
2277       _Fail("Cannot find block device %s", disk)
2278   else:
2279     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2280           disk.unique_id, disk.dev_type)
2281
2282
2283 def FinalizeExport(instance, snap_disks):
2284   """Write out the export configuration information.
2285
2286   @type instance: L{objects.Instance}
2287   @param instance: the instance which we export, used for
2288       saving configuration
2289   @type snap_disks: list of L{objects.Disk}
2290   @param snap_disks: list of snapshot block devices, which
2291       will be used to get the actual name of the dump file
2292
2293   @rtype: None
2294
2295   """
2296   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2297   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2298
2299   config = objects.SerializableConfigParser()
2300
2301   config.add_section(constants.INISECT_EXP)
2302   config.set(constants.INISECT_EXP, 'version', '0')
2303   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2304   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2305   config.set(constants.INISECT_EXP, 'os', instance.os)
2306   config.set(constants.INISECT_EXP, "compression", "none")
2307
2308   config.add_section(constants.INISECT_INS)
2309   config.set(constants.INISECT_INS, 'name', instance.name)
2310   config.set(constants.INISECT_INS, 'memory', '%d' %
2311              instance.beparams[constants.BE_MEMORY])
2312   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2313              instance.beparams[constants.BE_VCPUS])
2314   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2315   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2316
2317   nic_total = 0
2318   for nic_count, nic in enumerate(instance.nics):
2319     nic_total += 1
2320     config.set(constants.INISECT_INS, 'nic%d_mac' %
2321                nic_count, '%s' % nic.mac)
2322     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2323     for param in constants.NICS_PARAMETER_TYPES:
2324       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2325                  '%s' % nic.nicparams.get(param, None))
2326   # TODO: redundant: on load can read nics until it doesn't exist
2327   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2328
2329   disk_total = 0
2330   for disk_count, disk in enumerate(snap_disks):
2331     if disk:
2332       disk_total += 1
2333       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2334                  ('%s' % disk.iv_name))
2335       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2336                  ('%s' % disk.physical_id[1]))
2337       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2338                  ('%d' % disk.size))
2339
2340   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2341
2342   # New-style hypervisor/backend parameters
2343
2344   config.add_section(constants.INISECT_HYP)
2345   for name, value in instance.hvparams.items():
2346     if name not in constants.HVC_GLOBALS:
2347       config.set(constants.INISECT_HYP, name, str(value))
2348
2349   config.add_section(constants.INISECT_BEP)
2350   for name, value in instance.beparams.items():
2351     config.set(constants.INISECT_BEP, name, str(value))
2352
2353   config.add_section(constants.INISECT_OSP)
2354   for name, value in instance.osparams.items():
2355     config.set(constants.INISECT_OSP, name, str(value))
2356
2357   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2358                   data=config.Dumps())
2359   shutil.rmtree(finaldestdir, ignore_errors=True)
2360   shutil.move(destdir, finaldestdir)
2361
2362
2363 def ExportInfo(dest):
2364   """Get export configuration information.
2365
2366   @type dest: str
2367   @param dest: directory containing the export
2368
2369   @rtype: L{objects.SerializableConfigParser}
2370   @return: a serializable config file containing the
2371       export info
2372
2373   """
2374   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2375
2376   config = objects.SerializableConfigParser()
2377   config.read(cff)
2378
2379   if (not config.has_section(constants.INISECT_EXP) or
2380       not config.has_section(constants.INISECT_INS)):
2381     _Fail("Export info file doesn't have the required fields")
2382
2383   return config.Dumps()
2384
2385
2386 def ListExports():
2387   """Return a list of exports currently available on this machine.
2388
2389   @rtype: list
2390   @return: list of the exports
2391
2392   """
2393   if os.path.isdir(constants.EXPORT_DIR):
2394     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2395   else:
2396     _Fail("No exports directory")
2397
2398
2399 def RemoveExport(export):
2400   """Remove an existing export from the node.
2401
2402   @type export: str
2403   @param export: the name of the export to remove
2404   @rtype: None
2405
2406   """
2407   target = utils.PathJoin(constants.EXPORT_DIR, export)
2408
2409   try:
2410     shutil.rmtree(target)
2411   except EnvironmentError, err:
2412     _Fail("Error while removing the export: %s", err, exc=True)
2413
2414
2415 def BlockdevRename(devlist):
2416   """Rename a list of block devices.
2417
2418   @type devlist: list of tuples
2419   @param devlist: list of tuples of the form  (disk,
2420       new_logical_id, new_physical_id); disk is an
2421       L{objects.Disk} object describing the current disk,
2422       and new logical_id/physical_id is the name we
2423       rename it to
2424   @rtype: boolean
2425   @return: True if all renames succeeded, False otherwise
2426
2427   """
2428   msgs = []
2429   result = True
2430   for disk, unique_id in devlist:
2431     dev = _RecursiveFindBD(disk)
2432     if dev is None:
2433       msgs.append("Can't find device %s in rename" % str(disk))
2434       result = False
2435       continue
2436     try:
2437       old_rpath = dev.dev_path
2438       dev.Rename(unique_id)
2439       new_rpath = dev.dev_path
2440       if old_rpath != new_rpath:
2441         DevCacheManager.RemoveCache(old_rpath)
2442         # FIXME: we should add the new cache information here, like:
2443         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2444         # but we don't have the owner here - maybe parse from existing
2445         # cache? for now, we only lose lvm data when we rename, which
2446         # is less critical than DRBD or MD
2447     except errors.BlockDeviceError, err:
2448       msgs.append("Can't rename device '%s' to '%s': %s" %
2449                   (dev, unique_id, err))
2450       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2451       result = False
2452   if not result:
2453     _Fail("; ".join(msgs))
2454
2455
2456 def _TransformFileStorageDir(fs_dir):
2457   """Checks whether given file_storage_dir is valid.
2458
2459   Checks wheter the given fs_dir is within the cluster-wide default
2460   file_storage_dir or the shared_file_storage_dir, which are stored in
2461   SimpleStore. Only paths under those directories are allowed.
2462
2463   @type fs_dir: str
2464   @param fs_dir: the path to check
2465
2466   @return: the normalized path if valid, None otherwise
2467
2468   """
2469   if not constants.ENABLE_FILE_STORAGE:
2470     _Fail("File storage disabled at configure time")
2471   cfg = _GetConfig()
2472   fs_dir = os.path.normpath(fs_dir)
2473   base_fstore = cfg.GetFileStorageDir()
2474   base_shared = cfg.GetSharedFileStorageDir()
2475   if ((os.path.commonprefix([fs_dir, base_fstore]) != base_fstore) and
2476       (os.path.commonprefix([fs_dir, base_shared]) != base_shared)):
2477     _Fail("File storage directory '%s' is not under base file"
2478           " storage directory '%s' or shared storage directory '%s'",
2479           fs_dir, base_fstore, base_shared)
2480   return fs_dir
2481
2482
2483 def CreateFileStorageDir(file_storage_dir):
2484   """Create file storage directory.
2485
2486   @type file_storage_dir: str
2487   @param file_storage_dir: directory to create
2488
2489   @rtype: tuple
2490   @return: tuple with first element a boolean indicating wheter dir
2491       creation was successful or not
2492
2493   """
2494   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2495   if os.path.exists(file_storage_dir):
2496     if not os.path.isdir(file_storage_dir):
2497       _Fail("Specified storage dir '%s' is not a directory",
2498             file_storage_dir)
2499   else:
2500     try:
2501       os.makedirs(file_storage_dir, 0750)
2502     except OSError, err:
2503       _Fail("Cannot create file storage directory '%s': %s",
2504             file_storage_dir, err, exc=True)
2505
2506
2507 def RemoveFileStorageDir(file_storage_dir):
2508   """Remove file storage directory.
2509
2510   Remove it only if it's empty. If not log an error and return.
2511
2512   @type file_storage_dir: str
2513   @param file_storage_dir: the directory we should cleanup
2514   @rtype: tuple (success,)
2515   @return: tuple of one element, C{success}, denoting
2516       whether the operation was successful
2517
2518   """
2519   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2520   if os.path.exists(file_storage_dir):
2521     if not os.path.isdir(file_storage_dir):
2522       _Fail("Specified Storage directory '%s' is not a directory",
2523             file_storage_dir)
2524     # deletes dir only if empty, otherwise we want to fail the rpc call
2525     try:
2526       os.rmdir(file_storage_dir)
2527     except OSError, err:
2528       _Fail("Cannot remove file storage directory '%s': %s",
2529             file_storage_dir, err)
2530
2531
2532 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2533   """Rename the file storage directory.
2534
2535   @type old_file_storage_dir: str
2536   @param old_file_storage_dir: the current path
2537   @type new_file_storage_dir: str
2538   @param new_file_storage_dir: the name we should rename to
2539   @rtype: tuple (success,)
2540   @return: tuple of one element, C{success}, denoting
2541       whether the operation was successful
2542
2543   """
2544   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2545   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2546   if not os.path.exists(new_file_storage_dir):
2547     if os.path.isdir(old_file_storage_dir):
2548       try:
2549         os.rename(old_file_storage_dir, new_file_storage_dir)
2550       except OSError, err:
2551         _Fail("Cannot rename '%s' to '%s': %s",
2552               old_file_storage_dir, new_file_storage_dir, err)
2553     else:
2554       _Fail("Specified storage dir '%s' is not a directory",
2555             old_file_storage_dir)
2556   else:
2557     if os.path.exists(old_file_storage_dir):
2558       _Fail("Cannot rename '%s' to '%s': both locations exist",
2559             old_file_storage_dir, new_file_storage_dir)
2560
2561
2562 def _EnsureJobQueueFile(file_name):
2563   """Checks whether the given filename is in the queue directory.
2564
2565   @type file_name: str
2566   @param file_name: the file name we should check
2567   @rtype: None
2568   @raises RPCFail: if the file is not valid
2569
2570   """
2571   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2572   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2573
2574   if not result:
2575     _Fail("Passed job queue file '%s' does not belong to"
2576           " the queue directory '%s'", file_name, queue_dir)
2577
2578
2579 def JobQueueUpdate(file_name, content):
2580   """Updates a file in the queue directory.
2581
2582   This is just a wrapper over L{utils.io.WriteFile}, with proper
2583   checking.
2584
2585   @type file_name: str
2586   @param file_name: the job file name
2587   @type content: str
2588   @param content: the new job contents
2589   @rtype: boolean
2590   @return: the success of the operation
2591
2592   """
2593   _EnsureJobQueueFile(file_name)
2594   getents = runtime.GetEnts()
2595
2596   # Write and replace the file atomically
2597   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2598                   gid=getents.masterd_gid)
2599
2600
2601 def JobQueueRename(old, new):
2602   """Renames a job queue file.
2603
2604   This is just a wrapper over os.rename with proper checking.
2605
2606   @type old: str
2607   @param old: the old (actual) file name
2608   @type new: str
2609   @param new: the desired file name
2610   @rtype: tuple
2611   @return: the success of the operation and payload
2612
2613   """
2614   _EnsureJobQueueFile(old)
2615   _EnsureJobQueueFile(new)
2616
2617   utils.RenameFile(old, new, mkdir=True)
2618
2619
2620 def BlockdevClose(instance_name, disks):
2621   """Closes the given block devices.
2622
2623   This means they will be switched to secondary mode (in case of
2624   DRBD).
2625
2626   @param instance_name: if the argument is not empty, the symlinks
2627       of this instance will be removed
2628   @type disks: list of L{objects.Disk}
2629   @param disks: the list of disks to be closed
2630   @rtype: tuple (success, message)
2631   @return: a tuple of success and message, where success
2632       indicates the succes of the operation, and message
2633       which will contain the error details in case we
2634       failed
2635
2636   """
2637   bdevs = []
2638   for cf in disks:
2639     rd = _RecursiveFindBD(cf)
2640     if rd is None:
2641       _Fail("Can't find device %s", cf)
2642     bdevs.append(rd)
2643
2644   msg = []
2645   for rd in bdevs:
2646     try:
2647       rd.Close()
2648     except errors.BlockDeviceError, err:
2649       msg.append(str(err))
2650   if msg:
2651     _Fail("Can't make devices secondary: %s", ",".join(msg))
2652   else:
2653     if instance_name:
2654       _RemoveBlockDevLinks(instance_name, disks)
2655
2656
2657 def ValidateHVParams(hvname, hvparams):
2658   """Validates the given hypervisor parameters.
2659
2660   @type hvname: string
2661   @param hvname: the hypervisor name
2662   @type hvparams: dict
2663   @param hvparams: the hypervisor parameters to be validated
2664   @rtype: None
2665
2666   """
2667   try:
2668     hv_type = hypervisor.GetHypervisor(hvname)
2669     hv_type.ValidateParameters(hvparams)
2670   except errors.HypervisorError, err:
2671     _Fail(str(err), log=False)
2672
2673
2674 def _CheckOSPList(os_obj, parameters):
2675   """Check whether a list of parameters is supported by the OS.
2676
2677   @type os_obj: L{objects.OS}
2678   @param os_obj: OS object to check
2679   @type parameters: list
2680   @param parameters: the list of parameters to check
2681
2682   """
2683   supported = [v[0] for v in os_obj.supported_parameters]
2684   delta = frozenset(parameters).difference(supported)
2685   if delta:
2686     _Fail("The following parameters are not supported"
2687           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2688
2689
2690 def ValidateOS(required, osname, checks, osparams):
2691   """Validate the given OS' parameters.
2692
2693   @type required: boolean
2694   @param required: whether absence of the OS should translate into
2695       failure or not
2696   @type osname: string
2697   @param osname: the OS to be validated
2698   @type checks: list
2699   @param checks: list of the checks to run (currently only 'parameters')
2700   @type osparams: dict
2701   @param osparams: dictionary with OS parameters
2702   @rtype: boolean
2703   @return: True if the validation passed, or False if the OS was not
2704       found and L{required} was false
2705
2706   """
2707   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2708     _Fail("Unknown checks required for OS %s: %s", osname,
2709           set(checks).difference(constants.OS_VALIDATE_CALLS))
2710
2711   name_only = objects.OS.GetName(osname)
2712   status, tbv = _TryOSFromDisk(name_only, None)
2713
2714   if not status:
2715     if required:
2716       _Fail(tbv)
2717     else:
2718       return False
2719
2720   if max(tbv.api_versions) < constants.OS_API_V20:
2721     return True
2722
2723   if constants.OS_VALIDATE_PARAMETERS in checks:
2724     _CheckOSPList(tbv, osparams.keys())
2725
2726   validate_env = OSCoreEnv(osname, tbv, osparams)
2727   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2728                         cwd=tbv.path)
2729   if result.failed:
2730     logging.error("os validate command '%s' returned error: %s output: %s",
2731                   result.cmd, result.fail_reason, result.output)
2732     _Fail("OS validation script failed (%s), output: %s",
2733           result.fail_reason, result.output, log=False)
2734
2735   return True
2736
2737
2738 def DemoteFromMC():
2739   """Demotes the current node from master candidate role.
2740
2741   """
2742   # try to ensure we're not the master by mistake
2743   master, myself = ssconf.GetMasterAndMyself()
2744   if master == myself:
2745     _Fail("ssconf status shows I'm the master node, will not demote")
2746
2747   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2748   if not result.failed:
2749     _Fail("The master daemon is running, will not demote")
2750
2751   try:
2752     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2753       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2754   except EnvironmentError, err:
2755     if err.errno != errno.ENOENT:
2756       _Fail("Error while backing up cluster file: %s", err, exc=True)
2757
2758   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2759
2760
2761 def _GetX509Filenames(cryptodir, name):
2762   """Returns the full paths for the private key and certificate.
2763
2764   """
2765   return (utils.PathJoin(cryptodir, name),
2766           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2767           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2768
2769
2770 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2771   """Creates a new X509 certificate for SSL/TLS.
2772
2773   @type validity: int
2774   @param validity: Validity in seconds
2775   @rtype: tuple; (string, string)
2776   @return: Certificate name and public part
2777
2778   """
2779   (key_pem, cert_pem) = \
2780     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2781                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2782
2783   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2784                               prefix="x509-%s-" % utils.TimestampForFilename())
2785   try:
2786     name = os.path.basename(cert_dir)
2787     assert len(name) > 5
2788
2789     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2790
2791     utils.WriteFile(key_file, mode=0400, data=key_pem)
2792     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2793
2794     # Never return private key as it shouldn't leave the node
2795     return (name, cert_pem)
2796   except Exception:
2797     shutil.rmtree(cert_dir, ignore_errors=True)
2798     raise
2799
2800
2801 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2802   """Removes a X509 certificate.
2803
2804   @type name: string
2805   @param name: Certificate name
2806
2807   """
2808   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2809
2810   utils.RemoveFile(key_file)
2811   utils.RemoveFile(cert_file)
2812
2813   try:
2814     os.rmdir(cert_dir)
2815   except EnvironmentError, err:
2816     _Fail("Cannot remove certificate directory '%s': %s",
2817           cert_dir, err)
2818
2819
2820 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2821   """Returns the command for the requested input/output.
2822
2823   @type instance: L{objects.Instance}
2824   @param instance: The instance object
2825   @param mode: Import/export mode
2826   @param ieio: Input/output type
2827   @param ieargs: Input/output arguments
2828
2829   """
2830   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2831
2832   env = None
2833   prefix = None
2834   suffix = None
2835   exp_size = None
2836
2837   if ieio == constants.IEIO_FILE:
2838     (filename, ) = ieargs
2839
2840     if not utils.IsNormAbsPath(filename):
2841       _Fail("Path '%s' is not normalized or absolute", filename)
2842
2843     directory = os.path.normpath(os.path.dirname(filename))
2844
2845     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2846         constants.EXPORT_DIR):
2847       _Fail("File '%s' is not under exports directory '%s'",
2848             filename, constants.EXPORT_DIR)
2849
2850     # Create directory
2851     utils.Makedirs(directory, mode=0750)
2852
2853     quoted_filename = utils.ShellQuote(filename)
2854
2855     if mode == constants.IEM_IMPORT:
2856       suffix = "> %s" % quoted_filename
2857     elif mode == constants.IEM_EXPORT:
2858       suffix = "< %s" % quoted_filename
2859
2860       # Retrieve file size
2861       try:
2862         st = os.stat(filename)
2863       except EnvironmentError, err:
2864         logging.error("Can't stat(2) %s: %s", filename, err)
2865       else:
2866         exp_size = utils.BytesToMebibyte(st.st_size)
2867
2868   elif ieio == constants.IEIO_RAW_DISK:
2869     (disk, ) = ieargs
2870
2871     real_disk = _OpenRealBD(disk)
2872
2873     if mode == constants.IEM_IMPORT:
2874       # we set here a smaller block size as, due to transport buffering, more
2875       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2876       # is not already there or we pass a wrong path; we use notrunc to no
2877       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2878       # much memory; this means that at best, we flush every 64k, which will
2879       # not be very fast
2880       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2881                                     " bs=%s oflag=dsync"),
2882                                     real_disk.dev_path,
2883                                     str(64 * 1024))
2884
2885     elif mode == constants.IEM_EXPORT:
2886       # the block size on the read dd is 1MiB to match our units
2887       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2888                                    real_disk.dev_path,
2889                                    str(1024 * 1024), # 1 MB
2890                                    str(disk.size))
2891       exp_size = disk.size
2892
2893   elif ieio == constants.IEIO_SCRIPT:
2894     (disk, disk_index, ) = ieargs
2895
2896     assert isinstance(disk_index, (int, long))
2897
2898     real_disk = _OpenRealBD(disk)
2899
2900     inst_os = OSFromDisk(instance.os)
2901     env = OSEnvironment(instance, inst_os)
2902
2903     if mode == constants.IEM_IMPORT:
2904       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2905       env["IMPORT_INDEX"] = str(disk_index)
2906       script = inst_os.import_script
2907
2908     elif mode == constants.IEM_EXPORT:
2909       env["EXPORT_DEVICE"] = real_disk.dev_path
2910       env["EXPORT_INDEX"] = str(disk_index)
2911       script = inst_os.export_script
2912
2913     # TODO: Pass special environment only to script
2914     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2915
2916     if mode == constants.IEM_IMPORT:
2917       suffix = "| %s" % script_cmd
2918
2919     elif mode == constants.IEM_EXPORT:
2920       prefix = "%s |" % script_cmd
2921
2922     # Let script predict size
2923     exp_size = constants.IE_CUSTOM_SIZE
2924
2925   else:
2926     _Fail("Invalid %s I/O mode %r", mode, ieio)
2927
2928   return (env, prefix, suffix, exp_size)
2929
2930
2931 def _CreateImportExportStatusDir(prefix):
2932   """Creates status directory for import/export.
2933
2934   """
2935   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2936                           prefix=("%s-%s-" %
2937                                   (prefix, utils.TimestampForFilename())))
2938
2939
2940 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2941   """Starts an import or export daemon.
2942
2943   @param mode: Import/output mode
2944   @type opts: L{objects.ImportExportOptions}
2945   @param opts: Daemon options
2946   @type host: string
2947   @param host: Remote host for export (None for import)
2948   @type port: int
2949   @param port: Remote port for export (None for import)
2950   @type instance: L{objects.Instance}
2951   @param instance: Instance object
2952   @param ieio: Input/output type
2953   @param ieioargs: Input/output arguments
2954
2955   """
2956   if mode == constants.IEM_IMPORT:
2957     prefix = "import"
2958
2959     if not (host is None and port is None):
2960       _Fail("Can not specify host or port on import")
2961
2962   elif mode == constants.IEM_EXPORT:
2963     prefix = "export"
2964
2965     if host is None or port is None:
2966       _Fail("Host and port must be specified for an export")
2967
2968   else:
2969     _Fail("Invalid mode %r", mode)
2970
2971   if (opts.key_name is None) ^ (opts.ca_pem is None):
2972     _Fail("Cluster certificate can only be used for both key and CA")
2973
2974   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2975     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2976
2977   if opts.key_name is None:
2978     # Use server.pem
2979     key_path = constants.NODED_CERT_FILE
2980     cert_path = constants.NODED_CERT_FILE
2981     assert opts.ca_pem is None
2982   else:
2983     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2984                                                  opts.key_name)
2985     assert opts.ca_pem is not None
2986
2987   for i in [key_path, cert_path]:
2988     if not os.path.exists(i):
2989       _Fail("File '%s' does not exist" % i)
2990
2991   status_dir = _CreateImportExportStatusDir(prefix)
2992   try:
2993     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2994     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2995     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2996
2997     if opts.ca_pem is None:
2998       # Use server.pem
2999       ca = utils.ReadFile(constants.NODED_CERT_FILE)
3000     else:
3001       ca = opts.ca_pem
3002
3003     # Write CA file
3004     utils.WriteFile(ca_file, data=ca, mode=0400)
3005
3006     cmd = [
3007       constants.IMPORT_EXPORT_DAEMON,
3008       status_file, mode,
3009       "--key=%s" % key_path,
3010       "--cert=%s" % cert_path,
3011       "--ca=%s" % ca_file,
3012       ]
3013
3014     if host:
3015       cmd.append("--host=%s" % host)
3016
3017     if port:
3018       cmd.append("--port=%s" % port)
3019
3020     if opts.ipv6:
3021       cmd.append("--ipv6")
3022     else:
3023       cmd.append("--ipv4")
3024
3025     if opts.compress:
3026       cmd.append("--compress=%s" % opts.compress)
3027
3028     if opts.magic:
3029       cmd.append("--magic=%s" % opts.magic)
3030
3031     if exp_size is not None:
3032       cmd.append("--expected-size=%s" % exp_size)
3033
3034     if cmd_prefix:
3035       cmd.append("--cmd-prefix=%s" % cmd_prefix)
3036
3037     if cmd_suffix:
3038       cmd.append("--cmd-suffix=%s" % cmd_suffix)
3039
3040     if mode == constants.IEM_EXPORT:
3041       # Retry connection a few times when connecting to remote peer
3042       cmd.append("--connect-retries=%s" % constants.RIE_CONNECT_RETRIES)
3043       cmd.append("--connect-timeout=%s" % constants.RIE_CONNECT_ATTEMPT_TIMEOUT)
3044     elif opts.connect_timeout is not None:
3045       assert mode == constants.IEM_IMPORT
3046       # Overall timeout for establishing connection while listening
3047       cmd.append("--connect-timeout=%s" % opts.connect_timeout)
3048
3049     logfile = _InstanceLogName(prefix, instance.os, instance.name)
3050
3051     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
3052     # support for receiving a file descriptor for output
3053     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
3054                       output=logfile)
3055
3056     # The import/export name is simply the status directory name
3057     return os.path.basename(status_dir)
3058
3059   except Exception:
3060     shutil.rmtree(status_dir, ignore_errors=True)
3061     raise
3062
3063
3064 def GetImportExportStatus(names):
3065   """Returns import/export daemon status.
3066
3067   @type names: sequence
3068   @param names: List of names
3069   @rtype: List of dicts
3070   @return: Returns a list of the state of each named import/export or None if a
3071            status couldn't be read
3072
3073   """
3074   result = []
3075
3076   for name in names:
3077     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
3078                                  _IES_STATUS_FILE)
3079
3080     try:
3081       data = utils.ReadFile(status_file)
3082     except EnvironmentError, err:
3083       if err.errno != errno.ENOENT:
3084         raise
3085       data = None
3086
3087     if not data:
3088       result.append(None)
3089       continue
3090
3091     result.append(serializer.LoadJson(data))
3092
3093   return result
3094
3095
3096 def AbortImportExport(name):
3097   """Sends SIGTERM to a running import/export daemon.
3098
3099   """
3100   logging.info("Abort import/export %s", name)
3101
3102   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3103   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3104
3105   if pid:
3106     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
3107                  name, pid)
3108     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
3109
3110
3111 def CleanupImportExport(name):
3112   """Cleanup after an import or export.
3113
3114   If the import/export daemon is still running it's killed. Afterwards the
3115   whole status directory is removed.
3116
3117   """
3118   logging.info("Finalizing import/export %s", name)
3119
3120   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3121
3122   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3123
3124   if pid:
3125     logging.info("Import/export %s is still running with PID %s",
3126                  name, pid)
3127     utils.KillProcess(pid, waitpid=False)
3128
3129   shutil.rmtree(status_dir, ignore_errors=True)
3130
3131
3132 def _FindDisks(nodes_ip, disks):
3133   """Sets the physical ID on disks and returns the block devices.
3134
3135   """
3136   # set the correct physical ID
3137   my_name = netutils.Hostname.GetSysName()
3138   for cf in disks:
3139     cf.SetPhysicalID(my_name, nodes_ip)
3140
3141   bdevs = []
3142
3143   for cf in disks:
3144     rd = _RecursiveFindBD(cf)
3145     if rd is None:
3146       _Fail("Can't find device %s", cf)
3147     bdevs.append(rd)
3148   return bdevs
3149
3150
3151 def DrbdDisconnectNet(nodes_ip, disks):
3152   """Disconnects the network on a list of drbd devices.
3153
3154   """
3155   bdevs = _FindDisks(nodes_ip, disks)
3156
3157   # disconnect disks
3158   for rd in bdevs:
3159     try:
3160       rd.DisconnectNet()
3161     except errors.BlockDeviceError, err:
3162       _Fail("Can't change network configuration to standalone mode: %s",
3163             err, exc=True)
3164
3165
3166 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3167   """Attaches the network on a list of drbd devices.
3168
3169   """
3170   bdevs = _FindDisks(nodes_ip, disks)
3171
3172   if multimaster:
3173     for idx, rd in enumerate(bdevs):
3174       try:
3175         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3176       except EnvironmentError, err:
3177         _Fail("Can't create symlink: %s", err)
3178   # reconnect disks, switch to new master configuration and if
3179   # needed primary mode
3180   for rd in bdevs:
3181     try:
3182       rd.AttachNet(multimaster)
3183     except errors.BlockDeviceError, err:
3184       _Fail("Can't change network configuration: %s", err)
3185
3186   # wait until the disks are connected; we need to retry the re-attach
3187   # if the device becomes standalone, as this might happen if the one
3188   # node disconnects and reconnects in a different mode before the
3189   # other node reconnects; in this case, one or both of the nodes will
3190   # decide it has wrong configuration and switch to standalone
3191
3192   def _Attach():
3193     all_connected = True
3194
3195     for rd in bdevs:
3196       stats = rd.GetProcStatus()
3197
3198       all_connected = (all_connected and
3199                        (stats.is_connected or stats.is_in_resync))
3200
3201       if stats.is_standalone:
3202         # peer had different config info and this node became
3203         # standalone, even though this should not happen with the
3204         # new staged way of changing disk configs
3205         try:
3206           rd.AttachNet(multimaster)
3207         except errors.BlockDeviceError, err:
3208           _Fail("Can't change network configuration: %s", err)
3209
3210     if not all_connected:
3211       raise utils.RetryAgain()
3212
3213   try:
3214     # Start with a delay of 100 miliseconds and go up to 5 seconds
3215     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3216   except utils.RetryTimeout:
3217     _Fail("Timeout in disk reconnecting")
3218
3219   if multimaster:
3220     # change to primary mode
3221     for rd in bdevs:
3222       try:
3223         rd.Open()
3224       except errors.BlockDeviceError, err:
3225         _Fail("Can't change to primary mode: %s", err)
3226
3227
3228 def DrbdWaitSync(nodes_ip, disks):
3229   """Wait until DRBDs have synchronized.
3230
3231   """
3232   def _helper(rd):
3233     stats = rd.GetProcStatus()
3234     if not (stats.is_connected or stats.is_in_resync):
3235       raise utils.RetryAgain()
3236     return stats
3237
3238   bdevs = _FindDisks(nodes_ip, disks)
3239
3240   min_resync = 100
3241   alldone = True
3242   for rd in bdevs:
3243     try:
3244       # poll each second for 15 seconds
3245       stats = utils.Retry(_helper, 1, 15, args=[rd])
3246     except utils.RetryTimeout:
3247       stats = rd.GetProcStatus()
3248       # last check
3249       if not (stats.is_connected or stats.is_in_resync):
3250         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3251     alldone = alldone and (not stats.is_in_resync)
3252     if stats.sync_percent is not None:
3253       min_resync = min(min_resync, stats.sync_percent)
3254
3255   return (alldone, min_resync)
3256
3257
3258 def GetDrbdUsermodeHelper():
3259   """Returns DRBD usermode helper currently configured.
3260
3261   """
3262   try:
3263     return bdev.BaseDRBD.GetUsermodeHelper()
3264   except errors.BlockDeviceError, err:
3265     _Fail(str(err))
3266
3267
3268 def PowercycleNode(hypervisor_type):
3269   """Hard-powercycle the node.
3270
3271   Because we need to return first, and schedule the powercycle in the
3272   background, we won't be able to report failures nicely.
3273
3274   """
3275   hyper = hypervisor.GetHypervisor(hypervisor_type)
3276   try:
3277     pid = os.fork()
3278   except OSError:
3279     # if we can't fork, we'll pretend that we're in the child process
3280     pid = 0
3281   if pid > 0:
3282     return "Reboot scheduled in 5 seconds"
3283   # ensure the child is running on ram
3284   try:
3285     utils.Mlockall()
3286   except Exception: # pylint: disable-msg=W0703
3287     pass
3288   time.sleep(5)
3289   hyper.PowercycleNode()
3290
3291
3292 class HooksRunner(object):
3293   """Hook runner.
3294
3295   This class is instantiated on the node side (ganeti-noded) and not
3296   on the master side.
3297
3298   """
3299   def __init__(self, hooks_base_dir=None):
3300     """Constructor for hooks runner.
3301
3302     @type hooks_base_dir: str or None
3303     @param hooks_base_dir: if not None, this overrides the
3304         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3305
3306     """
3307     if hooks_base_dir is None:
3308       hooks_base_dir = constants.HOOKS_BASE_DIR
3309     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3310     # constant
3311     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3312
3313   def RunHooks(self, hpath, phase, env):
3314     """Run the scripts in the hooks directory.
3315
3316     @type hpath: str
3317     @param hpath: the path to the hooks directory which
3318         holds the scripts
3319     @type phase: str
3320     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3321         L{constants.HOOKS_PHASE_POST}
3322     @type env: dict
3323     @param env: dictionary with the environment for the hook
3324     @rtype: list
3325     @return: list of 3-element tuples:
3326       - script path
3327       - script result, either L{constants.HKR_SUCCESS} or
3328         L{constants.HKR_FAIL}
3329       - output of the script
3330
3331     @raise errors.ProgrammerError: for invalid input
3332         parameters
3333
3334     """
3335     if phase == constants.HOOKS_PHASE_PRE:
3336       suffix = "pre"
3337     elif phase == constants.HOOKS_PHASE_POST:
3338       suffix = "post"
3339     else:
3340       _Fail("Unknown hooks phase '%s'", phase)
3341
3342
3343     subdir = "%s-%s.d" % (hpath, suffix)
3344     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3345
3346     results = []
3347
3348     if not os.path.isdir(dir_name):
3349       # for non-existing/non-dirs, we simply exit instead of logging a
3350       # warning at every operation
3351       return results
3352
3353     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3354
3355     for (relname, relstatus, runresult)  in runparts_results:
3356       if relstatus == constants.RUNPARTS_SKIP:
3357         rrval = constants.HKR_SKIP
3358         output = ""
3359       elif relstatus == constants.RUNPARTS_ERR:
3360         rrval = constants.HKR_FAIL
3361         output = "Hook script execution error: %s" % runresult
3362       elif relstatus == constants.RUNPARTS_RUN:
3363         if runresult.failed:
3364           rrval = constants.HKR_FAIL
3365         else:
3366           rrval = constants.HKR_SUCCESS
3367         output = utils.SafeEncode(runresult.output.strip())
3368       results.append(("%s/%s" % (subdir, relname), rrval, output))
3369
3370     return results
3371
3372
3373 class IAllocatorRunner(object):
3374   """IAllocator runner.
3375
3376   This class is instantiated on the node side (ganeti-noded) and not on
3377   the master side.
3378
3379   """
3380   @staticmethod
3381   def Run(name, idata):
3382     """Run an iallocator script.
3383
3384     @type name: str
3385     @param name: the iallocator script name
3386     @type idata: str
3387     @param idata: the allocator input data
3388
3389     @rtype: tuple
3390     @return: two element tuple of:
3391        - status
3392        - either error message or stdout of allocator (for success)
3393
3394     """
3395     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3396                                   os.path.isfile)
3397     if alloc_script is None:
3398       _Fail("iallocator module '%s' not found in the search path", name)
3399
3400     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3401     try:
3402       os.write(fd, idata)
3403       os.close(fd)
3404       result = utils.RunCmd([alloc_script, fin_name])
3405       if result.failed:
3406         _Fail("iallocator module '%s' failed: %s, output '%s'",
3407               name, result.fail_reason, result.output)
3408     finally:
3409       os.unlink(fin_name)
3410
3411     return result.stdout
3412
3413
3414 class DevCacheManager(object):
3415   """Simple class for managing a cache of block device information.
3416
3417   """
3418   _DEV_PREFIX = "/dev/"
3419   _ROOT_DIR = constants.BDEV_CACHE_DIR
3420
3421   @classmethod
3422   def _ConvertPath(cls, dev_path):
3423     """Converts a /dev/name path to the cache file name.
3424
3425     This replaces slashes with underscores and strips the /dev
3426     prefix. It then returns the full path to the cache file.
3427
3428     @type dev_path: str
3429     @param dev_path: the C{/dev/} path name
3430     @rtype: str
3431     @return: the converted path name
3432
3433     """
3434     if dev_path.startswith(cls._DEV_PREFIX):
3435       dev_path = dev_path[len(cls._DEV_PREFIX):]
3436     dev_path = dev_path.replace("/", "_")
3437     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3438     return fpath
3439
3440   @classmethod
3441   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3442     """Updates the cache information for a given device.
3443
3444     @type dev_path: str
3445     @param dev_path: the pathname of the device
3446     @type owner: str
3447     @param owner: the owner (instance name) of the device
3448     @type on_primary: bool
3449     @param on_primary: whether this is the primary
3450         node nor not
3451     @type iv_name: str
3452     @param iv_name: the instance-visible name of the
3453         device, as in objects.Disk.iv_name
3454
3455     @rtype: None
3456
3457     """
3458     if dev_path is None:
3459       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3460       return
3461     fpath = cls._ConvertPath(dev_path)
3462     if on_primary:
3463       state = "primary"
3464     else:
3465       state = "secondary"
3466     if iv_name is None:
3467       iv_name = "not_visible"
3468     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3469     try:
3470       utils.WriteFile(fpath, data=fdata)
3471     except EnvironmentError, err:
3472       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3473
3474   @classmethod
3475   def RemoveCache(cls, dev_path):
3476     """Remove data for a dev_path.
3477
3478     This is just a wrapper over L{utils.io.RemoveFile} with a converted
3479     path name and logging.
3480
3481     @type dev_path: str
3482     @param dev_path: the pathname of the device
3483
3484     @rtype: None
3485
3486     """
3487     if dev_path is None:
3488       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3489       return
3490     fpath = cls._ConvertPath(dev_path)
3491     try:
3492       utils.RemoveFile(fpath)
3493     except EnvironmentError, err:
3494       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)