Adding additional VerifyNode checks to backend
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles())
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def StartMaster(start_daemons, no_voting):
248   """Activate local node as master node.
249
250   The function will either try activate the IP address of the master
251   (unless someone else has it) or also start the master daemons, based
252   on the start_daemons parameter.
253
254   @type start_daemons: boolean
255   @param start_daemons: whether to start the master daemons
256       (ganeti-masterd and ganeti-rapi), or (if false) activate the
257       master ip
258   @type no_voting: boolean
259   @param no_voting: whether to start ganeti-masterd without a node vote
260       (if start_daemons is True), but still non-interactively
261   @rtype: None
262
263   """
264   # GetMasterInfo will raise an exception if not able to return data
265   master_netdev, master_ip, _, family = GetMasterInfo()
266
267   err_msgs = []
268   # either start the master and rapi daemons
269   if start_daemons:
270     if no_voting:
271       masterd_args = "--no-voting --yes-do-it"
272     else:
273       masterd_args = ""
274
275     env = {
276       "EXTRA_MASTERD_ARGS": masterd_args,
277       }
278
279     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
280     if result.failed:
281       msg = "Can't start Ganeti master: %s" % result.output
282       logging.error(msg)
283       err_msgs.append(msg)
284   # or activate the IP
285   else:
286     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
287       if netutils.IPAddress.Own(master_ip):
288         # we already have the ip:
289         logging.debug("Master IP already configured, doing nothing")
290       else:
291         msg = "Someone else has the master ip, not activating"
292         logging.error(msg)
293         err_msgs.append(msg)
294     else:
295       ipcls = netutils.IP4Address
296       if family == netutils.IP6Address.family:
297         ipcls = netutils.IP6Address
298
299       result = utils.RunCmd(["ip", "address", "add",
300                              "%s/%d" % (master_ip, ipcls.iplen),
301                              "dev", master_netdev, "label",
302                              "%s:0" % master_netdev])
303       if result.failed:
304         msg = "Can't activate master IP: %s" % result.output
305         logging.error(msg)
306         err_msgs.append(msg)
307
308       # we ignore the exit code of the following cmds
309       if ipcls == netutils.IP4Address:
310         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
311                       master_ip, master_ip])
312       elif ipcls == netutils.IP6Address:
313         try:
314           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
315         except errors.OpExecError:
316           # TODO: Better error reporting
317           logging.warning("Can't execute ndisc6, please install if missing")
318
319   if err_msgs:
320     _Fail("; ".join(err_msgs))
321
322
323 def StopMaster(stop_daemons):
324   """Deactivate this node as master.
325
326   The function will always try to deactivate the IP address of the
327   master. It will also stop the master daemons depending on the
328   stop_daemons parameter.
329
330   @type stop_daemons: boolean
331   @param stop_daemons: whether to also stop the master daemons
332       (ganeti-masterd and ganeti-rapi)
333   @rtype: None
334
335   """
336   # TODO: log and report back to the caller the error failures; we
337   # need to decide in which case we fail the RPC for this
338
339   # GetMasterInfo will raise an exception if not able to return data
340   master_netdev, master_ip, _, family = GetMasterInfo()
341
342   ipcls = netutils.IP4Address
343   if family == netutils.IP6Address.family:
344     ipcls = netutils.IP6Address
345
346   result = utils.RunCmd(["ip", "address", "del",
347                          "%s/%d" % (master_ip, ipcls.iplen),
348                          "dev", master_netdev])
349   if result.failed:
350     logging.error("Can't remove the master IP, error: %s", result.output)
351     # but otherwise ignore the failure
352
353   if stop_daemons:
354     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
355     if result.failed:
356       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
357                     " and error %s",
358                     result.cmd, result.exit_code, result.output)
359
360
361 def EtcHostsModify(mode, host, ip):
362   """Modify a host entry in /etc/hosts.
363
364   @param mode: The mode to operate. Either add or remove entry
365   @param host: The host to operate on
366   @param ip: The ip associated with the entry
367
368   """
369   if mode == constants.ETC_HOSTS_ADD:
370     if not ip:
371       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
372               " present")
373     utils.AddHostToEtcHosts(host, ip)
374   elif mode == constants.ETC_HOSTS_REMOVE:
375     if ip:
376       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
377               " parameter is present")
378     utils.RemoveHostFromEtcHosts(host)
379   else:
380     RPCFail("Mode not supported")
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445
446   if vgname is not None:
447     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
448     vg_free = vg_size = None
449     if vginfo:
450       vg_free = int(round(vginfo[0][0], 0))
451       vg_size = int(round(vginfo[0][1], 0))
452     outputarray['vg_size'] = vg_size
453     outputarray['vg_free'] = vg_free
454
455   if hypervisor_type is not None:
456     hyper = hypervisor.GetHypervisor(hypervisor_type)
457     hyp_info = hyper.GetNodeInfo()
458     if hyp_info is not None:
459       outputarray.update(hyp_info)
460
461   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
462
463   return outputarray
464
465
466 def VerifyNode(what, cluster_name):
467   """Verify the status of the local node.
468
469   Based on the input L{what} parameter, various checks are done on the
470   local node.
471
472   If the I{filelist} key is present, this list of
473   files is checksummed and the file/checksum pairs are returned.
474
475   If the I{nodelist} key is present, we check that we have
476   connectivity via ssh with the target nodes (and check the hostname
477   report).
478
479   If the I{node-net-test} key is present, we check that we have
480   connectivity to the given nodes via both primary IP and, if
481   applicable, secondary IPs.
482
483   @type what: C{dict}
484   @param what: a dictionary of things to check:
485       - filelist: list of files for which to compute checksums
486       - nodelist: list of nodes we should check ssh communication with
487       - node-net-test: list of nodes we should check node daemon port
488         connectivity with
489       - hypervisor: list with hypervisors to run the verify for
490   @rtype: dict
491   @return: a dictionary with the same keys as the input dict, and
492       values representing the result of the checks
493
494   """
495   result = {}
496   my_name = netutils.Hostname.GetSysName()
497   port = netutils.GetDaemonPort(constants.NODED)
498   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
499
500   if constants.NV_HYPERVISOR in what and vm_capable:
501     result[constants.NV_HYPERVISOR] = tmp = {}
502     for hv_name in what[constants.NV_HYPERVISOR]:
503       try:
504         val = hypervisor.GetHypervisor(hv_name).Verify()
505       except errors.HypervisorError, err:
506         val = "Error while checking hypervisor: %s" % str(err)
507       tmp[hv_name] = val
508
509   if constants.NV_FILELIST in what:
510     result[constants.NV_FILELIST] = utils.FingerprintFiles(
511       what[constants.NV_FILELIST])
512
513   if constants.NV_NODELIST in what:
514     result[constants.NV_NODELIST] = tmp = {}
515     random.shuffle(what[constants.NV_NODELIST])
516     for node in what[constants.NV_NODELIST]:
517       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
518       if not success:
519         tmp[node] = message
520
521   if constants.NV_NODENETTEST in what:
522     result[constants.NV_NODENETTEST] = tmp = {}
523     my_pip = my_sip = None
524     for name, pip, sip in what[constants.NV_NODENETTEST]:
525       if name == my_name:
526         my_pip = pip
527         my_sip = sip
528         break
529     if not my_pip:
530       tmp[my_name] = ("Can't find my own primary/secondary IP"
531                       " in the node list")
532     else:
533       for name, pip, sip in what[constants.NV_NODENETTEST]:
534         fail = []
535         if not netutils.TcpPing(pip, port, source=my_pip):
536           fail.append("primary")
537         if sip != pip:
538           if not netutils.TcpPing(sip, port, source=my_sip):
539             fail.append("secondary")
540         if fail:
541           tmp[name] = ("failure using the %s interface(s)" %
542                        " and ".join(fail))
543
544   if constants.NV_MASTERIP in what:
545     # FIXME: add checks on incoming data structures (here and in the
546     # rest of the function)
547     master_name, master_ip = what[constants.NV_MASTERIP]
548     if master_name == my_name:
549       source = constants.IP4_ADDRESS_LOCALHOST
550     else:
551       source = None
552     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
553                                                   source=source)
554
555   if constants.NV_OOB_PATHS in what:
556     result[constants.NV_OOB_PATHS] = tmp = []
557     for path in what[constants.NV_OOB_PATHS]:
558       try:
559         st = os.stat(path)
560       except OSError, err:
561         tmp.append("error stating out of band helper: %s" % err)
562       else:
563         if stat.S_ISREG(st.st_mode):
564           if stat.S_IMODE(st.st_mode) & stat.S_IXUSR:
565             tmp.append(None)
566           else:
567             tmp.append("out of band helper %s is not executable" % path)
568         else:
569           tmp.append("out of band helper %s is not a file" % path)
570
571   if constants.NV_LVLIST in what and vm_capable:
572     try:
573       val = GetVolumeList(utils.ListVolumeGroups().keys())
574     except RPCFail, err:
575       val = str(err)
576     result[constants.NV_LVLIST] = val
577
578   if constants.NV_INSTANCELIST in what and vm_capable:
579     # GetInstanceList can fail
580     try:
581       val = GetInstanceList(what[constants.NV_INSTANCELIST])
582     except RPCFail, err:
583       val = str(err)
584     result[constants.NV_INSTANCELIST] = val
585
586   if constants.NV_VGLIST in what and vm_capable:
587     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
588
589   if constants.NV_PVLIST in what and vm_capable:
590     result[constants.NV_PVLIST] = \
591       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
592                                    filter_allocatable=False)
593
594   if constants.NV_VERSION in what:
595     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
596                                     constants.RELEASE_VERSION)
597
598   if constants.NV_HVINFO in what and vm_capable:
599     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
600     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
601
602   if constants.NV_DRBDLIST in what and vm_capable:
603     try:
604       used_minors = bdev.DRBD8.GetUsedDevs().keys()
605     except errors.BlockDeviceError, err:
606       logging.warning("Can't get used minors list", exc_info=True)
607       used_minors = str(err)
608     result[constants.NV_DRBDLIST] = used_minors
609
610   if constants.NV_DRBDHELPER in what and vm_capable:
611     status = True
612     try:
613       payload = bdev.BaseDRBD.GetUsermodeHelper()
614     except errors.BlockDeviceError, err:
615       logging.error("Can't get DRBD usermode helper: %s", str(err))
616       status = False
617       payload = str(err)
618     result[constants.NV_DRBDHELPER] = (status, payload)
619
620   if constants.NV_NODESETUP in what:
621     result[constants.NV_NODESETUP] = tmpr = []
622     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
623       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
624                   " under /sys, missing required directories /sys/block"
625                   " and /sys/class/net")
626     if (not os.path.isdir("/proc/sys") or
627         not os.path.isfile("/proc/sysrq-trigger")):
628       tmpr.append("The procfs filesystem doesn't seem to be mounted"
629                   " under /proc, missing required directory /proc/sys and"
630                   " the file /proc/sysrq-trigger")
631
632   if constants.NV_TIME in what:
633     result[constants.NV_TIME] = utils.SplitTime(time.time())
634
635   if constants.NV_OSLIST in what and vm_capable:
636     result[constants.NV_OSLIST] = DiagnoseOS()
637
638   return result
639
640
641 def GetVolumeList(vg_names):
642   """Compute list of logical volumes and their size.
643
644   @type vg_names: list
645   @param vg_names: the volume groups whose LVs we should list
646   @rtype: dict
647   @return:
648       dictionary of all partions (key) with value being a tuple of
649       their size (in MiB), inactive and online status::
650
651         {'xenvg/test1': ('20.06', True, True)}
652
653       in case of errors, a string is returned with the error
654       details.
655
656   """
657   lvs = {}
658   sep = '|'
659   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
660                          "--separator=%s" % sep,
661                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
662   if result.failed:
663     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
664
665   for line in result.stdout.splitlines():
666     line = line.strip()
667     match = _LVSLINE_REGEX.match(line)
668     if not match:
669       logging.error("Invalid line returned from lvs output: '%s'", line)
670       continue
671     vg_name, name, size, attr = match.groups()
672     inactive = attr[4] == '-'
673     online = attr[5] == 'o'
674     virtual = attr[0] == 'v'
675     if virtual:
676       # we don't want to report such volumes as existing, since they
677       # don't really hold data
678       continue
679     lvs[vg_name+"/"+name] = (size, inactive, online)
680
681   return lvs
682
683
684 def ListVolumeGroups():
685   """List the volume groups and their size.
686
687   @rtype: dict
688   @return: dictionary with keys volume name and values the
689       size of the volume
690
691   """
692   return utils.ListVolumeGroups()
693
694
695 def NodeVolumes():
696   """List all volumes on this node.
697
698   @rtype: list
699   @return:
700     A list of dictionaries, each having four keys:
701       - name: the logical volume name,
702       - size: the size of the logical volume
703       - dev: the physical device on which the LV lives
704       - vg: the volume group to which it belongs
705
706     In case of errors, we return an empty list and log the
707     error.
708
709     Note that since a logical volume can live on multiple physical
710     volumes, the resulting list might include a logical volume
711     multiple times.
712
713   """
714   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
715                          "--separator=|",
716                          "--options=lv_name,lv_size,devices,vg_name"])
717   if result.failed:
718     _Fail("Failed to list logical volumes, lvs output: %s",
719           result.output)
720
721   def parse_dev(dev):
722     return dev.split('(')[0]
723
724   def handle_dev(dev):
725     return [parse_dev(x) for x in dev.split(",")]
726
727   def map_line(line):
728     line = [v.strip() for v in line]
729     return [{'name': line[0], 'size': line[1],
730              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
731
732   all_devs = []
733   for line in result.stdout.splitlines():
734     if line.count('|') >= 3:
735       all_devs.extend(map_line(line.split('|')))
736     else:
737       logging.warning("Strange line in the output from lvs: '%s'", line)
738   return all_devs
739
740
741 def BridgesExist(bridges_list):
742   """Check if a list of bridges exist on the current node.
743
744   @rtype: boolean
745   @return: C{True} if all of them exist, C{False} otherwise
746
747   """
748   missing = []
749   for bridge in bridges_list:
750     if not utils.BridgeExists(bridge):
751       missing.append(bridge)
752
753   if missing:
754     _Fail("Missing bridges %s", utils.CommaJoin(missing))
755
756
757 def GetInstanceList(hypervisor_list):
758   """Provides a list of instances.
759
760   @type hypervisor_list: list
761   @param hypervisor_list: the list of hypervisors to query information
762
763   @rtype: list
764   @return: a list of all running instances on the current node
765     - instance1.example.com
766     - instance2.example.com
767
768   """
769   results = []
770   for hname in hypervisor_list:
771     try:
772       names = hypervisor.GetHypervisor(hname).ListInstances()
773       results.extend(names)
774     except errors.HypervisorError, err:
775       _Fail("Error enumerating instances (hypervisor %s): %s",
776             hname, err, exc=True)
777
778   return results
779
780
781 def GetInstanceInfo(instance, hname):
782   """Gives back the information about an instance as a dictionary.
783
784   @type instance: string
785   @param instance: the instance name
786   @type hname: string
787   @param hname: the hypervisor type of the instance
788
789   @rtype: dict
790   @return: dictionary with the following keys:
791       - memory: memory size of instance (int)
792       - state: xen state of instance (string)
793       - time: cpu time of instance (float)
794
795   """
796   output = {}
797
798   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
799   if iinfo is not None:
800     output['memory'] = iinfo[2]
801     output['state'] = iinfo[4]
802     output['time'] = iinfo[5]
803
804   return output
805
806
807 def GetInstanceMigratable(instance):
808   """Gives whether an instance can be migrated.
809
810   @type instance: L{objects.Instance}
811   @param instance: object representing the instance to be checked.
812
813   @rtype: tuple
814   @return: tuple of (result, description) where:
815       - result: whether the instance can be migrated or not
816       - description: a description of the issue, if relevant
817
818   """
819   hyper = hypervisor.GetHypervisor(instance.hypervisor)
820   iname = instance.name
821   if iname not in hyper.ListInstances():
822     _Fail("Instance %s is not running", iname)
823
824   for idx in range(len(instance.disks)):
825     link_name = _GetBlockDevSymlinkPath(iname, idx)
826     if not os.path.islink(link_name):
827       logging.warning("Instance %s is missing symlink %s for disk %d",
828                       iname, link_name, idx)
829
830
831 def GetAllInstancesInfo(hypervisor_list):
832   """Gather data about all instances.
833
834   This is the equivalent of L{GetInstanceInfo}, except that it
835   computes data for all instances at once, thus being faster if one
836   needs data about more than one instance.
837
838   @type hypervisor_list: list
839   @param hypervisor_list: list of hypervisors to query for instance data
840
841   @rtype: dict
842   @return: dictionary of instance: data, with data having the following keys:
843       - memory: memory size of instance (int)
844       - state: xen state of instance (string)
845       - time: cpu time of instance (float)
846       - vcpus: the number of vcpus
847
848   """
849   output = {}
850
851   for hname in hypervisor_list:
852     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
853     if iinfo:
854       for name, _, memory, vcpus, state, times in iinfo:
855         value = {
856           'memory': memory,
857           'vcpus': vcpus,
858           'state': state,
859           'time': times,
860           }
861         if name in output:
862           # we only check static parameters, like memory and vcpus,
863           # and not state and time which can change between the
864           # invocations of the different hypervisors
865           for key in 'memory', 'vcpus':
866             if value[key] != output[name][key]:
867               _Fail("Instance %s is running twice"
868                     " with different parameters", name)
869         output[name] = value
870
871   return output
872
873
874 def _InstanceLogName(kind, os_name, instance):
875   """Compute the OS log filename for a given instance and operation.
876
877   The instance name and os name are passed in as strings since not all
878   operations have these as part of an instance object.
879
880   @type kind: string
881   @param kind: the operation type (e.g. add, import, etc.)
882   @type os_name: string
883   @param os_name: the os name
884   @type instance: string
885   @param instance: the name of the instance being imported/added/etc.
886
887   """
888   # TODO: Use tempfile.mkstemp to create unique filename
889   base = ("%s-%s-%s-%s.log" %
890           (kind, os_name, instance, utils.TimestampForFilename()))
891   return utils.PathJoin(constants.LOG_OS_DIR, base)
892
893
894 def InstanceOsAdd(instance, reinstall, debug):
895   """Add an OS to an instance.
896
897   @type instance: L{objects.Instance}
898   @param instance: Instance whose OS is to be installed
899   @type reinstall: boolean
900   @param reinstall: whether this is an instance reinstall
901   @type debug: integer
902   @param debug: debug level, passed to the OS scripts
903   @rtype: None
904
905   """
906   inst_os = OSFromDisk(instance.os)
907
908   create_env = OSEnvironment(instance, inst_os, debug)
909   if reinstall:
910     create_env['INSTANCE_REINSTALL'] = "1"
911
912   logfile = _InstanceLogName("add", instance.os, instance.name)
913
914   result = utils.RunCmd([inst_os.create_script], env=create_env,
915                         cwd=inst_os.path, output=logfile,)
916   if result.failed:
917     logging.error("os create command '%s' returned error: %s, logfile: %s,"
918                   " output: %s", result.cmd, result.fail_reason, logfile,
919                   result.output)
920     lines = [utils.SafeEncode(val)
921              for val in utils.TailFile(logfile, lines=20)]
922     _Fail("OS create script failed (%s), last lines in the"
923           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
924
925
926 def RunRenameInstance(instance, old_name, debug):
927   """Run the OS rename script for an instance.
928
929   @type instance: L{objects.Instance}
930   @param instance: Instance whose OS is to be installed
931   @type old_name: string
932   @param old_name: previous instance name
933   @type debug: integer
934   @param debug: debug level, passed to the OS scripts
935   @rtype: boolean
936   @return: the success of the operation
937
938   """
939   inst_os = OSFromDisk(instance.os)
940
941   rename_env = OSEnvironment(instance, inst_os, debug)
942   rename_env['OLD_INSTANCE_NAME'] = old_name
943
944   logfile = _InstanceLogName("rename", instance.os,
945                              "%s-%s" % (old_name, instance.name))
946
947   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
948                         cwd=inst_os.path, output=logfile)
949
950   if result.failed:
951     logging.error("os create command '%s' returned error: %s output: %s",
952                   result.cmd, result.fail_reason, result.output)
953     lines = [utils.SafeEncode(val)
954              for val in utils.TailFile(logfile, lines=20)]
955     _Fail("OS rename script failed (%s), last lines in the"
956           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
957
958
959 def _GetBlockDevSymlinkPath(instance_name, idx):
960   return utils.PathJoin(constants.DISK_LINKS_DIR, "%s%s%d" %
961                         (instance_name, constants.DISK_SEPARATOR, idx))
962
963
964 def _SymlinkBlockDev(instance_name, device_path, idx):
965   """Set up symlinks to a instance's block device.
966
967   This is an auxiliary function run when an instance is start (on the primary
968   node) or when an instance is migrated (on the target node).
969
970
971   @param instance_name: the name of the target instance
972   @param device_path: path of the physical block device, on the node
973   @param idx: the disk index
974   @return: absolute path to the disk's symlink
975
976   """
977   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
978   try:
979     os.symlink(device_path, link_name)
980   except OSError, err:
981     if err.errno == errno.EEXIST:
982       if (not os.path.islink(link_name) or
983           os.readlink(link_name) != device_path):
984         os.remove(link_name)
985         os.symlink(device_path, link_name)
986     else:
987       raise
988
989   return link_name
990
991
992 def _RemoveBlockDevLinks(instance_name, disks):
993   """Remove the block device symlinks belonging to the given instance.
994
995   """
996   for idx, _ in enumerate(disks):
997     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
998     if os.path.islink(link_name):
999       try:
1000         os.remove(link_name)
1001       except OSError:
1002         logging.exception("Can't remove symlink '%s'", link_name)
1003
1004
1005 def _GatherAndLinkBlockDevs(instance):
1006   """Set up an instance's block device(s).
1007
1008   This is run on the primary node at instance startup. The block
1009   devices must be already assembled.
1010
1011   @type instance: L{objects.Instance}
1012   @param instance: the instance whose disks we shoul assemble
1013   @rtype: list
1014   @return: list of (disk_object, device_path)
1015
1016   """
1017   block_devices = []
1018   for idx, disk in enumerate(instance.disks):
1019     device = _RecursiveFindBD(disk)
1020     if device is None:
1021       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1022                                     str(disk))
1023     device.Open()
1024     try:
1025       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1026     except OSError, e:
1027       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1028                                     e.strerror)
1029
1030     block_devices.append((disk, link_name))
1031
1032   return block_devices
1033
1034
1035 def StartInstance(instance):
1036   """Start an instance.
1037
1038   @type instance: L{objects.Instance}
1039   @param instance: the instance object
1040   @rtype: None
1041
1042   """
1043   running_instances = GetInstanceList([instance.hypervisor])
1044
1045   if instance.name in running_instances:
1046     logging.info("Instance %s already running, not starting", instance.name)
1047     return
1048
1049   try:
1050     block_devices = _GatherAndLinkBlockDevs(instance)
1051     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1052     hyper.StartInstance(instance, block_devices)
1053   except errors.BlockDeviceError, err:
1054     _Fail("Block device error: %s", err, exc=True)
1055   except errors.HypervisorError, err:
1056     _RemoveBlockDevLinks(instance.name, instance.disks)
1057     _Fail("Hypervisor error: %s", err, exc=True)
1058
1059
1060 def InstanceShutdown(instance, timeout):
1061   """Shut an instance down.
1062
1063   @note: this functions uses polling with a hardcoded timeout.
1064
1065   @type instance: L{objects.Instance}
1066   @param instance: the instance object
1067   @type timeout: integer
1068   @param timeout: maximum timeout for soft shutdown
1069   @rtype: None
1070
1071   """
1072   hv_name = instance.hypervisor
1073   hyper = hypervisor.GetHypervisor(hv_name)
1074   iname = instance.name
1075
1076   if instance.name not in hyper.ListInstances():
1077     logging.info("Instance %s not running, doing nothing", iname)
1078     return
1079
1080   class _TryShutdown:
1081     def __init__(self):
1082       self.tried_once = False
1083
1084     def __call__(self):
1085       if iname not in hyper.ListInstances():
1086         return
1087
1088       try:
1089         hyper.StopInstance(instance, retry=self.tried_once)
1090       except errors.HypervisorError, err:
1091         if iname not in hyper.ListInstances():
1092           # if the instance is no longer existing, consider this a
1093           # success and go to cleanup
1094           return
1095
1096         _Fail("Failed to stop instance %s: %s", iname, err)
1097
1098       self.tried_once = True
1099
1100       raise utils.RetryAgain()
1101
1102   try:
1103     utils.Retry(_TryShutdown(), 5, timeout)
1104   except utils.RetryTimeout:
1105     # the shutdown did not succeed
1106     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1107
1108     try:
1109       hyper.StopInstance(instance, force=True)
1110     except errors.HypervisorError, err:
1111       if iname in hyper.ListInstances():
1112         # only raise an error if the instance still exists, otherwise
1113         # the error could simply be "instance ... unknown"!
1114         _Fail("Failed to force stop instance %s: %s", iname, err)
1115
1116     time.sleep(1)
1117
1118     if iname in hyper.ListInstances():
1119       _Fail("Could not shutdown instance %s even by destroy", iname)
1120
1121   try:
1122     hyper.CleanupInstance(instance.name)
1123   except errors.HypervisorError, err:
1124     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1125
1126   _RemoveBlockDevLinks(iname, instance.disks)
1127
1128
1129 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1130   """Reboot an instance.
1131
1132   @type instance: L{objects.Instance}
1133   @param instance: the instance object to reboot
1134   @type reboot_type: str
1135   @param reboot_type: the type of reboot, one the following
1136     constants:
1137       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1138         instance OS, do not recreate the VM
1139       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1140         restart the VM (at the hypervisor level)
1141       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1142         not accepted here, since that mode is handled differently, in
1143         cmdlib, and translates into full stop and start of the
1144         instance (instead of a call_instance_reboot RPC)
1145   @type shutdown_timeout: integer
1146   @param shutdown_timeout: maximum timeout for soft shutdown
1147   @rtype: None
1148
1149   """
1150   running_instances = GetInstanceList([instance.hypervisor])
1151
1152   if instance.name not in running_instances:
1153     _Fail("Cannot reboot instance %s that is not running", instance.name)
1154
1155   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1156   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1157     try:
1158       hyper.RebootInstance(instance)
1159     except errors.HypervisorError, err:
1160       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1161   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1162     try:
1163       InstanceShutdown(instance, shutdown_timeout)
1164       return StartInstance(instance)
1165     except errors.HypervisorError, err:
1166       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1167   else:
1168     _Fail("Invalid reboot_type received: %s", reboot_type)
1169
1170
1171 def MigrationInfo(instance):
1172   """Gather information about an instance to be migrated.
1173
1174   @type instance: L{objects.Instance}
1175   @param instance: the instance definition
1176
1177   """
1178   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1179   try:
1180     info = hyper.MigrationInfo(instance)
1181   except errors.HypervisorError, err:
1182     _Fail("Failed to fetch migration information: %s", err, exc=True)
1183   return info
1184
1185
1186 def AcceptInstance(instance, info, target):
1187   """Prepare the node to accept an instance.
1188
1189   @type instance: L{objects.Instance}
1190   @param instance: the instance definition
1191   @type info: string/data (opaque)
1192   @param info: migration information, from the source node
1193   @type target: string
1194   @param target: target host (usually ip), on this node
1195
1196   """
1197   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1198   try:
1199     hyper.AcceptInstance(instance, info, target)
1200   except errors.HypervisorError, err:
1201     _Fail("Failed to accept instance: %s", err, exc=True)
1202
1203
1204 def FinalizeMigration(instance, info, success):
1205   """Finalize any preparation to accept an instance.
1206
1207   @type instance: L{objects.Instance}
1208   @param instance: the instance definition
1209   @type info: string/data (opaque)
1210   @param info: migration information, from the source node
1211   @type success: boolean
1212   @param success: whether the migration was a success or a failure
1213
1214   """
1215   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1216   try:
1217     hyper.FinalizeMigration(instance, info, success)
1218   except errors.HypervisorError, err:
1219     _Fail("Failed to finalize migration: %s", err, exc=True)
1220
1221
1222 def MigrateInstance(instance, target, live):
1223   """Migrates an instance to another node.
1224
1225   @type instance: L{objects.Instance}
1226   @param instance: the instance definition
1227   @type target: string
1228   @param target: the target node name
1229   @type live: boolean
1230   @param live: whether the migration should be done live or not (the
1231       interpretation of this parameter is left to the hypervisor)
1232   @rtype: tuple
1233   @return: a tuple of (success, msg) where:
1234       - succes is a boolean denoting the success/failure of the operation
1235       - msg is a string with details in case of failure
1236
1237   """
1238   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1239
1240   try:
1241     hyper.MigrateInstance(instance, target, live)
1242   except errors.HypervisorError, err:
1243     _Fail("Failed to migrate instance: %s", err, exc=True)
1244
1245
1246 def BlockdevCreate(disk, size, owner, on_primary, info):
1247   """Creates a block device for an instance.
1248
1249   @type disk: L{objects.Disk}
1250   @param disk: the object describing the disk we should create
1251   @type size: int
1252   @param size: the size of the physical underlying device, in MiB
1253   @type owner: str
1254   @param owner: the name of the instance for which disk is created,
1255       used for device cache data
1256   @type on_primary: boolean
1257   @param on_primary:  indicates if it is the primary node or not
1258   @type info: string
1259   @param info: string that will be sent to the physical device
1260       creation, used for example to set (LVM) tags on LVs
1261
1262   @return: the new unique_id of the device (this can sometime be
1263       computed only after creation), or None. On secondary nodes,
1264       it's not required to return anything.
1265
1266   """
1267   # TODO: remove the obsolete 'size' argument
1268   # pylint: disable-msg=W0613
1269   clist = []
1270   if disk.children:
1271     for child in disk.children:
1272       try:
1273         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1274       except errors.BlockDeviceError, err:
1275         _Fail("Can't assemble device %s: %s", child, err)
1276       if on_primary or disk.AssembleOnSecondary():
1277         # we need the children open in case the device itself has to
1278         # be assembled
1279         try:
1280           # pylint: disable-msg=E1103
1281           crdev.Open()
1282         except errors.BlockDeviceError, err:
1283           _Fail("Can't make child '%s' read-write: %s", child, err)
1284       clist.append(crdev)
1285
1286   try:
1287     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1288   except errors.BlockDeviceError, err:
1289     _Fail("Can't create block device: %s", err)
1290
1291   if on_primary or disk.AssembleOnSecondary():
1292     try:
1293       device.Assemble()
1294     except errors.BlockDeviceError, err:
1295       _Fail("Can't assemble device after creation, unusual event: %s", err)
1296     device.SetSyncSpeed(constants.SYNC_SPEED)
1297     if on_primary or disk.OpenOnSecondary():
1298       try:
1299         device.Open(force=True)
1300       except errors.BlockDeviceError, err:
1301         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1302     DevCacheManager.UpdateCache(device.dev_path, owner,
1303                                 on_primary, disk.iv_name)
1304
1305   device.SetInfo(info)
1306
1307   return device.unique_id
1308
1309
1310 def _WipeDevice(path, offset, size):
1311   """This function actually wipes the device.
1312
1313   @param path: The path to the device to wipe
1314   @param offset: The offset in MiB in the file
1315   @param size: The size in MiB to write
1316
1317   """
1318   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1319          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1320          "count=%d" % size]
1321   result = utils.RunCmd(cmd)
1322
1323   if result.failed:
1324     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1325           result.fail_reason, result.output)
1326
1327
1328 def BlockdevWipe(disk, offset, size):
1329   """Wipes a block device.
1330
1331   @type disk: L{objects.Disk}
1332   @param disk: the disk object we want to wipe
1333   @type offset: int
1334   @param offset: The offset in MiB in the file
1335   @type size: int
1336   @param size: The size in MiB to write
1337
1338   """
1339   try:
1340     rdev = _RecursiveFindBD(disk)
1341   except errors.BlockDeviceError:
1342     rdev = None
1343
1344   if not rdev:
1345     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1346
1347   # Do cross verify some of the parameters
1348   if offset > rdev.size:
1349     _Fail("Offset is bigger than device size")
1350   if (offset + size) > rdev.size:
1351     _Fail("The provided offset and size to wipe is bigger than device size")
1352
1353   _WipeDevice(rdev.dev_path, offset, size)
1354
1355
1356 def BlockdevRemove(disk):
1357   """Remove a block device.
1358
1359   @note: This is intended to be called recursively.
1360
1361   @type disk: L{objects.Disk}
1362   @param disk: the disk object we should remove
1363   @rtype: boolean
1364   @return: the success of the operation
1365
1366   """
1367   msgs = []
1368   try:
1369     rdev = _RecursiveFindBD(disk)
1370   except errors.BlockDeviceError, err:
1371     # probably can't attach
1372     logging.info("Can't attach to device %s in remove", disk)
1373     rdev = None
1374   if rdev is not None:
1375     r_path = rdev.dev_path
1376     try:
1377       rdev.Remove()
1378     except errors.BlockDeviceError, err:
1379       msgs.append(str(err))
1380     if not msgs:
1381       DevCacheManager.RemoveCache(r_path)
1382
1383   if disk.children:
1384     for child in disk.children:
1385       try:
1386         BlockdevRemove(child)
1387       except RPCFail, err:
1388         msgs.append(str(err))
1389
1390   if msgs:
1391     _Fail("; ".join(msgs))
1392
1393
1394 def _RecursiveAssembleBD(disk, owner, as_primary):
1395   """Activate a block device for an instance.
1396
1397   This is run on the primary and secondary nodes for an instance.
1398
1399   @note: this function is called recursively.
1400
1401   @type disk: L{objects.Disk}
1402   @param disk: the disk we try to assemble
1403   @type owner: str
1404   @param owner: the name of the instance which owns the disk
1405   @type as_primary: boolean
1406   @param as_primary: if we should make the block device
1407       read/write
1408
1409   @return: the assembled device or None (in case no device
1410       was assembled)
1411   @raise errors.BlockDeviceError: in case there is an error
1412       during the activation of the children or the device
1413       itself
1414
1415   """
1416   children = []
1417   if disk.children:
1418     mcn = disk.ChildrenNeeded()
1419     if mcn == -1:
1420       mcn = 0 # max number of Nones allowed
1421     else:
1422       mcn = len(disk.children) - mcn # max number of Nones
1423     for chld_disk in disk.children:
1424       try:
1425         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1426       except errors.BlockDeviceError, err:
1427         if children.count(None) >= mcn:
1428           raise
1429         cdev = None
1430         logging.error("Error in child activation (but continuing): %s",
1431                       str(err))
1432       children.append(cdev)
1433
1434   if as_primary or disk.AssembleOnSecondary():
1435     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1436     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1437     result = r_dev
1438     if as_primary or disk.OpenOnSecondary():
1439       r_dev.Open()
1440     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1441                                 as_primary, disk.iv_name)
1442
1443   else:
1444     result = True
1445   return result
1446
1447
1448 def BlockdevAssemble(disk, owner, as_primary):
1449   """Activate a block device for an instance.
1450
1451   This is a wrapper over _RecursiveAssembleBD.
1452
1453   @rtype: str or boolean
1454   @return: a C{/dev/...} path for primary nodes, and
1455       C{True} for secondary nodes
1456
1457   """
1458   try:
1459     result = _RecursiveAssembleBD(disk, owner, as_primary)
1460     if isinstance(result, bdev.BlockDev):
1461       # pylint: disable-msg=E1103
1462       result = result.dev_path
1463   except errors.BlockDeviceError, err:
1464     _Fail("Error while assembling disk: %s", err, exc=True)
1465
1466   return result
1467
1468
1469 def BlockdevShutdown(disk):
1470   """Shut down a block device.
1471
1472   First, if the device is assembled (Attach() is successful), then
1473   the device is shutdown. Then the children of the device are
1474   shutdown.
1475
1476   This function is called recursively. Note that we don't cache the
1477   children or such, as oppossed to assemble, shutdown of different
1478   devices doesn't require that the upper device was active.
1479
1480   @type disk: L{objects.Disk}
1481   @param disk: the description of the disk we should
1482       shutdown
1483   @rtype: None
1484
1485   """
1486   msgs = []
1487   r_dev = _RecursiveFindBD(disk)
1488   if r_dev is not None:
1489     r_path = r_dev.dev_path
1490     try:
1491       r_dev.Shutdown()
1492       DevCacheManager.RemoveCache(r_path)
1493     except errors.BlockDeviceError, err:
1494       msgs.append(str(err))
1495
1496   if disk.children:
1497     for child in disk.children:
1498       try:
1499         BlockdevShutdown(child)
1500       except RPCFail, err:
1501         msgs.append(str(err))
1502
1503   if msgs:
1504     _Fail("; ".join(msgs))
1505
1506
1507 def BlockdevAddchildren(parent_cdev, new_cdevs):
1508   """Extend a mirrored block device.
1509
1510   @type parent_cdev: L{objects.Disk}
1511   @param parent_cdev: the disk to which we should add children
1512   @type new_cdevs: list of L{objects.Disk}
1513   @param new_cdevs: the list of children which we should add
1514   @rtype: None
1515
1516   """
1517   parent_bdev = _RecursiveFindBD(parent_cdev)
1518   if parent_bdev is None:
1519     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1520   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1521   if new_bdevs.count(None) > 0:
1522     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1523   parent_bdev.AddChildren(new_bdevs)
1524
1525
1526 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1527   """Shrink a mirrored block device.
1528
1529   @type parent_cdev: L{objects.Disk}
1530   @param parent_cdev: the disk from which we should remove children
1531   @type new_cdevs: list of L{objects.Disk}
1532   @param new_cdevs: the list of children which we should remove
1533   @rtype: None
1534
1535   """
1536   parent_bdev = _RecursiveFindBD(parent_cdev)
1537   if parent_bdev is None:
1538     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1539   devs = []
1540   for disk in new_cdevs:
1541     rpath = disk.StaticDevPath()
1542     if rpath is None:
1543       bd = _RecursiveFindBD(disk)
1544       if bd is None:
1545         _Fail("Can't find device %s while removing children", disk)
1546       else:
1547         devs.append(bd.dev_path)
1548     else:
1549       if not utils.IsNormAbsPath(rpath):
1550         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1551       devs.append(rpath)
1552   parent_bdev.RemoveChildren(devs)
1553
1554
1555 def BlockdevGetmirrorstatus(disks):
1556   """Get the mirroring status of a list of devices.
1557
1558   @type disks: list of L{objects.Disk}
1559   @param disks: the list of disks which we should query
1560   @rtype: disk
1561   @return: List of L{objects.BlockDevStatus}, one for each disk
1562   @raise errors.BlockDeviceError: if any of the disks cannot be
1563       found
1564
1565   """
1566   stats = []
1567   for dsk in disks:
1568     rbd = _RecursiveFindBD(dsk)
1569     if rbd is None:
1570       _Fail("Can't find device %s", dsk)
1571
1572     stats.append(rbd.CombinedSyncStatus())
1573
1574   return stats
1575
1576
1577 def BlockdevGetmirrorstatusMulti(disks):
1578   """Get the mirroring status of a list of devices.
1579
1580   @type disks: list of L{objects.Disk}
1581   @param disks: the list of disks which we should query
1582   @rtype: disk
1583   @return: List of tuples, (bool, status), one for each disk; bool denotes
1584     success/failure, status is L{objects.BlockDevStatus} on success, string
1585     otherwise
1586
1587   """
1588   result = []
1589   for disk in disks:
1590     try:
1591       rbd = _RecursiveFindBD(disk)
1592       if rbd is None:
1593         result.append((False, "Can't find device %s" % disk))
1594         continue
1595
1596       status = rbd.CombinedSyncStatus()
1597     except errors.BlockDeviceError, err:
1598       logging.exception("Error while getting disk status")
1599       result.append((False, str(err)))
1600     else:
1601       result.append((True, status))
1602
1603   assert len(disks) == len(result)
1604
1605   return result
1606
1607
1608 def _RecursiveFindBD(disk):
1609   """Check if a device is activated.
1610
1611   If so, return information about the real device.
1612
1613   @type disk: L{objects.Disk}
1614   @param disk: the disk object we need to find
1615
1616   @return: None if the device can't be found,
1617       otherwise the device instance
1618
1619   """
1620   children = []
1621   if disk.children:
1622     for chdisk in disk.children:
1623       children.append(_RecursiveFindBD(chdisk))
1624
1625   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1626
1627
1628 def _OpenRealBD(disk):
1629   """Opens the underlying block device of a disk.
1630
1631   @type disk: L{objects.Disk}
1632   @param disk: the disk object we want to open
1633
1634   """
1635   real_disk = _RecursiveFindBD(disk)
1636   if real_disk is None:
1637     _Fail("Block device '%s' is not set up", disk)
1638
1639   real_disk.Open()
1640
1641   return real_disk
1642
1643
1644 def BlockdevFind(disk):
1645   """Check if a device is activated.
1646
1647   If it is, return information about the real device.
1648
1649   @type disk: L{objects.Disk}
1650   @param disk: the disk to find
1651   @rtype: None or objects.BlockDevStatus
1652   @return: None if the disk cannot be found, otherwise a the current
1653            information
1654
1655   """
1656   try:
1657     rbd = _RecursiveFindBD(disk)
1658   except errors.BlockDeviceError, err:
1659     _Fail("Failed to find device: %s", err, exc=True)
1660
1661   if rbd is None:
1662     return None
1663
1664   return rbd.GetSyncStatus()
1665
1666
1667 def BlockdevGetsize(disks):
1668   """Computes the size of the given disks.
1669
1670   If a disk is not found, returns None instead.
1671
1672   @type disks: list of L{objects.Disk}
1673   @param disks: the list of disk to compute the size for
1674   @rtype: list
1675   @return: list with elements None if the disk cannot be found,
1676       otherwise the size
1677
1678   """
1679   result = []
1680   for cf in disks:
1681     try:
1682       rbd = _RecursiveFindBD(cf)
1683     except errors.BlockDeviceError:
1684       result.append(None)
1685       continue
1686     if rbd is None:
1687       result.append(None)
1688     else:
1689       result.append(rbd.GetActualSize())
1690   return result
1691
1692
1693 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1694   """Export a block device to a remote node.
1695
1696   @type disk: L{objects.Disk}
1697   @param disk: the description of the disk to export
1698   @type dest_node: str
1699   @param dest_node: the destination node to export to
1700   @type dest_path: str
1701   @param dest_path: the destination path on the target node
1702   @type cluster_name: str
1703   @param cluster_name: the cluster name, needed for SSH hostalias
1704   @rtype: None
1705
1706   """
1707   real_disk = _OpenRealBD(disk)
1708
1709   # the block size on the read dd is 1MiB to match our units
1710   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1711                                "dd if=%s bs=1048576 count=%s",
1712                                real_disk.dev_path, str(disk.size))
1713
1714   # we set here a smaller block size as, due to ssh buffering, more
1715   # than 64-128k will mostly ignored; we use nocreat to fail if the
1716   # device is not already there or we pass a wrong path; we use
1717   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1718   # to not buffer too much memory; this means that at best, we flush
1719   # every 64k, which will not be very fast
1720   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1721                                 " oflag=dsync", dest_path)
1722
1723   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1724                                                    constants.GANETI_RUNAS,
1725                                                    destcmd)
1726
1727   # all commands have been checked, so we're safe to combine them
1728   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1729
1730   result = utils.RunCmd(["bash", "-c", command])
1731
1732   if result.failed:
1733     _Fail("Disk copy command '%s' returned error: %s"
1734           " output: %s", command, result.fail_reason, result.output)
1735
1736
1737 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1738   """Write a file to the filesystem.
1739
1740   This allows the master to overwrite(!) a file. It will only perform
1741   the operation if the file belongs to a list of configuration files.
1742
1743   @type file_name: str
1744   @param file_name: the target file name
1745   @type data: str
1746   @param data: the new contents of the file
1747   @type mode: int
1748   @param mode: the mode to give the file (can be None)
1749   @type uid: int
1750   @param uid: the owner of the file (can be -1 for default)
1751   @type gid: int
1752   @param gid: the group of the file (can be -1 for default)
1753   @type atime: float
1754   @param atime: the atime to set on the file (can be None)
1755   @type mtime: float
1756   @param mtime: the mtime to set on the file (can be None)
1757   @rtype: None
1758
1759   """
1760   if not os.path.isabs(file_name):
1761     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1762
1763   if file_name not in _ALLOWED_UPLOAD_FILES:
1764     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1765           file_name)
1766
1767   raw_data = _Decompress(data)
1768
1769   utils.SafeWriteFile(file_name, None,
1770                       data=raw_data, mode=mode, uid=uid, gid=gid,
1771                       atime=atime, mtime=mtime)
1772
1773
1774 def RunOob(oob_program, command, node, timeout):
1775   """Executes oob_program with given command on given node.
1776
1777   @param oob_program: The path to the executable oob_program
1778   @param command: The command to invoke on oob_program
1779   @param node: The node given as an argument to the program
1780   @param timeout: Timeout after which we kill the oob program
1781
1782   @return: stdout
1783   @raise RPCFail: If execution fails for some reason
1784
1785   """
1786   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1787
1788   if result.failed:
1789     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1790           result.fail_reason, result.output)
1791
1792   return result.stdout
1793
1794
1795 def WriteSsconfFiles(values):
1796   """Update all ssconf files.
1797
1798   Wrapper around the SimpleStore.WriteFiles.
1799
1800   """
1801   ssconf.SimpleStore().WriteFiles(values)
1802
1803
1804 def _ErrnoOrStr(err):
1805   """Format an EnvironmentError exception.
1806
1807   If the L{err} argument has an errno attribute, it will be looked up
1808   and converted into a textual C{E...} description. Otherwise the
1809   string representation of the error will be returned.
1810
1811   @type err: L{EnvironmentError}
1812   @param err: the exception to format
1813
1814   """
1815   if hasattr(err, 'errno'):
1816     detail = errno.errorcode[err.errno]
1817   else:
1818     detail = str(err)
1819   return detail
1820
1821
1822 def _OSOndiskAPIVersion(os_dir):
1823   """Compute and return the API version of a given OS.
1824
1825   This function will try to read the API version of the OS residing in
1826   the 'os_dir' directory.
1827
1828   @type os_dir: str
1829   @param os_dir: the directory in which we should look for the OS
1830   @rtype: tuple
1831   @return: tuple (status, data) with status denoting the validity and
1832       data holding either the vaid versions or an error message
1833
1834   """
1835   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1836
1837   try:
1838     st = os.stat(api_file)
1839   except EnvironmentError, err:
1840     return False, ("Required file '%s' not found under path %s: %s" %
1841                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1842
1843   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1844     return False, ("File '%s' in %s is not a regular file" %
1845                    (constants.OS_API_FILE, os_dir))
1846
1847   try:
1848     api_versions = utils.ReadFile(api_file).splitlines()
1849   except EnvironmentError, err:
1850     return False, ("Error while reading the API version file at %s: %s" %
1851                    (api_file, _ErrnoOrStr(err)))
1852
1853   try:
1854     api_versions = [int(version.strip()) for version in api_versions]
1855   except (TypeError, ValueError), err:
1856     return False, ("API version(s) can't be converted to integer: %s" %
1857                    str(err))
1858
1859   return True, api_versions
1860
1861
1862 def DiagnoseOS(top_dirs=None):
1863   """Compute the validity for all OSes.
1864
1865   @type top_dirs: list
1866   @param top_dirs: the list of directories in which to
1867       search (if not given defaults to
1868       L{constants.OS_SEARCH_PATH})
1869   @rtype: list of L{objects.OS}
1870   @return: a list of tuples (name, path, status, diagnose, variants,
1871       parameters, api_version) for all (potential) OSes under all
1872       search paths, where:
1873           - name is the (potential) OS name
1874           - path is the full path to the OS
1875           - status True/False is the validity of the OS
1876           - diagnose is the error message for an invalid OS, otherwise empty
1877           - variants is a list of supported OS variants, if any
1878           - parameters is a list of (name, help) parameters, if any
1879           - api_version is a list of support OS API versions
1880
1881   """
1882   if top_dirs is None:
1883     top_dirs = constants.OS_SEARCH_PATH
1884
1885   result = []
1886   for dir_name in top_dirs:
1887     if os.path.isdir(dir_name):
1888       try:
1889         f_names = utils.ListVisibleFiles(dir_name)
1890       except EnvironmentError, err:
1891         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1892         break
1893       for name in f_names:
1894         os_path = utils.PathJoin(dir_name, name)
1895         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1896         if status:
1897           diagnose = ""
1898           variants = os_inst.supported_variants
1899           parameters = os_inst.supported_parameters
1900           api_versions = os_inst.api_versions
1901         else:
1902           diagnose = os_inst
1903           variants = parameters = api_versions = []
1904         result.append((name, os_path, status, diagnose, variants,
1905                        parameters, api_versions))
1906
1907   return result
1908
1909
1910 def _TryOSFromDisk(name, base_dir=None):
1911   """Create an OS instance from disk.
1912
1913   This function will return an OS instance if the given name is a
1914   valid OS name.
1915
1916   @type base_dir: string
1917   @keyword base_dir: Base directory containing OS installations.
1918                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1919   @rtype: tuple
1920   @return: success and either the OS instance if we find a valid one,
1921       or error message
1922
1923   """
1924   if base_dir is None:
1925     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1926   else:
1927     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1928
1929   if os_dir is None:
1930     return False, "Directory for OS %s not found in search path" % name
1931
1932   status, api_versions = _OSOndiskAPIVersion(os_dir)
1933   if not status:
1934     # push the error up
1935     return status, api_versions
1936
1937   if not constants.OS_API_VERSIONS.intersection(api_versions):
1938     return False, ("API version mismatch for path '%s': found %s, want %s." %
1939                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1940
1941   # OS Files dictionary, we will populate it with the absolute path names
1942   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1943
1944   if max(api_versions) >= constants.OS_API_V15:
1945     os_files[constants.OS_VARIANTS_FILE] = ''
1946
1947   if max(api_versions) >= constants.OS_API_V20:
1948     os_files[constants.OS_PARAMETERS_FILE] = ''
1949   else:
1950     del os_files[constants.OS_SCRIPT_VERIFY]
1951
1952   for filename in os_files:
1953     os_files[filename] = utils.PathJoin(os_dir, filename)
1954
1955     try:
1956       st = os.stat(os_files[filename])
1957     except EnvironmentError, err:
1958       return False, ("File '%s' under path '%s' is missing (%s)" %
1959                      (filename, os_dir, _ErrnoOrStr(err)))
1960
1961     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1962       return False, ("File '%s' under path '%s' is not a regular file" %
1963                      (filename, os_dir))
1964
1965     if filename in constants.OS_SCRIPTS:
1966       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1967         return False, ("File '%s' under path '%s' is not executable" %
1968                        (filename, os_dir))
1969
1970   variants = []
1971   if constants.OS_VARIANTS_FILE in os_files:
1972     variants_file = os_files[constants.OS_VARIANTS_FILE]
1973     try:
1974       variants = utils.ReadFile(variants_file).splitlines()
1975     except EnvironmentError, err:
1976       return False, ("Error while reading the OS variants file at %s: %s" %
1977                      (variants_file, _ErrnoOrStr(err)))
1978     if not variants:
1979       return False, ("No supported os variant found")
1980
1981   parameters = []
1982   if constants.OS_PARAMETERS_FILE in os_files:
1983     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1984     try:
1985       parameters = utils.ReadFile(parameters_file).splitlines()
1986     except EnvironmentError, err:
1987       return False, ("Error while reading the OS parameters file at %s: %s" %
1988                      (parameters_file, _ErrnoOrStr(err)))
1989     parameters = [v.split(None, 1) for v in parameters]
1990
1991   os_obj = objects.OS(name=name, path=os_dir,
1992                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1993                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1994                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1995                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1996                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1997                                                  None),
1998                       supported_variants=variants,
1999                       supported_parameters=parameters,
2000                       api_versions=api_versions)
2001   return True, os_obj
2002
2003
2004 def OSFromDisk(name, base_dir=None):
2005   """Create an OS instance from disk.
2006
2007   This function will return an OS instance if the given name is a
2008   valid OS name. Otherwise, it will raise an appropriate
2009   L{RPCFail} exception, detailing why this is not a valid OS.
2010
2011   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
2012   an exception but returns true/false status data.
2013
2014   @type base_dir: string
2015   @keyword base_dir: Base directory containing OS installations.
2016                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2017   @rtype: L{objects.OS}
2018   @return: the OS instance if we find a valid one
2019   @raise RPCFail: if we don't find a valid OS
2020
2021   """
2022   name_only = objects.OS.GetName(name)
2023   status, payload = _TryOSFromDisk(name_only, base_dir)
2024
2025   if not status:
2026     _Fail(payload)
2027
2028   return payload
2029
2030
2031 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2032   """Calculate the basic environment for an os script.
2033
2034   @type os_name: str
2035   @param os_name: full operating system name (including variant)
2036   @type inst_os: L{objects.OS}
2037   @param inst_os: operating system for which the environment is being built
2038   @type os_params: dict
2039   @param os_params: the OS parameters
2040   @type debug: integer
2041   @param debug: debug level (0 or 1, for OS Api 10)
2042   @rtype: dict
2043   @return: dict of environment variables
2044   @raise errors.BlockDeviceError: if the block device
2045       cannot be found
2046
2047   """
2048   result = {}
2049   api_version = \
2050     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2051   result['OS_API_VERSION'] = '%d' % api_version
2052   result['OS_NAME'] = inst_os.name
2053   result['DEBUG_LEVEL'] = '%d' % debug
2054
2055   # OS variants
2056   if api_version >= constants.OS_API_V15:
2057     variant = objects.OS.GetVariant(os_name)
2058     if not variant:
2059       variant = inst_os.supported_variants[0]
2060     result['OS_VARIANT'] = variant
2061
2062   # OS params
2063   for pname, pvalue in os_params.items():
2064     result['OSP_%s' % pname.upper()] = pvalue
2065
2066   return result
2067
2068
2069 def OSEnvironment(instance, inst_os, debug=0):
2070   """Calculate the environment for an os script.
2071
2072   @type instance: L{objects.Instance}
2073   @param instance: target instance for the os script run
2074   @type inst_os: L{objects.OS}
2075   @param inst_os: operating system for which the environment is being built
2076   @type debug: integer
2077   @param debug: debug level (0 or 1, for OS Api 10)
2078   @rtype: dict
2079   @return: dict of environment variables
2080   @raise errors.BlockDeviceError: if the block device
2081       cannot be found
2082
2083   """
2084   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2085
2086   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2087     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2088
2089   result['HYPERVISOR'] = instance.hypervisor
2090   result['DISK_COUNT'] = '%d' % len(instance.disks)
2091   result['NIC_COUNT'] = '%d' % len(instance.nics)
2092
2093   # Disks
2094   for idx, disk in enumerate(instance.disks):
2095     real_disk = _OpenRealBD(disk)
2096     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2097     result['DISK_%d_ACCESS' % idx] = disk.mode
2098     if constants.HV_DISK_TYPE in instance.hvparams:
2099       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2100         instance.hvparams[constants.HV_DISK_TYPE]
2101     if disk.dev_type in constants.LDS_BLOCK:
2102       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2103     elif disk.dev_type == constants.LD_FILE:
2104       result['DISK_%d_BACKEND_TYPE' % idx] = \
2105         'file:%s' % disk.physical_id[0]
2106
2107   # NICs
2108   for idx, nic in enumerate(instance.nics):
2109     result['NIC_%d_MAC' % idx] = nic.mac
2110     if nic.ip:
2111       result['NIC_%d_IP' % idx] = nic.ip
2112     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2113     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2114       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2115     if nic.nicparams[constants.NIC_LINK]:
2116       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2117     if constants.HV_NIC_TYPE in instance.hvparams:
2118       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2119         instance.hvparams[constants.HV_NIC_TYPE]
2120
2121   # HV/BE params
2122   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2123     for key, value in source.items():
2124       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2125
2126   return result
2127
2128
2129 def BlockdevGrow(disk, amount):
2130   """Grow a stack of block devices.
2131
2132   This function is called recursively, with the childrens being the
2133   first ones to resize.
2134
2135   @type disk: L{objects.Disk}
2136   @param disk: the disk to be grown
2137   @rtype: (status, result)
2138   @return: a tuple with the status of the operation
2139       (True/False), and the errors message if status
2140       is False
2141
2142   """
2143   r_dev = _RecursiveFindBD(disk)
2144   if r_dev is None:
2145     _Fail("Cannot find block device %s", disk)
2146
2147   try:
2148     r_dev.Grow(amount)
2149   except errors.BlockDeviceError, err:
2150     _Fail("Failed to grow block device: %s", err, exc=True)
2151
2152
2153 def BlockdevSnapshot(disk):
2154   """Create a snapshot copy of a block device.
2155
2156   This function is called recursively, and the snapshot is actually created
2157   just for the leaf lvm backend device.
2158
2159   @type disk: L{objects.Disk}
2160   @param disk: the disk to be snapshotted
2161   @rtype: string
2162   @return: snapshot disk ID as (vg, lv)
2163
2164   """
2165   if disk.dev_type == constants.LD_DRBD8:
2166     if not disk.children:
2167       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2168             disk.unique_id)
2169     return BlockdevSnapshot(disk.children[0])
2170   elif disk.dev_type == constants.LD_LV:
2171     r_dev = _RecursiveFindBD(disk)
2172     if r_dev is not None:
2173       # FIXME: choose a saner value for the snapshot size
2174       # let's stay on the safe side and ask for the full size, for now
2175       return r_dev.Snapshot(disk.size)
2176     else:
2177       _Fail("Cannot find block device %s", disk)
2178   else:
2179     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2180           disk.unique_id, disk.dev_type)
2181
2182
2183 def FinalizeExport(instance, snap_disks):
2184   """Write out the export configuration information.
2185
2186   @type instance: L{objects.Instance}
2187   @param instance: the instance which we export, used for
2188       saving configuration
2189   @type snap_disks: list of L{objects.Disk}
2190   @param snap_disks: list of snapshot block devices, which
2191       will be used to get the actual name of the dump file
2192
2193   @rtype: None
2194
2195   """
2196   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2197   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2198
2199   config = objects.SerializableConfigParser()
2200
2201   config.add_section(constants.INISECT_EXP)
2202   config.set(constants.INISECT_EXP, 'version', '0')
2203   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2204   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2205   config.set(constants.INISECT_EXP, 'os', instance.os)
2206   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2207
2208   config.add_section(constants.INISECT_INS)
2209   config.set(constants.INISECT_INS, 'name', instance.name)
2210   config.set(constants.INISECT_INS, 'memory', '%d' %
2211              instance.beparams[constants.BE_MEMORY])
2212   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2213              instance.beparams[constants.BE_VCPUS])
2214   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2215   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2216
2217   nic_total = 0
2218   for nic_count, nic in enumerate(instance.nics):
2219     nic_total += 1
2220     config.set(constants.INISECT_INS, 'nic%d_mac' %
2221                nic_count, '%s' % nic.mac)
2222     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2223     for param in constants.NICS_PARAMETER_TYPES:
2224       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2225                  '%s' % nic.nicparams.get(param, None))
2226   # TODO: redundant: on load can read nics until it doesn't exist
2227   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2228
2229   disk_total = 0
2230   for disk_count, disk in enumerate(snap_disks):
2231     if disk:
2232       disk_total += 1
2233       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2234                  ('%s' % disk.iv_name))
2235       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2236                  ('%s' % disk.physical_id[1]))
2237       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2238                  ('%d' % disk.size))
2239
2240   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2241
2242   # New-style hypervisor/backend parameters
2243
2244   config.add_section(constants.INISECT_HYP)
2245   for name, value in instance.hvparams.items():
2246     if name not in constants.HVC_GLOBALS:
2247       config.set(constants.INISECT_HYP, name, str(value))
2248
2249   config.add_section(constants.INISECT_BEP)
2250   for name, value in instance.beparams.items():
2251     config.set(constants.INISECT_BEP, name, str(value))
2252
2253   config.add_section(constants.INISECT_OSP)
2254   for name, value in instance.osparams.items():
2255     config.set(constants.INISECT_OSP, name, str(value))
2256
2257   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2258                   data=config.Dumps())
2259   shutil.rmtree(finaldestdir, ignore_errors=True)
2260   shutil.move(destdir, finaldestdir)
2261
2262
2263 def ExportInfo(dest):
2264   """Get export configuration information.
2265
2266   @type dest: str
2267   @param dest: directory containing the export
2268
2269   @rtype: L{objects.SerializableConfigParser}
2270   @return: a serializable config file containing the
2271       export info
2272
2273   """
2274   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2275
2276   config = objects.SerializableConfigParser()
2277   config.read(cff)
2278
2279   if (not config.has_section(constants.INISECT_EXP) or
2280       not config.has_section(constants.INISECT_INS)):
2281     _Fail("Export info file doesn't have the required fields")
2282
2283   return config.Dumps()
2284
2285
2286 def ListExports():
2287   """Return a list of exports currently available on this machine.
2288
2289   @rtype: list
2290   @return: list of the exports
2291
2292   """
2293   if os.path.isdir(constants.EXPORT_DIR):
2294     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2295   else:
2296     _Fail("No exports directory")
2297
2298
2299 def RemoveExport(export):
2300   """Remove an existing export from the node.
2301
2302   @type export: str
2303   @param export: the name of the export to remove
2304   @rtype: None
2305
2306   """
2307   target = utils.PathJoin(constants.EXPORT_DIR, export)
2308
2309   try:
2310     shutil.rmtree(target)
2311   except EnvironmentError, err:
2312     _Fail("Error while removing the export: %s", err, exc=True)
2313
2314
2315 def BlockdevRename(devlist):
2316   """Rename a list of block devices.
2317
2318   @type devlist: list of tuples
2319   @param devlist: list of tuples of the form  (disk,
2320       new_logical_id, new_physical_id); disk is an
2321       L{objects.Disk} object describing the current disk,
2322       and new logical_id/physical_id is the name we
2323       rename it to
2324   @rtype: boolean
2325   @return: True if all renames succeeded, False otherwise
2326
2327   """
2328   msgs = []
2329   result = True
2330   for disk, unique_id in devlist:
2331     dev = _RecursiveFindBD(disk)
2332     if dev is None:
2333       msgs.append("Can't find device %s in rename" % str(disk))
2334       result = False
2335       continue
2336     try:
2337       old_rpath = dev.dev_path
2338       dev.Rename(unique_id)
2339       new_rpath = dev.dev_path
2340       if old_rpath != new_rpath:
2341         DevCacheManager.RemoveCache(old_rpath)
2342         # FIXME: we should add the new cache information here, like:
2343         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2344         # but we don't have the owner here - maybe parse from existing
2345         # cache? for now, we only lose lvm data when we rename, which
2346         # is less critical than DRBD or MD
2347     except errors.BlockDeviceError, err:
2348       msgs.append("Can't rename device '%s' to '%s': %s" %
2349                   (dev, unique_id, err))
2350       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2351       result = False
2352   if not result:
2353     _Fail("; ".join(msgs))
2354
2355
2356 def _TransformFileStorageDir(file_storage_dir):
2357   """Checks whether given file_storage_dir is valid.
2358
2359   Checks wheter the given file_storage_dir is within the cluster-wide
2360   default file_storage_dir stored in SimpleStore. Only paths under that
2361   directory are allowed.
2362
2363   @type file_storage_dir: str
2364   @param file_storage_dir: the path to check
2365
2366   @return: the normalized path if valid, None otherwise
2367
2368   """
2369   if not constants.ENABLE_FILE_STORAGE:
2370     _Fail("File storage disabled at configure time")
2371   cfg = _GetConfig()
2372   file_storage_dir = os.path.normpath(file_storage_dir)
2373   base_file_storage_dir = cfg.GetFileStorageDir()
2374   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2375       base_file_storage_dir):
2376     _Fail("File storage directory '%s' is not under base file"
2377           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2378   return file_storage_dir
2379
2380
2381 def CreateFileStorageDir(file_storage_dir):
2382   """Create file storage directory.
2383
2384   @type file_storage_dir: str
2385   @param file_storage_dir: directory to create
2386
2387   @rtype: tuple
2388   @return: tuple with first element a boolean indicating wheter dir
2389       creation was successful or not
2390
2391   """
2392   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2393   if os.path.exists(file_storage_dir):
2394     if not os.path.isdir(file_storage_dir):
2395       _Fail("Specified storage dir '%s' is not a directory",
2396             file_storage_dir)
2397   else:
2398     try:
2399       os.makedirs(file_storage_dir, 0750)
2400     except OSError, err:
2401       _Fail("Cannot create file storage directory '%s': %s",
2402             file_storage_dir, err, exc=True)
2403
2404
2405 def RemoveFileStorageDir(file_storage_dir):
2406   """Remove file storage directory.
2407
2408   Remove it only if it's empty. If not log an error and return.
2409
2410   @type file_storage_dir: str
2411   @param file_storage_dir: the directory we should cleanup
2412   @rtype: tuple (success,)
2413   @return: tuple of one element, C{success}, denoting
2414       whether the operation was successful
2415
2416   """
2417   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2418   if os.path.exists(file_storage_dir):
2419     if not os.path.isdir(file_storage_dir):
2420       _Fail("Specified Storage directory '%s' is not a directory",
2421             file_storage_dir)
2422     # deletes dir only if empty, otherwise we want to fail the rpc call
2423     try:
2424       os.rmdir(file_storage_dir)
2425     except OSError, err:
2426       _Fail("Cannot remove file storage directory '%s': %s",
2427             file_storage_dir, err)
2428
2429
2430 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2431   """Rename the file storage directory.
2432
2433   @type old_file_storage_dir: str
2434   @param old_file_storage_dir: the current path
2435   @type new_file_storage_dir: str
2436   @param new_file_storage_dir: the name we should rename to
2437   @rtype: tuple (success,)
2438   @return: tuple of one element, C{success}, denoting
2439       whether the operation was successful
2440
2441   """
2442   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2443   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2444   if not os.path.exists(new_file_storage_dir):
2445     if os.path.isdir(old_file_storage_dir):
2446       try:
2447         os.rename(old_file_storage_dir, new_file_storage_dir)
2448       except OSError, err:
2449         _Fail("Cannot rename '%s' to '%s': %s",
2450               old_file_storage_dir, new_file_storage_dir, err)
2451     else:
2452       _Fail("Specified storage dir '%s' is not a directory",
2453             old_file_storage_dir)
2454   else:
2455     if os.path.exists(old_file_storage_dir):
2456       _Fail("Cannot rename '%s' to '%s': both locations exist",
2457             old_file_storage_dir, new_file_storage_dir)
2458
2459
2460 def _EnsureJobQueueFile(file_name):
2461   """Checks whether the given filename is in the queue directory.
2462
2463   @type file_name: str
2464   @param file_name: the file name we should check
2465   @rtype: None
2466   @raises RPCFail: if the file is not valid
2467
2468   """
2469   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2470   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2471
2472   if not result:
2473     _Fail("Passed job queue file '%s' does not belong to"
2474           " the queue directory '%s'", file_name, queue_dir)
2475
2476
2477 def JobQueueUpdate(file_name, content):
2478   """Updates a file in the queue directory.
2479
2480   This is just a wrapper over L{utils.WriteFile}, with proper
2481   checking.
2482
2483   @type file_name: str
2484   @param file_name: the job file name
2485   @type content: str
2486   @param content: the new job contents
2487   @rtype: boolean
2488   @return: the success of the operation
2489
2490   """
2491   _EnsureJobQueueFile(file_name)
2492   getents = runtime.GetEnts()
2493
2494   # Write and replace the file atomically
2495   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2496                   gid=getents.masterd_gid)
2497
2498
2499 def JobQueueRename(old, new):
2500   """Renames a job queue file.
2501
2502   This is just a wrapper over os.rename with proper checking.
2503
2504   @type old: str
2505   @param old: the old (actual) file name
2506   @type new: str
2507   @param new: the desired file name
2508   @rtype: tuple
2509   @return: the success of the operation and payload
2510
2511   """
2512   _EnsureJobQueueFile(old)
2513   _EnsureJobQueueFile(new)
2514
2515   utils.RenameFile(old, new, mkdir=True)
2516
2517
2518 def BlockdevClose(instance_name, disks):
2519   """Closes the given block devices.
2520
2521   This means they will be switched to secondary mode (in case of
2522   DRBD).
2523
2524   @param instance_name: if the argument is not empty, the symlinks
2525       of this instance will be removed
2526   @type disks: list of L{objects.Disk}
2527   @param disks: the list of disks to be closed
2528   @rtype: tuple (success, message)
2529   @return: a tuple of success and message, where success
2530       indicates the succes of the operation, and message
2531       which will contain the error details in case we
2532       failed
2533
2534   """
2535   bdevs = []
2536   for cf in disks:
2537     rd = _RecursiveFindBD(cf)
2538     if rd is None:
2539       _Fail("Can't find device %s", cf)
2540     bdevs.append(rd)
2541
2542   msg = []
2543   for rd in bdevs:
2544     try:
2545       rd.Close()
2546     except errors.BlockDeviceError, err:
2547       msg.append(str(err))
2548   if msg:
2549     _Fail("Can't make devices secondary: %s", ",".join(msg))
2550   else:
2551     if instance_name:
2552       _RemoveBlockDevLinks(instance_name, disks)
2553
2554
2555 def ValidateHVParams(hvname, hvparams):
2556   """Validates the given hypervisor parameters.
2557
2558   @type hvname: string
2559   @param hvname: the hypervisor name
2560   @type hvparams: dict
2561   @param hvparams: the hypervisor parameters to be validated
2562   @rtype: None
2563
2564   """
2565   try:
2566     hv_type = hypervisor.GetHypervisor(hvname)
2567     hv_type.ValidateParameters(hvparams)
2568   except errors.HypervisorError, err:
2569     _Fail(str(err), log=False)
2570
2571
2572 def _CheckOSPList(os_obj, parameters):
2573   """Check whether a list of parameters is supported by the OS.
2574
2575   @type os_obj: L{objects.OS}
2576   @param os_obj: OS object to check
2577   @type parameters: list
2578   @param parameters: the list of parameters to check
2579
2580   """
2581   supported = [v[0] for v in os_obj.supported_parameters]
2582   delta = frozenset(parameters).difference(supported)
2583   if delta:
2584     _Fail("The following parameters are not supported"
2585           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2586
2587
2588 def ValidateOS(required, osname, checks, osparams):
2589   """Validate the given OS' parameters.
2590
2591   @type required: boolean
2592   @param required: whether absence of the OS should translate into
2593       failure or not
2594   @type osname: string
2595   @param osname: the OS to be validated
2596   @type checks: list
2597   @param checks: list of the checks to run (currently only 'parameters')
2598   @type osparams: dict
2599   @param osparams: dictionary with OS parameters
2600   @rtype: boolean
2601   @return: True if the validation passed, or False if the OS was not
2602       found and L{required} was false
2603
2604   """
2605   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2606     _Fail("Unknown checks required for OS %s: %s", osname,
2607           set(checks).difference(constants.OS_VALIDATE_CALLS))
2608
2609   name_only = objects.OS.GetName(osname)
2610   status, tbv = _TryOSFromDisk(name_only, None)
2611
2612   if not status:
2613     if required:
2614       _Fail(tbv)
2615     else:
2616       return False
2617
2618   if max(tbv.api_versions) < constants.OS_API_V20:
2619     return True
2620
2621   if constants.OS_VALIDATE_PARAMETERS in checks:
2622     _CheckOSPList(tbv, osparams.keys())
2623
2624   validate_env = OSCoreEnv(osname, tbv, osparams)
2625   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2626                         cwd=tbv.path)
2627   if result.failed:
2628     logging.error("os validate command '%s' returned error: %s output: %s",
2629                   result.cmd, result.fail_reason, result.output)
2630     _Fail("OS validation script failed (%s), output: %s",
2631           result.fail_reason, result.output, log=False)
2632
2633   return True
2634
2635
2636 def DemoteFromMC():
2637   """Demotes the current node from master candidate role.
2638
2639   """
2640   # try to ensure we're not the master by mistake
2641   master, myself = ssconf.GetMasterAndMyself()
2642   if master == myself:
2643     _Fail("ssconf status shows I'm the master node, will not demote")
2644
2645   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2646   if not result.failed:
2647     _Fail("The master daemon is running, will not demote")
2648
2649   try:
2650     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2651       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2652   except EnvironmentError, err:
2653     if err.errno != errno.ENOENT:
2654       _Fail("Error while backing up cluster file: %s", err, exc=True)
2655
2656   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2657
2658
2659 def _GetX509Filenames(cryptodir, name):
2660   """Returns the full paths for the private key and certificate.
2661
2662   """
2663   return (utils.PathJoin(cryptodir, name),
2664           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2665           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2666
2667
2668 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2669   """Creates a new X509 certificate for SSL/TLS.
2670
2671   @type validity: int
2672   @param validity: Validity in seconds
2673   @rtype: tuple; (string, string)
2674   @return: Certificate name and public part
2675
2676   """
2677   (key_pem, cert_pem) = \
2678     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2679                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2680
2681   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2682                               prefix="x509-%s-" % utils.TimestampForFilename())
2683   try:
2684     name = os.path.basename(cert_dir)
2685     assert len(name) > 5
2686
2687     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2688
2689     utils.WriteFile(key_file, mode=0400, data=key_pem)
2690     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2691
2692     # Never return private key as it shouldn't leave the node
2693     return (name, cert_pem)
2694   except Exception:
2695     shutil.rmtree(cert_dir, ignore_errors=True)
2696     raise
2697
2698
2699 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2700   """Removes a X509 certificate.
2701
2702   @type name: string
2703   @param name: Certificate name
2704
2705   """
2706   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2707
2708   utils.RemoveFile(key_file)
2709   utils.RemoveFile(cert_file)
2710
2711   try:
2712     os.rmdir(cert_dir)
2713   except EnvironmentError, err:
2714     _Fail("Cannot remove certificate directory '%s': %s",
2715           cert_dir, err)
2716
2717
2718 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2719   """Returns the command for the requested input/output.
2720
2721   @type instance: L{objects.Instance}
2722   @param instance: The instance object
2723   @param mode: Import/export mode
2724   @param ieio: Input/output type
2725   @param ieargs: Input/output arguments
2726
2727   """
2728   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2729
2730   env = None
2731   prefix = None
2732   suffix = None
2733   exp_size = None
2734
2735   if ieio == constants.IEIO_FILE:
2736     (filename, ) = ieargs
2737
2738     if not utils.IsNormAbsPath(filename):
2739       _Fail("Path '%s' is not normalized or absolute", filename)
2740
2741     directory = os.path.normpath(os.path.dirname(filename))
2742
2743     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2744         constants.EXPORT_DIR):
2745       _Fail("File '%s' is not under exports directory '%s'",
2746             filename, constants.EXPORT_DIR)
2747
2748     # Create directory
2749     utils.Makedirs(directory, mode=0750)
2750
2751     quoted_filename = utils.ShellQuote(filename)
2752
2753     if mode == constants.IEM_IMPORT:
2754       suffix = "> %s" % quoted_filename
2755     elif mode == constants.IEM_EXPORT:
2756       suffix = "< %s" % quoted_filename
2757
2758       # Retrieve file size
2759       try:
2760         st = os.stat(filename)
2761       except EnvironmentError, err:
2762         logging.error("Can't stat(2) %s: %s", filename, err)
2763       else:
2764         exp_size = utils.BytesToMebibyte(st.st_size)
2765
2766   elif ieio == constants.IEIO_RAW_DISK:
2767     (disk, ) = ieargs
2768
2769     real_disk = _OpenRealBD(disk)
2770
2771     if mode == constants.IEM_IMPORT:
2772       # we set here a smaller block size as, due to transport buffering, more
2773       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2774       # is not already there or we pass a wrong path; we use notrunc to no
2775       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2776       # much memory; this means that at best, we flush every 64k, which will
2777       # not be very fast
2778       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2779                                     " bs=%s oflag=dsync"),
2780                                     real_disk.dev_path,
2781                                     str(64 * 1024))
2782
2783     elif mode == constants.IEM_EXPORT:
2784       # the block size on the read dd is 1MiB to match our units
2785       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2786                                    real_disk.dev_path,
2787                                    str(1024 * 1024), # 1 MB
2788                                    str(disk.size))
2789       exp_size = disk.size
2790
2791   elif ieio == constants.IEIO_SCRIPT:
2792     (disk, disk_index, ) = ieargs
2793
2794     assert isinstance(disk_index, (int, long))
2795
2796     real_disk = _OpenRealBD(disk)
2797
2798     inst_os = OSFromDisk(instance.os)
2799     env = OSEnvironment(instance, inst_os)
2800
2801     if mode == constants.IEM_IMPORT:
2802       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2803       env["IMPORT_INDEX"] = str(disk_index)
2804       script = inst_os.import_script
2805
2806     elif mode == constants.IEM_EXPORT:
2807       env["EXPORT_DEVICE"] = real_disk.dev_path
2808       env["EXPORT_INDEX"] = str(disk_index)
2809       script = inst_os.export_script
2810
2811     # TODO: Pass special environment only to script
2812     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2813
2814     if mode == constants.IEM_IMPORT:
2815       suffix = "| %s" % script_cmd
2816
2817     elif mode == constants.IEM_EXPORT:
2818       prefix = "%s |" % script_cmd
2819
2820     # Let script predict size
2821     exp_size = constants.IE_CUSTOM_SIZE
2822
2823   else:
2824     _Fail("Invalid %s I/O mode %r", mode, ieio)
2825
2826   return (env, prefix, suffix, exp_size)
2827
2828
2829 def _CreateImportExportStatusDir(prefix):
2830   """Creates status directory for import/export.
2831
2832   """
2833   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2834                           prefix=("%s-%s-" %
2835                                   (prefix, utils.TimestampForFilename())))
2836
2837
2838 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2839   """Starts an import or export daemon.
2840
2841   @param mode: Import/output mode
2842   @type opts: L{objects.ImportExportOptions}
2843   @param opts: Daemon options
2844   @type host: string
2845   @param host: Remote host for export (None for import)
2846   @type port: int
2847   @param port: Remote port for export (None for import)
2848   @type instance: L{objects.Instance}
2849   @param instance: Instance object
2850   @param ieio: Input/output type
2851   @param ieioargs: Input/output arguments
2852
2853   """
2854   if mode == constants.IEM_IMPORT:
2855     prefix = "import"
2856
2857     if not (host is None and port is None):
2858       _Fail("Can not specify host or port on import")
2859
2860   elif mode == constants.IEM_EXPORT:
2861     prefix = "export"
2862
2863     if host is None or port is None:
2864       _Fail("Host and port must be specified for an export")
2865
2866   else:
2867     _Fail("Invalid mode %r", mode)
2868
2869   if (opts.key_name is None) ^ (opts.ca_pem is None):
2870     _Fail("Cluster certificate can only be used for both key and CA")
2871
2872   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2873     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2874
2875   if opts.key_name is None:
2876     # Use server.pem
2877     key_path = constants.NODED_CERT_FILE
2878     cert_path = constants.NODED_CERT_FILE
2879     assert opts.ca_pem is None
2880   else:
2881     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2882                                                  opts.key_name)
2883     assert opts.ca_pem is not None
2884
2885   for i in [key_path, cert_path]:
2886     if not os.path.exists(i):
2887       _Fail("File '%s' does not exist" % i)
2888
2889   status_dir = _CreateImportExportStatusDir(prefix)
2890   try:
2891     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2892     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2893     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2894
2895     if opts.ca_pem is None:
2896       # Use server.pem
2897       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2898     else:
2899       ca = opts.ca_pem
2900
2901     # Write CA file
2902     utils.WriteFile(ca_file, data=ca, mode=0400)
2903
2904     cmd = [
2905       constants.IMPORT_EXPORT_DAEMON,
2906       status_file, mode,
2907       "--key=%s" % key_path,
2908       "--cert=%s" % cert_path,
2909       "--ca=%s" % ca_file,
2910       ]
2911
2912     if host:
2913       cmd.append("--host=%s" % host)
2914
2915     if port:
2916       cmd.append("--port=%s" % port)
2917
2918     if opts.ipv6:
2919       cmd.append("--ipv6")
2920     else:
2921       cmd.append("--ipv4")
2922
2923     if opts.compress:
2924       cmd.append("--compress=%s" % opts.compress)
2925
2926     if opts.magic:
2927       cmd.append("--magic=%s" % opts.magic)
2928
2929     if exp_size is not None:
2930       cmd.append("--expected-size=%s" % exp_size)
2931
2932     if cmd_prefix:
2933       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2934
2935     if cmd_suffix:
2936       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2937
2938     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2939
2940     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2941     # support for receiving a file descriptor for output
2942     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2943                       output=logfile)
2944
2945     # The import/export name is simply the status directory name
2946     return os.path.basename(status_dir)
2947
2948   except Exception:
2949     shutil.rmtree(status_dir, ignore_errors=True)
2950     raise
2951
2952
2953 def GetImportExportStatus(names):
2954   """Returns import/export daemon status.
2955
2956   @type names: sequence
2957   @param names: List of names
2958   @rtype: List of dicts
2959   @return: Returns a list of the state of each named import/export or None if a
2960            status couldn't be read
2961
2962   """
2963   result = []
2964
2965   for name in names:
2966     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2967                                  _IES_STATUS_FILE)
2968
2969     try:
2970       data = utils.ReadFile(status_file)
2971     except EnvironmentError, err:
2972       if err.errno != errno.ENOENT:
2973         raise
2974       data = None
2975
2976     if not data:
2977       result.append(None)
2978       continue
2979
2980     result.append(serializer.LoadJson(data))
2981
2982   return result
2983
2984
2985 def AbortImportExport(name):
2986   """Sends SIGTERM to a running import/export daemon.
2987
2988   """
2989   logging.info("Abort import/export %s", name)
2990
2991   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2992   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2993
2994   if pid:
2995     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2996                  name, pid)
2997     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2998
2999
3000 def CleanupImportExport(name):
3001   """Cleanup after an import or export.
3002
3003   If the import/export daemon is still running it's killed. Afterwards the
3004   whole status directory is removed.
3005
3006   """
3007   logging.info("Finalizing import/export %s", name)
3008
3009   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
3010
3011   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
3012
3013   if pid:
3014     logging.info("Import/export %s is still running with PID %s",
3015                  name, pid)
3016     utils.KillProcess(pid, waitpid=False)
3017
3018   shutil.rmtree(status_dir, ignore_errors=True)
3019
3020
3021 def _FindDisks(nodes_ip, disks):
3022   """Sets the physical ID on disks and returns the block devices.
3023
3024   """
3025   # set the correct physical ID
3026   my_name = netutils.Hostname.GetSysName()
3027   for cf in disks:
3028     cf.SetPhysicalID(my_name, nodes_ip)
3029
3030   bdevs = []
3031
3032   for cf in disks:
3033     rd = _RecursiveFindBD(cf)
3034     if rd is None:
3035       _Fail("Can't find device %s", cf)
3036     bdevs.append(rd)
3037   return bdevs
3038
3039
3040 def DrbdDisconnectNet(nodes_ip, disks):
3041   """Disconnects the network on a list of drbd devices.
3042
3043   """
3044   bdevs = _FindDisks(nodes_ip, disks)
3045
3046   # disconnect disks
3047   for rd in bdevs:
3048     try:
3049       rd.DisconnectNet()
3050     except errors.BlockDeviceError, err:
3051       _Fail("Can't change network configuration to standalone mode: %s",
3052             err, exc=True)
3053
3054
3055 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3056   """Attaches the network on a list of drbd devices.
3057
3058   """
3059   bdevs = _FindDisks(nodes_ip, disks)
3060
3061   if multimaster:
3062     for idx, rd in enumerate(bdevs):
3063       try:
3064         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3065       except EnvironmentError, err:
3066         _Fail("Can't create symlink: %s", err)
3067   # reconnect disks, switch to new master configuration and if
3068   # needed primary mode
3069   for rd in bdevs:
3070     try:
3071       rd.AttachNet(multimaster)
3072     except errors.BlockDeviceError, err:
3073       _Fail("Can't change network configuration: %s", err)
3074
3075   # wait until the disks are connected; we need to retry the re-attach
3076   # if the device becomes standalone, as this might happen if the one
3077   # node disconnects and reconnects in a different mode before the
3078   # other node reconnects; in this case, one or both of the nodes will
3079   # decide it has wrong configuration and switch to standalone
3080
3081   def _Attach():
3082     all_connected = True
3083
3084     for rd in bdevs:
3085       stats = rd.GetProcStatus()
3086
3087       all_connected = (all_connected and
3088                        (stats.is_connected or stats.is_in_resync))
3089
3090       if stats.is_standalone:
3091         # peer had different config info and this node became
3092         # standalone, even though this should not happen with the
3093         # new staged way of changing disk configs
3094         try:
3095           rd.AttachNet(multimaster)
3096         except errors.BlockDeviceError, err:
3097           _Fail("Can't change network configuration: %s", err)
3098
3099     if not all_connected:
3100       raise utils.RetryAgain()
3101
3102   try:
3103     # Start with a delay of 100 miliseconds and go up to 5 seconds
3104     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3105   except utils.RetryTimeout:
3106     _Fail("Timeout in disk reconnecting")
3107
3108   if multimaster:
3109     # change to primary mode
3110     for rd in bdevs:
3111       try:
3112         rd.Open()
3113       except errors.BlockDeviceError, err:
3114         _Fail("Can't change to primary mode: %s", err)
3115
3116
3117 def DrbdWaitSync(nodes_ip, disks):
3118   """Wait until DRBDs have synchronized.
3119
3120   """
3121   def _helper(rd):
3122     stats = rd.GetProcStatus()
3123     if not (stats.is_connected or stats.is_in_resync):
3124       raise utils.RetryAgain()
3125     return stats
3126
3127   bdevs = _FindDisks(nodes_ip, disks)
3128
3129   min_resync = 100
3130   alldone = True
3131   for rd in bdevs:
3132     try:
3133       # poll each second for 15 seconds
3134       stats = utils.Retry(_helper, 1, 15, args=[rd])
3135     except utils.RetryTimeout:
3136       stats = rd.GetProcStatus()
3137       # last check
3138       if not (stats.is_connected or stats.is_in_resync):
3139         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3140     alldone = alldone and (not stats.is_in_resync)
3141     if stats.sync_percent is not None:
3142       min_resync = min(min_resync, stats.sync_percent)
3143
3144   return (alldone, min_resync)
3145
3146
3147 def GetDrbdUsermodeHelper():
3148   """Returns DRBD usermode helper currently configured.
3149
3150   """
3151   try:
3152     return bdev.BaseDRBD.GetUsermodeHelper()
3153   except errors.BlockDeviceError, err:
3154     _Fail(str(err))
3155
3156
3157 def PowercycleNode(hypervisor_type):
3158   """Hard-powercycle the node.
3159
3160   Because we need to return first, and schedule the powercycle in the
3161   background, we won't be able to report failures nicely.
3162
3163   """
3164   hyper = hypervisor.GetHypervisor(hypervisor_type)
3165   try:
3166     pid = os.fork()
3167   except OSError:
3168     # if we can't fork, we'll pretend that we're in the child process
3169     pid = 0
3170   if pid > 0:
3171     return "Reboot scheduled in 5 seconds"
3172   # ensure the child is running on ram
3173   try:
3174     utils.Mlockall()
3175   except Exception: # pylint: disable-msg=W0703
3176     pass
3177   time.sleep(5)
3178   hyper.PowercycleNode()
3179
3180
3181 class HooksRunner(object):
3182   """Hook runner.
3183
3184   This class is instantiated on the node side (ganeti-noded) and not
3185   on the master side.
3186
3187   """
3188   def __init__(self, hooks_base_dir=None):
3189     """Constructor for hooks runner.
3190
3191     @type hooks_base_dir: str or None
3192     @param hooks_base_dir: if not None, this overrides the
3193         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3194
3195     """
3196     if hooks_base_dir is None:
3197       hooks_base_dir = constants.HOOKS_BASE_DIR
3198     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3199     # constant
3200     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3201
3202   def RunHooks(self, hpath, phase, env):
3203     """Run the scripts in the hooks directory.
3204
3205     @type hpath: str
3206     @param hpath: the path to the hooks directory which
3207         holds the scripts
3208     @type phase: str
3209     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3210         L{constants.HOOKS_PHASE_POST}
3211     @type env: dict
3212     @param env: dictionary with the environment for the hook
3213     @rtype: list
3214     @return: list of 3-element tuples:
3215       - script path
3216       - script result, either L{constants.HKR_SUCCESS} or
3217         L{constants.HKR_FAIL}
3218       - output of the script
3219
3220     @raise errors.ProgrammerError: for invalid input
3221         parameters
3222
3223     """
3224     if phase == constants.HOOKS_PHASE_PRE:
3225       suffix = "pre"
3226     elif phase == constants.HOOKS_PHASE_POST:
3227       suffix = "post"
3228     else:
3229       _Fail("Unknown hooks phase '%s'", phase)
3230
3231
3232     subdir = "%s-%s.d" % (hpath, suffix)
3233     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3234
3235     results = []
3236
3237     if not os.path.isdir(dir_name):
3238       # for non-existing/non-dirs, we simply exit instead of logging a
3239       # warning at every operation
3240       return results
3241
3242     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3243
3244     for (relname, relstatus, runresult)  in runparts_results:
3245       if relstatus == constants.RUNPARTS_SKIP:
3246         rrval = constants.HKR_SKIP
3247         output = ""
3248       elif relstatus == constants.RUNPARTS_ERR:
3249         rrval = constants.HKR_FAIL
3250         output = "Hook script execution error: %s" % runresult
3251       elif relstatus == constants.RUNPARTS_RUN:
3252         if runresult.failed:
3253           rrval = constants.HKR_FAIL
3254         else:
3255           rrval = constants.HKR_SUCCESS
3256         output = utils.SafeEncode(runresult.output.strip())
3257       results.append(("%s/%s" % (subdir, relname), rrval, output))
3258
3259     return results
3260
3261
3262 class IAllocatorRunner(object):
3263   """IAllocator runner.
3264
3265   This class is instantiated on the node side (ganeti-noded) and not on
3266   the master side.
3267
3268   """
3269   @staticmethod
3270   def Run(name, idata):
3271     """Run an iallocator script.
3272
3273     @type name: str
3274     @param name: the iallocator script name
3275     @type idata: str
3276     @param idata: the allocator input data
3277
3278     @rtype: tuple
3279     @return: two element tuple of:
3280        - status
3281        - either error message or stdout of allocator (for success)
3282
3283     """
3284     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3285                                   os.path.isfile)
3286     if alloc_script is None:
3287       _Fail("iallocator module '%s' not found in the search path", name)
3288
3289     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3290     try:
3291       os.write(fd, idata)
3292       os.close(fd)
3293       result = utils.RunCmd([alloc_script, fin_name])
3294       if result.failed:
3295         _Fail("iallocator module '%s' failed: %s, output '%s'",
3296               name, result.fail_reason, result.output)
3297     finally:
3298       os.unlink(fin_name)
3299
3300     return result.stdout
3301
3302
3303 class DevCacheManager(object):
3304   """Simple class for managing a cache of block device information.
3305
3306   """
3307   _DEV_PREFIX = "/dev/"
3308   _ROOT_DIR = constants.BDEV_CACHE_DIR
3309
3310   @classmethod
3311   def _ConvertPath(cls, dev_path):
3312     """Converts a /dev/name path to the cache file name.
3313
3314     This replaces slashes with underscores and strips the /dev
3315     prefix. It then returns the full path to the cache file.
3316
3317     @type dev_path: str
3318     @param dev_path: the C{/dev/} path name
3319     @rtype: str
3320     @return: the converted path name
3321
3322     """
3323     if dev_path.startswith(cls._DEV_PREFIX):
3324       dev_path = dev_path[len(cls._DEV_PREFIX):]
3325     dev_path = dev_path.replace("/", "_")
3326     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3327     return fpath
3328
3329   @classmethod
3330   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3331     """Updates the cache information for a given device.
3332
3333     @type dev_path: str
3334     @param dev_path: the pathname of the device
3335     @type owner: str
3336     @param owner: the owner (instance name) of the device
3337     @type on_primary: bool
3338     @param on_primary: whether this is the primary
3339         node nor not
3340     @type iv_name: str
3341     @param iv_name: the instance-visible name of the
3342         device, as in objects.Disk.iv_name
3343
3344     @rtype: None
3345
3346     """
3347     if dev_path is None:
3348       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3349       return
3350     fpath = cls._ConvertPath(dev_path)
3351     if on_primary:
3352       state = "primary"
3353     else:
3354       state = "secondary"
3355     if iv_name is None:
3356       iv_name = "not_visible"
3357     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3358     try:
3359       utils.WriteFile(fpath, data=fdata)
3360     except EnvironmentError, err:
3361       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3362
3363   @classmethod
3364   def RemoveCache(cls, dev_path):
3365     """Remove data for a dev_path.
3366
3367     This is just a wrapper over L{utils.RemoveFile} with a converted
3368     path name and logging.
3369
3370     @type dev_path: str
3371     @param dev_path: the pathname of the device
3372
3373     @rtype: None
3374
3375     """
3376     if dev_path is None:
3377       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3378       return
3379     fpath = cls._ConvertPath(dev_path)
3380     try:
3381       utils.RemoveFile(fpath)
3382     except EnvironmentError, err:
3383       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)