objects: Add custom de-/serializing code for query responses
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79 #: Valid LVS output line regex
80 _LVSLINE_REGEX = re.compile("^ *([^|]+)\|([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
81
82
83 class RPCFail(Exception):
84   """Class denoting RPC failure.
85
86   Its argument is the error message.
87
88   """
89
90
91 def _Fail(msg, *args, **kwargs):
92   """Log an error and the raise an RPCFail exception.
93
94   This exception is then handled specially in the ganeti daemon and
95   turned into a 'failed' return type. As such, this function is a
96   useful shortcut for logging the error and returning it to the master
97   daemon.
98
99   @type msg: string
100   @param msg: the text of the exception
101   @raise RPCFail
102
103   """
104   if args:
105     msg = msg % args
106   if "log" not in kwargs or kwargs["log"]: # if we should log this error
107     if "exc" in kwargs and kwargs["exc"]:
108       logging.exception(msg)
109     else:
110       logging.error(msg)
111   raise RPCFail(msg)
112
113
114 def _GetConfig():
115   """Simple wrapper to return a SimpleStore.
116
117   @rtype: L{ssconf.SimpleStore}
118   @return: a SimpleStore instance
119
120   """
121   return ssconf.SimpleStore()
122
123
124 def _GetSshRunner(cluster_name):
125   """Simple wrapper to return an SshRunner.
126
127   @type cluster_name: str
128   @param cluster_name: the cluster name, which is needed
129       by the SshRunner constructor
130   @rtype: L{ssh.SshRunner}
131   @return: an SshRunner instance
132
133   """
134   return ssh.SshRunner(cluster_name)
135
136
137 def _Decompress(data):
138   """Unpacks data compressed by the RPC client.
139
140   @type data: list or tuple
141   @param data: Data sent by RPC client
142   @rtype: str
143   @return: Decompressed data
144
145   """
146   assert isinstance(data, (list, tuple))
147   assert len(data) == 2
148   (encoding, content) = data
149   if encoding == constants.RPC_ENCODING_NONE:
150     return content
151   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
152     return zlib.decompress(base64.b64decode(content))
153   else:
154     raise AssertionError("Unknown data encoding")
155
156
157 def _CleanDirectory(path, exclude=None):
158   """Removes all regular files in a directory.
159
160   @type path: str
161   @param path: the directory to clean
162   @type exclude: list
163   @param exclude: list of files to be excluded, defaults
164       to the empty list
165
166   """
167   if path not in _ALLOWED_CLEAN_DIRS:
168     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
169           path)
170
171   if not os.path.isdir(path):
172     return
173   if exclude is None:
174     exclude = []
175   else:
176     # Normalize excluded paths
177     exclude = [os.path.normpath(i) for i in exclude]
178
179   for rel_name in utils.ListVisibleFiles(path):
180     full_name = utils.PathJoin(path, rel_name)
181     if full_name in exclude:
182       continue
183     if os.path.isfile(full_name) and not os.path.islink(full_name):
184       utils.RemoveFile(full_name)
185
186
187 def _BuildUploadFileList():
188   """Build the list of allowed upload files.
189
190   This is abstracted so that it's built only once at module import time.
191
192   """
193   allowed_files = set([
194     constants.CLUSTER_CONF_FILE,
195     constants.ETC_HOSTS,
196     constants.SSH_KNOWN_HOSTS_FILE,
197     constants.VNC_PASSWORD_FILE,
198     constants.RAPI_CERT_FILE,
199     constants.RAPI_USERS_FILE,
200     constants.CONFD_HMAC_KEY,
201     constants.CLUSTER_DOMAIN_SECRET_FILE,
202     ])
203
204   for hv_name in constants.HYPER_TYPES:
205     hv_class = hypervisor.GetHypervisorClass(hv_name)
206     allowed_files.update(hv_class.GetAncillaryFiles())
207
208   return frozenset(allowed_files)
209
210
211 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
212
213
214 def JobQueuePurge():
215   """Removes job queue files and archived jobs.
216
217   @rtype: tuple
218   @return: True, None
219
220   """
221   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
222   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
223
224
225 def GetMasterInfo():
226   """Returns master information.
227
228   This is an utility function to compute master information, either
229   for consumption here or from the node daemon.
230
231   @rtype: tuple
232   @return: master_netdev, master_ip, master_name, primary_ip_family
233   @raise RPCFail: in case of errors
234
235   """
236   try:
237     cfg = _GetConfig()
238     master_netdev = cfg.GetMasterNetdev()
239     master_ip = cfg.GetMasterIP()
240     master_node = cfg.GetMasterNode()
241     primary_ip_family = cfg.GetPrimaryIPFamily()
242   except errors.ConfigurationError, err:
243     _Fail("Cluster configuration incomplete: %s", err, exc=True)
244   return (master_netdev, master_ip, master_node, primary_ip_family)
245
246
247 def StartMaster(start_daemons, no_voting):
248   """Activate local node as master node.
249
250   The function will either try activate the IP address of the master
251   (unless someone else has it) or also start the master daemons, based
252   on the start_daemons parameter.
253
254   @type start_daemons: boolean
255   @param start_daemons: whether to start the master daemons
256       (ganeti-masterd and ganeti-rapi), or (if false) activate the
257       master ip
258   @type no_voting: boolean
259   @param no_voting: whether to start ganeti-masterd without a node vote
260       (if start_daemons is True), but still non-interactively
261   @rtype: None
262
263   """
264   # GetMasterInfo will raise an exception if not able to return data
265   master_netdev, master_ip, _, family = GetMasterInfo()
266
267   err_msgs = []
268   # either start the master and rapi daemons
269   if start_daemons:
270     if no_voting:
271       masterd_args = "--no-voting --yes-do-it"
272     else:
273       masterd_args = ""
274
275     env = {
276       "EXTRA_MASTERD_ARGS": masterd_args,
277       }
278
279     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
280     if result.failed:
281       msg = "Can't start Ganeti master: %s" % result.output
282       logging.error(msg)
283       err_msgs.append(msg)
284   # or activate the IP
285   else:
286     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
287       if netutils.IPAddress.Own(master_ip):
288         # we already have the ip:
289         logging.debug("Master IP already configured, doing nothing")
290       else:
291         msg = "Someone else has the master ip, not activating"
292         logging.error(msg)
293         err_msgs.append(msg)
294     else:
295       ipcls = netutils.IP4Address
296       if family == netutils.IP6Address.family:
297         ipcls = netutils.IP6Address
298
299       result = utils.RunCmd(["ip", "address", "add",
300                              "%s/%d" % (master_ip, ipcls.iplen),
301                              "dev", master_netdev, "label",
302                              "%s:0" % master_netdev])
303       if result.failed:
304         msg = "Can't activate master IP: %s" % result.output
305         logging.error(msg)
306         err_msgs.append(msg)
307
308       # we ignore the exit code of the following cmds
309       if ipcls == netutils.IP4Address:
310         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
311                       master_ip, master_ip])
312       elif ipcls == netutils.IP6Address:
313         try:
314           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
315         except errors.OpExecError:
316           # TODO: Better error reporting
317           logging.warning("Can't execute ndisc6, please install if missing")
318
319   if err_msgs:
320     _Fail("; ".join(err_msgs))
321
322
323 def StopMaster(stop_daemons):
324   """Deactivate this node as master.
325
326   The function will always try to deactivate the IP address of the
327   master. It will also stop the master daemons depending on the
328   stop_daemons parameter.
329
330   @type stop_daemons: boolean
331   @param stop_daemons: whether to also stop the master daemons
332       (ganeti-masterd and ganeti-rapi)
333   @rtype: None
334
335   """
336   # TODO: log and report back to the caller the error failures; we
337   # need to decide in which case we fail the RPC for this
338
339   # GetMasterInfo will raise an exception if not able to return data
340   master_netdev, master_ip, _, family = GetMasterInfo()
341
342   ipcls = netutils.IP4Address
343   if family == netutils.IP6Address.family:
344     ipcls = netutils.IP6Address
345
346   result = utils.RunCmd(["ip", "address", "del",
347                          "%s/%d" % (master_ip, ipcls.iplen),
348                          "dev", master_netdev])
349   if result.failed:
350     logging.error("Can't remove the master IP, error: %s", result.output)
351     # but otherwise ignore the failure
352
353   if stop_daemons:
354     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
355     if result.failed:
356       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
357                     " and error %s",
358                     result.cmd, result.exit_code, result.output)
359
360
361 def EtcHostsModify(mode, host, ip):
362   """Modify a host entry in /etc/hosts.
363
364   @param mode: The mode to operate. Either add or remove entry
365   @param host: The host to operate on
366   @param ip: The ip associated with the entry
367
368   """
369   if mode == constants.ETC_HOSTS_ADD:
370     if not ip:
371       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
372               " present")
373     utils.AddHostToEtcHosts(host, ip)
374   elif mode == constants.ETC_HOSTS_REMOVE:
375     if ip:
376       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
377               " parameter is present")
378     utils.RemoveHostFromEtcHosts(host)
379   else:
380     RPCFail("Mode not supported")
381
382
383 def LeaveCluster(modify_ssh_setup):
384   """Cleans up and remove the current node.
385
386   This function cleans up and prepares the current node to be removed
387   from the cluster.
388
389   If processing is successful, then it raises an
390   L{errors.QuitGanetiException} which is used as a special case to
391   shutdown the node daemon.
392
393   @param modify_ssh_setup: boolean
394
395   """
396   _CleanDirectory(constants.DATA_DIR)
397   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
398   JobQueuePurge()
399
400   if modify_ssh_setup:
401     try:
402       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
403
404       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
405
406       utils.RemoveFile(priv_key)
407       utils.RemoveFile(pub_key)
408     except errors.OpExecError:
409       logging.exception("Error while processing ssh files")
410
411   try:
412     utils.RemoveFile(constants.CONFD_HMAC_KEY)
413     utils.RemoveFile(constants.RAPI_CERT_FILE)
414     utils.RemoveFile(constants.NODED_CERT_FILE)
415   except: # pylint: disable-msg=W0702
416     logging.exception("Error while removing cluster secrets")
417
418   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
419   if result.failed:
420     logging.error("Command %s failed with exitcode %s and error %s",
421                   result.cmd, result.exit_code, result.output)
422
423   # Raise a custom exception (handled in ganeti-noded)
424   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
425
426
427 def GetNodeInfo(vgname, hypervisor_type):
428   """Gives back a hash with different information about the node.
429
430   @type vgname: C{string}
431   @param vgname: the name of the volume group to ask for disk space information
432   @type hypervisor_type: C{str}
433   @param hypervisor_type: the name of the hypervisor to ask for
434       memory information
435   @rtype: C{dict}
436   @return: dictionary with the following keys:
437       - vg_size is the size of the configured volume group in MiB
438       - vg_free is the free size of the volume group in MiB
439       - memory_dom0 is the memory allocated for domain0 in MiB
440       - memory_free is the currently available (free) ram in MiB
441       - memory_total is the total number of ram in MiB
442
443   """
444   outputarray = {}
445
446   if vgname is not None:
447     vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
448     vg_free = vg_size = None
449     if vginfo:
450       vg_free = int(round(vginfo[0][0], 0))
451       vg_size = int(round(vginfo[0][1], 0))
452     outputarray['vg_size'] = vg_size
453     outputarray['vg_free'] = vg_free
454
455   if hypervisor_type is not None:
456     hyper = hypervisor.GetHypervisor(hypervisor_type)
457     hyp_info = hyper.GetNodeInfo()
458     if hyp_info is not None:
459       outputarray.update(hyp_info)
460
461   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
462
463   return outputarray
464
465
466 def VerifyNode(what, cluster_name):
467   """Verify the status of the local node.
468
469   Based on the input L{what} parameter, various checks are done on the
470   local node.
471
472   If the I{filelist} key is present, this list of
473   files is checksummed and the file/checksum pairs are returned.
474
475   If the I{nodelist} key is present, we check that we have
476   connectivity via ssh with the target nodes (and check the hostname
477   report).
478
479   If the I{node-net-test} key is present, we check that we have
480   connectivity to the given nodes via both primary IP and, if
481   applicable, secondary IPs.
482
483   @type what: C{dict}
484   @param what: a dictionary of things to check:
485       - filelist: list of files for which to compute checksums
486       - nodelist: list of nodes we should check ssh communication with
487       - node-net-test: list of nodes we should check node daemon port
488         connectivity with
489       - hypervisor: list with hypervisors to run the verify for
490   @rtype: dict
491   @return: a dictionary with the same keys as the input dict, and
492       values representing the result of the checks
493
494   """
495   result = {}
496   my_name = netutils.Hostname.GetSysName()
497   port = netutils.GetDaemonPort(constants.NODED)
498   vm_capable = my_name not in what.get(constants.NV_VMNODES, [])
499
500   if constants.NV_HYPERVISOR in what and vm_capable:
501     result[constants.NV_HYPERVISOR] = tmp = {}
502     for hv_name in what[constants.NV_HYPERVISOR]:
503       try:
504         val = hypervisor.GetHypervisor(hv_name).Verify()
505       except errors.HypervisorError, err:
506         val = "Error while checking hypervisor: %s" % str(err)
507       tmp[hv_name] = val
508
509   if constants.NV_FILELIST in what:
510     result[constants.NV_FILELIST] = utils.FingerprintFiles(
511       what[constants.NV_FILELIST])
512
513   if constants.NV_NODELIST in what:
514     result[constants.NV_NODELIST] = tmp = {}
515     random.shuffle(what[constants.NV_NODELIST])
516     for node in what[constants.NV_NODELIST]:
517       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
518       if not success:
519         tmp[node] = message
520
521   if constants.NV_NODENETTEST in what:
522     result[constants.NV_NODENETTEST] = tmp = {}
523     my_pip = my_sip = None
524     for name, pip, sip in what[constants.NV_NODENETTEST]:
525       if name == my_name:
526         my_pip = pip
527         my_sip = sip
528         break
529     if not my_pip:
530       tmp[my_name] = ("Can't find my own primary/secondary IP"
531                       " in the node list")
532     else:
533       for name, pip, sip in what[constants.NV_NODENETTEST]:
534         fail = []
535         if not netutils.TcpPing(pip, port, source=my_pip):
536           fail.append("primary")
537         if sip != pip:
538           if not netutils.TcpPing(sip, port, source=my_sip):
539             fail.append("secondary")
540         if fail:
541           tmp[name] = ("failure using the %s interface(s)" %
542                        " and ".join(fail))
543
544   if constants.NV_MASTERIP in what:
545     # FIXME: add checks on incoming data structures (here and in the
546     # rest of the function)
547     master_name, master_ip = what[constants.NV_MASTERIP]
548     if master_name == my_name:
549       source = constants.IP4_ADDRESS_LOCALHOST
550     else:
551       source = None
552     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
553                                                   source=source)
554
555   if constants.NV_LVLIST in what and vm_capable:
556     try:
557       val = GetVolumeList(utils.ListVolumeGroups().keys())
558     except RPCFail, err:
559       val = str(err)
560     result[constants.NV_LVLIST] = val
561
562   if constants.NV_INSTANCELIST in what and vm_capable:
563     # GetInstanceList can fail
564     try:
565       val = GetInstanceList(what[constants.NV_INSTANCELIST])
566     except RPCFail, err:
567       val = str(err)
568     result[constants.NV_INSTANCELIST] = val
569
570   if constants.NV_VGLIST in what and vm_capable:
571     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
572
573   if constants.NV_PVLIST in what and vm_capable:
574     result[constants.NV_PVLIST] = \
575       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
576                                    filter_allocatable=False)
577
578   if constants.NV_VERSION in what:
579     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
580                                     constants.RELEASE_VERSION)
581
582   if constants.NV_HVINFO in what and vm_capable:
583     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
584     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
585
586   if constants.NV_DRBDLIST in what and vm_capable:
587     try:
588       used_minors = bdev.DRBD8.GetUsedDevs().keys()
589     except errors.BlockDeviceError, err:
590       logging.warning("Can't get used minors list", exc_info=True)
591       used_minors = str(err)
592     result[constants.NV_DRBDLIST] = used_minors
593
594   if constants.NV_DRBDHELPER in what and vm_capable:
595     status = True
596     try:
597       payload = bdev.BaseDRBD.GetUsermodeHelper()
598     except errors.BlockDeviceError, err:
599       logging.error("Can't get DRBD usermode helper: %s", str(err))
600       status = False
601       payload = str(err)
602     result[constants.NV_DRBDHELPER] = (status, payload)
603
604   if constants.NV_NODESETUP in what:
605     result[constants.NV_NODESETUP] = tmpr = []
606     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
607       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
608                   " under /sys, missing required directories /sys/block"
609                   " and /sys/class/net")
610     if (not os.path.isdir("/proc/sys") or
611         not os.path.isfile("/proc/sysrq-trigger")):
612       tmpr.append("The procfs filesystem doesn't seem to be mounted"
613                   " under /proc, missing required directory /proc/sys and"
614                   " the file /proc/sysrq-trigger")
615
616   if constants.NV_TIME in what:
617     result[constants.NV_TIME] = utils.SplitTime(time.time())
618
619   if constants.NV_OSLIST in what and vm_capable:
620     result[constants.NV_OSLIST] = DiagnoseOS()
621
622   return result
623
624
625 def GetVolumeList(vg_names):
626   """Compute list of logical volumes and their size.
627
628   @type vg_names: list
629   @param vg_names: the volume groups whose LVs we should list
630   @rtype: dict
631   @return:
632       dictionary of all partions (key) with value being a tuple of
633       their size (in MiB), inactive and online status::
634
635         {'xenvg/test1': ('20.06', True, True)}
636
637       in case of errors, a string is returned with the error
638       details.
639
640   """
641   lvs = {}
642   sep = '|'
643   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
644                          "--separator=%s" % sep,
645                          "-ovg_name,lv_name,lv_size,lv_attr"] + vg_names)
646   if result.failed:
647     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
648
649   for line in result.stdout.splitlines():
650     line = line.strip()
651     match = _LVSLINE_REGEX.match(line)
652     if not match:
653       logging.error("Invalid line returned from lvs output: '%s'", line)
654       continue
655     vg_name, name, size, attr = match.groups()
656     inactive = attr[4] == '-'
657     online = attr[5] == 'o'
658     virtual = attr[0] == 'v'
659     if virtual:
660       # we don't want to report such volumes as existing, since they
661       # don't really hold data
662       continue
663     lvs[vg_name+"/"+name] = (size, inactive, online)
664
665   return lvs
666
667
668 def ListVolumeGroups():
669   """List the volume groups and their size.
670
671   @rtype: dict
672   @return: dictionary with keys volume name and values the
673       size of the volume
674
675   """
676   return utils.ListVolumeGroups()
677
678
679 def NodeVolumes():
680   """List all volumes on this node.
681
682   @rtype: list
683   @return:
684     A list of dictionaries, each having four keys:
685       - name: the logical volume name,
686       - size: the size of the logical volume
687       - dev: the physical device on which the LV lives
688       - vg: the volume group to which it belongs
689
690     In case of errors, we return an empty list and log the
691     error.
692
693     Note that since a logical volume can live on multiple physical
694     volumes, the resulting list might include a logical volume
695     multiple times.
696
697   """
698   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
699                          "--separator=|",
700                          "--options=lv_name,lv_size,devices,vg_name"])
701   if result.failed:
702     _Fail("Failed to list logical volumes, lvs output: %s",
703           result.output)
704
705   def parse_dev(dev):
706     return dev.split('(')[0]
707
708   def handle_dev(dev):
709     return [parse_dev(x) for x in dev.split(",")]
710
711   def map_line(line):
712     line = [v.strip() for v in line]
713     return [{'name': line[0], 'size': line[1],
714              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
715
716   all_devs = []
717   for line in result.stdout.splitlines():
718     if line.count('|') >= 3:
719       all_devs.extend(map_line(line.split('|')))
720     else:
721       logging.warning("Strange line in the output from lvs: '%s'", line)
722   return all_devs
723
724
725 def BridgesExist(bridges_list):
726   """Check if a list of bridges exist on the current node.
727
728   @rtype: boolean
729   @return: C{True} if all of them exist, C{False} otherwise
730
731   """
732   missing = []
733   for bridge in bridges_list:
734     if not utils.BridgeExists(bridge):
735       missing.append(bridge)
736
737   if missing:
738     _Fail("Missing bridges %s", utils.CommaJoin(missing))
739
740
741 def GetInstanceList(hypervisor_list):
742   """Provides a list of instances.
743
744   @type hypervisor_list: list
745   @param hypervisor_list: the list of hypervisors to query information
746
747   @rtype: list
748   @return: a list of all running instances on the current node
749     - instance1.example.com
750     - instance2.example.com
751
752   """
753   results = []
754   for hname in hypervisor_list:
755     try:
756       names = hypervisor.GetHypervisor(hname).ListInstances()
757       results.extend(names)
758     except errors.HypervisorError, err:
759       _Fail("Error enumerating instances (hypervisor %s): %s",
760             hname, err, exc=True)
761
762   return results
763
764
765 def GetInstanceInfo(instance, hname):
766   """Gives back the information about an instance as a dictionary.
767
768   @type instance: string
769   @param instance: the instance name
770   @type hname: string
771   @param hname: the hypervisor type of the instance
772
773   @rtype: dict
774   @return: dictionary with the following keys:
775       - memory: memory size of instance (int)
776       - state: xen state of instance (string)
777       - time: cpu time of instance (float)
778
779   """
780   output = {}
781
782   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
783   if iinfo is not None:
784     output['memory'] = iinfo[2]
785     output['state'] = iinfo[4]
786     output['time'] = iinfo[5]
787
788   return output
789
790
791 def GetInstanceMigratable(instance):
792   """Gives whether an instance can be migrated.
793
794   @type instance: L{objects.Instance}
795   @param instance: object representing the instance to be checked.
796
797   @rtype: tuple
798   @return: tuple of (result, description) where:
799       - result: whether the instance can be migrated or not
800       - description: a description of the issue, if relevant
801
802   """
803   hyper = hypervisor.GetHypervisor(instance.hypervisor)
804   iname = instance.name
805   if iname not in hyper.ListInstances():
806     _Fail("Instance %s is not running", iname)
807
808   for idx in range(len(instance.disks)):
809     link_name = _GetBlockDevSymlinkPath(iname, idx)
810     if not os.path.islink(link_name):
811       logging.warning("Instance %s is missing symlink %s for disk %d",
812                       iname, link_name, idx)
813
814
815 def GetAllInstancesInfo(hypervisor_list):
816   """Gather data about all instances.
817
818   This is the equivalent of L{GetInstanceInfo}, except that it
819   computes data for all instances at once, thus being faster if one
820   needs data about more than one instance.
821
822   @type hypervisor_list: list
823   @param hypervisor_list: list of hypervisors to query for instance data
824
825   @rtype: dict
826   @return: dictionary of instance: data, with data having the following keys:
827       - memory: memory size of instance (int)
828       - state: xen state of instance (string)
829       - time: cpu time of instance (float)
830       - vcpus: the number of vcpus
831
832   """
833   output = {}
834
835   for hname in hypervisor_list:
836     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
837     if iinfo:
838       for name, _, memory, vcpus, state, times in iinfo:
839         value = {
840           'memory': memory,
841           'vcpus': vcpus,
842           'state': state,
843           'time': times,
844           }
845         if name in output:
846           # we only check static parameters, like memory and vcpus,
847           # and not state and time which can change between the
848           # invocations of the different hypervisors
849           for key in 'memory', 'vcpus':
850             if value[key] != output[name][key]:
851               _Fail("Instance %s is running twice"
852                     " with different parameters", name)
853         output[name] = value
854
855   return output
856
857
858 def _InstanceLogName(kind, os_name, instance):
859   """Compute the OS log filename for a given instance and operation.
860
861   The instance name and os name are passed in as strings since not all
862   operations have these as part of an instance object.
863
864   @type kind: string
865   @param kind: the operation type (e.g. add, import, etc.)
866   @type os_name: string
867   @param os_name: the os name
868   @type instance: string
869   @param instance: the name of the instance being imported/added/etc.
870
871   """
872   # TODO: Use tempfile.mkstemp to create unique filename
873   base = ("%s-%s-%s-%s.log" %
874           (kind, os_name, instance, utils.TimestampForFilename()))
875   return utils.PathJoin(constants.LOG_OS_DIR, base)
876
877
878 def InstanceOsAdd(instance, reinstall, debug):
879   """Add an OS to an instance.
880
881   @type instance: L{objects.Instance}
882   @param instance: Instance whose OS is to be installed
883   @type reinstall: boolean
884   @param reinstall: whether this is an instance reinstall
885   @type debug: integer
886   @param debug: debug level, passed to the OS scripts
887   @rtype: None
888
889   """
890   inst_os = OSFromDisk(instance.os)
891
892   create_env = OSEnvironment(instance, inst_os, debug)
893   if reinstall:
894     create_env['INSTANCE_REINSTALL'] = "1"
895
896   logfile = _InstanceLogName("add", instance.os, instance.name)
897
898   result = utils.RunCmd([inst_os.create_script], env=create_env,
899                         cwd=inst_os.path, output=logfile,)
900   if result.failed:
901     logging.error("os create command '%s' returned error: %s, logfile: %s,"
902                   " output: %s", result.cmd, result.fail_reason, logfile,
903                   result.output)
904     lines = [utils.SafeEncode(val)
905              for val in utils.TailFile(logfile, lines=20)]
906     _Fail("OS create script failed (%s), last lines in the"
907           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
908
909
910 def RunRenameInstance(instance, old_name, debug):
911   """Run the OS rename script for an instance.
912
913   @type instance: L{objects.Instance}
914   @param instance: Instance whose OS is to be installed
915   @type old_name: string
916   @param old_name: previous instance name
917   @type debug: integer
918   @param debug: debug level, passed to the OS scripts
919   @rtype: boolean
920   @return: the success of the operation
921
922   """
923   inst_os = OSFromDisk(instance.os)
924
925   rename_env = OSEnvironment(instance, inst_os, debug)
926   rename_env['OLD_INSTANCE_NAME'] = old_name
927
928   logfile = _InstanceLogName("rename", instance.os,
929                              "%s-%s" % (old_name, instance.name))
930
931   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
932                         cwd=inst_os.path, output=logfile)
933
934   if result.failed:
935     logging.error("os create command '%s' returned error: %s output: %s",
936                   result.cmd, result.fail_reason, result.output)
937     lines = [utils.SafeEncode(val)
938              for val in utils.TailFile(logfile, lines=20)]
939     _Fail("OS rename script failed (%s), last lines in the"
940           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
941
942
943 def _GetBlockDevSymlinkPath(instance_name, idx):
944   return utils.PathJoin(constants.DISK_LINKS_DIR,
945                         "%s:%d" % (instance_name, idx))
946
947
948 def _SymlinkBlockDev(instance_name, device_path, idx):
949   """Set up symlinks to a instance's block device.
950
951   This is an auxiliary function run when an instance is start (on the primary
952   node) or when an instance is migrated (on the target node).
953
954
955   @param instance_name: the name of the target instance
956   @param device_path: path of the physical block device, on the node
957   @param idx: the disk index
958   @return: absolute path to the disk's symlink
959
960   """
961   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
962   try:
963     os.symlink(device_path, link_name)
964   except OSError, err:
965     if err.errno == errno.EEXIST:
966       if (not os.path.islink(link_name) or
967           os.readlink(link_name) != device_path):
968         os.remove(link_name)
969         os.symlink(device_path, link_name)
970     else:
971       raise
972
973   return link_name
974
975
976 def _RemoveBlockDevLinks(instance_name, disks):
977   """Remove the block device symlinks belonging to the given instance.
978
979   """
980   for idx, _ in enumerate(disks):
981     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
982     if os.path.islink(link_name):
983       try:
984         os.remove(link_name)
985       except OSError:
986         logging.exception("Can't remove symlink '%s'", link_name)
987
988
989 def _GatherAndLinkBlockDevs(instance):
990   """Set up an instance's block device(s).
991
992   This is run on the primary node at instance startup. The block
993   devices must be already assembled.
994
995   @type instance: L{objects.Instance}
996   @param instance: the instance whose disks we shoul assemble
997   @rtype: list
998   @return: list of (disk_object, device_path)
999
1000   """
1001   block_devices = []
1002   for idx, disk in enumerate(instance.disks):
1003     device = _RecursiveFindBD(disk)
1004     if device is None:
1005       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1006                                     str(disk))
1007     device.Open()
1008     try:
1009       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1010     except OSError, e:
1011       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1012                                     e.strerror)
1013
1014     block_devices.append((disk, link_name))
1015
1016   return block_devices
1017
1018
1019 def StartInstance(instance):
1020   """Start an instance.
1021
1022   @type instance: L{objects.Instance}
1023   @param instance: the instance object
1024   @rtype: None
1025
1026   """
1027   running_instances = GetInstanceList([instance.hypervisor])
1028
1029   if instance.name in running_instances:
1030     logging.info("Instance %s already running, not starting", instance.name)
1031     return
1032
1033   try:
1034     block_devices = _GatherAndLinkBlockDevs(instance)
1035     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1036     hyper.StartInstance(instance, block_devices)
1037   except errors.BlockDeviceError, err:
1038     _Fail("Block device error: %s", err, exc=True)
1039   except errors.HypervisorError, err:
1040     _RemoveBlockDevLinks(instance.name, instance.disks)
1041     _Fail("Hypervisor error: %s", err, exc=True)
1042
1043
1044 def InstanceShutdown(instance, timeout):
1045   """Shut an instance down.
1046
1047   @note: this functions uses polling with a hardcoded timeout.
1048
1049   @type instance: L{objects.Instance}
1050   @param instance: the instance object
1051   @type timeout: integer
1052   @param timeout: maximum timeout for soft shutdown
1053   @rtype: None
1054
1055   """
1056   hv_name = instance.hypervisor
1057   hyper = hypervisor.GetHypervisor(hv_name)
1058   iname = instance.name
1059
1060   if instance.name not in hyper.ListInstances():
1061     logging.info("Instance %s not running, doing nothing", iname)
1062     return
1063
1064   class _TryShutdown:
1065     def __init__(self):
1066       self.tried_once = False
1067
1068     def __call__(self):
1069       if iname not in hyper.ListInstances():
1070         return
1071
1072       try:
1073         hyper.StopInstance(instance, retry=self.tried_once)
1074       except errors.HypervisorError, err:
1075         if iname not in hyper.ListInstances():
1076           # if the instance is no longer existing, consider this a
1077           # success and go to cleanup
1078           return
1079
1080         _Fail("Failed to stop instance %s: %s", iname, err)
1081
1082       self.tried_once = True
1083
1084       raise utils.RetryAgain()
1085
1086   try:
1087     utils.Retry(_TryShutdown(), 5, timeout)
1088   except utils.RetryTimeout:
1089     # the shutdown did not succeed
1090     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1091
1092     try:
1093       hyper.StopInstance(instance, force=True)
1094     except errors.HypervisorError, err:
1095       if iname in hyper.ListInstances():
1096         # only raise an error if the instance still exists, otherwise
1097         # the error could simply be "instance ... unknown"!
1098         _Fail("Failed to force stop instance %s: %s", iname, err)
1099
1100     time.sleep(1)
1101
1102     if iname in hyper.ListInstances():
1103       _Fail("Could not shutdown instance %s even by destroy", iname)
1104
1105   try:
1106     hyper.CleanupInstance(instance.name)
1107   except errors.HypervisorError, err:
1108     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1109
1110   _RemoveBlockDevLinks(iname, instance.disks)
1111
1112
1113 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1114   """Reboot an instance.
1115
1116   @type instance: L{objects.Instance}
1117   @param instance: the instance object to reboot
1118   @type reboot_type: str
1119   @param reboot_type: the type of reboot, one the following
1120     constants:
1121       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1122         instance OS, do not recreate the VM
1123       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1124         restart the VM (at the hypervisor level)
1125       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1126         not accepted here, since that mode is handled differently, in
1127         cmdlib, and translates into full stop and start of the
1128         instance (instead of a call_instance_reboot RPC)
1129   @type shutdown_timeout: integer
1130   @param shutdown_timeout: maximum timeout for soft shutdown
1131   @rtype: None
1132
1133   """
1134   running_instances = GetInstanceList([instance.hypervisor])
1135
1136   if instance.name not in running_instances:
1137     _Fail("Cannot reboot instance %s that is not running", instance.name)
1138
1139   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1140   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1141     try:
1142       hyper.RebootInstance(instance)
1143     except errors.HypervisorError, err:
1144       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1145   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1146     try:
1147       InstanceShutdown(instance, shutdown_timeout)
1148       return StartInstance(instance)
1149     except errors.HypervisorError, err:
1150       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1151   else:
1152     _Fail("Invalid reboot_type received: %s", reboot_type)
1153
1154
1155 def MigrationInfo(instance):
1156   """Gather information about an instance to be migrated.
1157
1158   @type instance: L{objects.Instance}
1159   @param instance: the instance definition
1160
1161   """
1162   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1163   try:
1164     info = hyper.MigrationInfo(instance)
1165   except errors.HypervisorError, err:
1166     _Fail("Failed to fetch migration information: %s", err, exc=True)
1167   return info
1168
1169
1170 def AcceptInstance(instance, info, target):
1171   """Prepare the node to accept an instance.
1172
1173   @type instance: L{objects.Instance}
1174   @param instance: the instance definition
1175   @type info: string/data (opaque)
1176   @param info: migration information, from the source node
1177   @type target: string
1178   @param target: target host (usually ip), on this node
1179
1180   """
1181   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1182   try:
1183     hyper.AcceptInstance(instance, info, target)
1184   except errors.HypervisorError, err:
1185     _Fail("Failed to accept instance: %s", err, exc=True)
1186
1187
1188 def FinalizeMigration(instance, info, success):
1189   """Finalize any preparation to accept an instance.
1190
1191   @type instance: L{objects.Instance}
1192   @param instance: the instance definition
1193   @type info: string/data (opaque)
1194   @param info: migration information, from the source node
1195   @type success: boolean
1196   @param success: whether the migration was a success or a failure
1197
1198   """
1199   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1200   try:
1201     hyper.FinalizeMigration(instance, info, success)
1202   except errors.HypervisorError, err:
1203     _Fail("Failed to finalize migration: %s", err, exc=True)
1204
1205
1206 def MigrateInstance(instance, target, live):
1207   """Migrates an instance to another node.
1208
1209   @type instance: L{objects.Instance}
1210   @param instance: the instance definition
1211   @type target: string
1212   @param target: the target node name
1213   @type live: boolean
1214   @param live: whether the migration should be done live or not (the
1215       interpretation of this parameter is left to the hypervisor)
1216   @rtype: tuple
1217   @return: a tuple of (success, msg) where:
1218       - succes is a boolean denoting the success/failure of the operation
1219       - msg is a string with details in case of failure
1220
1221   """
1222   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1223
1224   try:
1225     hyper.MigrateInstance(instance, target, live)
1226   except errors.HypervisorError, err:
1227     _Fail("Failed to migrate instance: %s", err, exc=True)
1228
1229
1230 def BlockdevCreate(disk, size, owner, on_primary, info):
1231   """Creates a block device for an instance.
1232
1233   @type disk: L{objects.Disk}
1234   @param disk: the object describing the disk we should create
1235   @type size: int
1236   @param size: the size of the physical underlying device, in MiB
1237   @type owner: str
1238   @param owner: the name of the instance for which disk is created,
1239       used for device cache data
1240   @type on_primary: boolean
1241   @param on_primary:  indicates if it is the primary node or not
1242   @type info: string
1243   @param info: string that will be sent to the physical device
1244       creation, used for example to set (LVM) tags on LVs
1245
1246   @return: the new unique_id of the device (this can sometime be
1247       computed only after creation), or None. On secondary nodes,
1248       it's not required to return anything.
1249
1250   """
1251   # TODO: remove the obsolete 'size' argument
1252   # pylint: disable-msg=W0613
1253   clist = []
1254   if disk.children:
1255     for child in disk.children:
1256       try:
1257         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1258       except errors.BlockDeviceError, err:
1259         _Fail("Can't assemble device %s: %s", child, err)
1260       if on_primary or disk.AssembleOnSecondary():
1261         # we need the children open in case the device itself has to
1262         # be assembled
1263         try:
1264           # pylint: disable-msg=E1103
1265           crdev.Open()
1266         except errors.BlockDeviceError, err:
1267           _Fail("Can't make child '%s' read-write: %s", child, err)
1268       clist.append(crdev)
1269
1270   try:
1271     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1272   except errors.BlockDeviceError, err:
1273     _Fail("Can't create block device: %s", err)
1274
1275   if on_primary or disk.AssembleOnSecondary():
1276     try:
1277       device.Assemble()
1278     except errors.BlockDeviceError, err:
1279       _Fail("Can't assemble device after creation, unusual event: %s", err)
1280     device.SetSyncSpeed(constants.SYNC_SPEED)
1281     if on_primary or disk.OpenOnSecondary():
1282       try:
1283         device.Open(force=True)
1284       except errors.BlockDeviceError, err:
1285         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1286     DevCacheManager.UpdateCache(device.dev_path, owner,
1287                                 on_primary, disk.iv_name)
1288
1289   device.SetInfo(info)
1290
1291   return device.unique_id
1292
1293
1294 def _WipeDevice(path, offset, size):
1295   """This function actually wipes the device.
1296
1297   @param path: The path to the device to wipe
1298   @param offset: The offset in MiB in the file
1299   @param size: The size in MiB to write
1300
1301   """
1302   cmd = [constants.DD_CMD, "if=/dev/zero", "seek=%d" % offset,
1303          "bs=%d" % constants.WIPE_BLOCK_SIZE, "oflag=direct", "of=%s" % path,
1304          "count=%d" % size]
1305   result = utils.RunCmd(cmd)
1306
1307   if result.failed:
1308     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1309           result.fail_reason, result.output)
1310
1311
1312 def BlockdevWipe(disk, offset, size):
1313   """Wipes a block device.
1314
1315   @type disk: L{objects.Disk}
1316   @param disk: the disk object we want to wipe
1317   @type offset: int
1318   @param offset: The offset in MiB in the file
1319   @type size: int
1320   @param size: The size in MiB to write
1321
1322   """
1323   try:
1324     rdev = _RecursiveFindBD(disk)
1325   except errors.BlockDeviceError:
1326     rdev = None
1327
1328   if not rdev:
1329     _Fail("Cannot execute wipe for device %s: device not found", disk.iv_name)
1330
1331   # Do cross verify some of the parameters
1332   if offset > rdev.size:
1333     _Fail("Offset is bigger than device size")
1334   if (offset + size) > rdev.size:
1335     _Fail("The provided offset and size to wipe is bigger than device size")
1336
1337   _WipeDevice(rdev.dev_path, offset, size)
1338
1339
1340 def BlockdevRemove(disk):
1341   """Remove a block device.
1342
1343   @note: This is intended to be called recursively.
1344
1345   @type disk: L{objects.Disk}
1346   @param disk: the disk object we should remove
1347   @rtype: boolean
1348   @return: the success of the operation
1349
1350   """
1351   msgs = []
1352   try:
1353     rdev = _RecursiveFindBD(disk)
1354   except errors.BlockDeviceError, err:
1355     # probably can't attach
1356     logging.info("Can't attach to device %s in remove", disk)
1357     rdev = None
1358   if rdev is not None:
1359     r_path = rdev.dev_path
1360     try:
1361       rdev.Remove()
1362     except errors.BlockDeviceError, err:
1363       msgs.append(str(err))
1364     if not msgs:
1365       DevCacheManager.RemoveCache(r_path)
1366
1367   if disk.children:
1368     for child in disk.children:
1369       try:
1370         BlockdevRemove(child)
1371       except RPCFail, err:
1372         msgs.append(str(err))
1373
1374   if msgs:
1375     _Fail("; ".join(msgs))
1376
1377
1378 def _RecursiveAssembleBD(disk, owner, as_primary):
1379   """Activate a block device for an instance.
1380
1381   This is run on the primary and secondary nodes for an instance.
1382
1383   @note: this function is called recursively.
1384
1385   @type disk: L{objects.Disk}
1386   @param disk: the disk we try to assemble
1387   @type owner: str
1388   @param owner: the name of the instance which owns the disk
1389   @type as_primary: boolean
1390   @param as_primary: if we should make the block device
1391       read/write
1392
1393   @return: the assembled device or None (in case no device
1394       was assembled)
1395   @raise errors.BlockDeviceError: in case there is an error
1396       during the activation of the children or the device
1397       itself
1398
1399   """
1400   children = []
1401   if disk.children:
1402     mcn = disk.ChildrenNeeded()
1403     if mcn == -1:
1404       mcn = 0 # max number of Nones allowed
1405     else:
1406       mcn = len(disk.children) - mcn # max number of Nones
1407     for chld_disk in disk.children:
1408       try:
1409         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1410       except errors.BlockDeviceError, err:
1411         if children.count(None) >= mcn:
1412           raise
1413         cdev = None
1414         logging.error("Error in child activation (but continuing): %s",
1415                       str(err))
1416       children.append(cdev)
1417
1418   if as_primary or disk.AssembleOnSecondary():
1419     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1420     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1421     result = r_dev
1422     if as_primary or disk.OpenOnSecondary():
1423       r_dev.Open()
1424     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1425                                 as_primary, disk.iv_name)
1426
1427   else:
1428     result = True
1429   return result
1430
1431
1432 def BlockdevAssemble(disk, owner, as_primary):
1433   """Activate a block device for an instance.
1434
1435   This is a wrapper over _RecursiveAssembleBD.
1436
1437   @rtype: str or boolean
1438   @return: a C{/dev/...} path for primary nodes, and
1439       C{True} for secondary nodes
1440
1441   """
1442   try:
1443     result = _RecursiveAssembleBD(disk, owner, as_primary)
1444     if isinstance(result, bdev.BlockDev):
1445       # pylint: disable-msg=E1103
1446       result = result.dev_path
1447   except errors.BlockDeviceError, err:
1448     _Fail("Error while assembling disk: %s", err, exc=True)
1449
1450   return result
1451
1452
1453 def BlockdevShutdown(disk):
1454   """Shut down a block device.
1455
1456   First, if the device is assembled (Attach() is successful), then
1457   the device is shutdown. Then the children of the device are
1458   shutdown.
1459
1460   This function is called recursively. Note that we don't cache the
1461   children or such, as oppossed to assemble, shutdown of different
1462   devices doesn't require that the upper device was active.
1463
1464   @type disk: L{objects.Disk}
1465   @param disk: the description of the disk we should
1466       shutdown
1467   @rtype: None
1468
1469   """
1470   msgs = []
1471   r_dev = _RecursiveFindBD(disk)
1472   if r_dev is not None:
1473     r_path = r_dev.dev_path
1474     try:
1475       r_dev.Shutdown()
1476       DevCacheManager.RemoveCache(r_path)
1477     except errors.BlockDeviceError, err:
1478       msgs.append(str(err))
1479
1480   if disk.children:
1481     for child in disk.children:
1482       try:
1483         BlockdevShutdown(child)
1484       except RPCFail, err:
1485         msgs.append(str(err))
1486
1487   if msgs:
1488     _Fail("; ".join(msgs))
1489
1490
1491 def BlockdevAddchildren(parent_cdev, new_cdevs):
1492   """Extend a mirrored block device.
1493
1494   @type parent_cdev: L{objects.Disk}
1495   @param parent_cdev: the disk to which we should add children
1496   @type new_cdevs: list of L{objects.Disk}
1497   @param new_cdevs: the list of children which we should add
1498   @rtype: None
1499
1500   """
1501   parent_bdev = _RecursiveFindBD(parent_cdev)
1502   if parent_bdev is None:
1503     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1504   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1505   if new_bdevs.count(None) > 0:
1506     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1507   parent_bdev.AddChildren(new_bdevs)
1508
1509
1510 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1511   """Shrink a mirrored block device.
1512
1513   @type parent_cdev: L{objects.Disk}
1514   @param parent_cdev: the disk from which we should remove children
1515   @type new_cdevs: list of L{objects.Disk}
1516   @param new_cdevs: the list of children which we should remove
1517   @rtype: None
1518
1519   """
1520   parent_bdev = _RecursiveFindBD(parent_cdev)
1521   if parent_bdev is None:
1522     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1523   devs = []
1524   for disk in new_cdevs:
1525     rpath = disk.StaticDevPath()
1526     if rpath is None:
1527       bd = _RecursiveFindBD(disk)
1528       if bd is None:
1529         _Fail("Can't find device %s while removing children", disk)
1530       else:
1531         devs.append(bd.dev_path)
1532     else:
1533       if not utils.IsNormAbsPath(rpath):
1534         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1535       devs.append(rpath)
1536   parent_bdev.RemoveChildren(devs)
1537
1538
1539 def BlockdevGetmirrorstatus(disks):
1540   """Get the mirroring status of a list of devices.
1541
1542   @type disks: list of L{objects.Disk}
1543   @param disks: the list of disks which we should query
1544   @rtype: disk
1545   @return: List of L{objects.BlockDevStatus}, one for each disk
1546   @raise errors.BlockDeviceError: if any of the disks cannot be
1547       found
1548
1549   """
1550   stats = []
1551   for dsk in disks:
1552     rbd = _RecursiveFindBD(dsk)
1553     if rbd is None:
1554       _Fail("Can't find device %s", dsk)
1555
1556     stats.append(rbd.CombinedSyncStatus())
1557
1558   return stats
1559
1560
1561 def BlockdevGetmirrorstatusMulti(disks):
1562   """Get the mirroring status of a list of devices.
1563
1564   @type disks: list of L{objects.Disk}
1565   @param disks: the list of disks which we should query
1566   @rtype: disk
1567   @return: List of tuples, (bool, status), one for each disk; bool denotes
1568     success/failure, status is L{objects.BlockDevStatus} on success, string
1569     otherwise
1570
1571   """
1572   result = []
1573   for disk in disks:
1574     try:
1575       rbd = _RecursiveFindBD(disk)
1576       if rbd is None:
1577         result.append((False, "Can't find device %s" % disk))
1578         continue
1579
1580       status = rbd.CombinedSyncStatus()
1581     except errors.BlockDeviceError, err:
1582       logging.exception("Error while getting disk status")
1583       result.append((False, str(err)))
1584     else:
1585       result.append((True, status))
1586
1587   assert len(disks) == len(result)
1588
1589   return result
1590
1591
1592 def _RecursiveFindBD(disk):
1593   """Check if a device is activated.
1594
1595   If so, return information about the real device.
1596
1597   @type disk: L{objects.Disk}
1598   @param disk: the disk object we need to find
1599
1600   @return: None if the device can't be found,
1601       otherwise the device instance
1602
1603   """
1604   children = []
1605   if disk.children:
1606     for chdisk in disk.children:
1607       children.append(_RecursiveFindBD(chdisk))
1608
1609   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1610
1611
1612 def _OpenRealBD(disk):
1613   """Opens the underlying block device of a disk.
1614
1615   @type disk: L{objects.Disk}
1616   @param disk: the disk object we want to open
1617
1618   """
1619   real_disk = _RecursiveFindBD(disk)
1620   if real_disk is None:
1621     _Fail("Block device '%s' is not set up", disk)
1622
1623   real_disk.Open()
1624
1625   return real_disk
1626
1627
1628 def BlockdevFind(disk):
1629   """Check if a device is activated.
1630
1631   If it is, return information about the real device.
1632
1633   @type disk: L{objects.Disk}
1634   @param disk: the disk to find
1635   @rtype: None or objects.BlockDevStatus
1636   @return: None if the disk cannot be found, otherwise a the current
1637            information
1638
1639   """
1640   try:
1641     rbd = _RecursiveFindBD(disk)
1642   except errors.BlockDeviceError, err:
1643     _Fail("Failed to find device: %s", err, exc=True)
1644
1645   if rbd is None:
1646     return None
1647
1648   return rbd.GetSyncStatus()
1649
1650
1651 def BlockdevGetsize(disks):
1652   """Computes the size of the given disks.
1653
1654   If a disk is not found, returns None instead.
1655
1656   @type disks: list of L{objects.Disk}
1657   @param disks: the list of disk to compute the size for
1658   @rtype: list
1659   @return: list with elements None if the disk cannot be found,
1660       otherwise the size
1661
1662   """
1663   result = []
1664   for cf in disks:
1665     try:
1666       rbd = _RecursiveFindBD(cf)
1667     except errors.BlockDeviceError:
1668       result.append(None)
1669       continue
1670     if rbd is None:
1671       result.append(None)
1672     else:
1673       result.append(rbd.GetActualSize())
1674   return result
1675
1676
1677 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1678   """Export a block device to a remote node.
1679
1680   @type disk: L{objects.Disk}
1681   @param disk: the description of the disk to export
1682   @type dest_node: str
1683   @param dest_node: the destination node to export to
1684   @type dest_path: str
1685   @param dest_path: the destination path on the target node
1686   @type cluster_name: str
1687   @param cluster_name: the cluster name, needed for SSH hostalias
1688   @rtype: None
1689
1690   """
1691   real_disk = _OpenRealBD(disk)
1692
1693   # the block size on the read dd is 1MiB to match our units
1694   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1695                                "dd if=%s bs=1048576 count=%s",
1696                                real_disk.dev_path, str(disk.size))
1697
1698   # we set here a smaller block size as, due to ssh buffering, more
1699   # than 64-128k will mostly ignored; we use nocreat to fail if the
1700   # device is not already there or we pass a wrong path; we use
1701   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1702   # to not buffer too much memory; this means that at best, we flush
1703   # every 64k, which will not be very fast
1704   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1705                                 " oflag=dsync", dest_path)
1706
1707   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1708                                                    constants.GANETI_RUNAS,
1709                                                    destcmd)
1710
1711   # all commands have been checked, so we're safe to combine them
1712   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1713
1714   result = utils.RunCmd(["bash", "-c", command])
1715
1716   if result.failed:
1717     _Fail("Disk copy command '%s' returned error: %s"
1718           " output: %s", command, result.fail_reason, result.output)
1719
1720
1721 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1722   """Write a file to the filesystem.
1723
1724   This allows the master to overwrite(!) a file. It will only perform
1725   the operation if the file belongs to a list of configuration files.
1726
1727   @type file_name: str
1728   @param file_name: the target file name
1729   @type data: str
1730   @param data: the new contents of the file
1731   @type mode: int
1732   @param mode: the mode to give the file (can be None)
1733   @type uid: int
1734   @param uid: the owner of the file (can be -1 for default)
1735   @type gid: int
1736   @param gid: the group of the file (can be -1 for default)
1737   @type atime: float
1738   @param atime: the atime to set on the file (can be None)
1739   @type mtime: float
1740   @param mtime: the mtime to set on the file (can be None)
1741   @rtype: None
1742
1743   """
1744   if not os.path.isabs(file_name):
1745     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1746
1747   if file_name not in _ALLOWED_UPLOAD_FILES:
1748     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1749           file_name)
1750
1751   raw_data = _Decompress(data)
1752
1753   utils.SafeWriteFile(file_name, None,
1754                       data=raw_data, mode=mode, uid=uid, gid=gid,
1755                       atime=atime, mtime=mtime)
1756
1757
1758 def RunOob(oob_program, command, node, timeout):
1759   """Executes oob_program with given command on given node.
1760
1761   @param oob_program: The path to the executable oob_program
1762   @param command: The command to invoke on oob_program
1763   @param node: The node given as an argument to the program
1764   @param timeout: Timeout after which we kill the oob program
1765
1766   @return: stdout
1767   @raise RPCFail: If execution fails for some reason
1768
1769   """
1770   result = utils.RunCmd([oob_program, command, node], timeout=timeout)
1771
1772   if result.failed:
1773     _Fail("'%s' failed with reason '%s'; output: %s", result.cmd,
1774           result.fail_reason, result.output)
1775
1776   return result.stdout
1777
1778
1779 def WriteSsconfFiles(values):
1780   """Update all ssconf files.
1781
1782   Wrapper around the SimpleStore.WriteFiles.
1783
1784   """
1785   ssconf.SimpleStore().WriteFiles(values)
1786
1787
1788 def _ErrnoOrStr(err):
1789   """Format an EnvironmentError exception.
1790
1791   If the L{err} argument has an errno attribute, it will be looked up
1792   and converted into a textual C{E...} description. Otherwise the
1793   string representation of the error will be returned.
1794
1795   @type err: L{EnvironmentError}
1796   @param err: the exception to format
1797
1798   """
1799   if hasattr(err, 'errno'):
1800     detail = errno.errorcode[err.errno]
1801   else:
1802     detail = str(err)
1803   return detail
1804
1805
1806 def _OSOndiskAPIVersion(os_dir):
1807   """Compute and return the API version of a given OS.
1808
1809   This function will try to read the API version of the OS residing in
1810   the 'os_dir' directory.
1811
1812   @type os_dir: str
1813   @param os_dir: the directory in which we should look for the OS
1814   @rtype: tuple
1815   @return: tuple (status, data) with status denoting the validity and
1816       data holding either the vaid versions or an error message
1817
1818   """
1819   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1820
1821   try:
1822     st = os.stat(api_file)
1823   except EnvironmentError, err:
1824     return False, ("Required file '%s' not found under path %s: %s" %
1825                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1826
1827   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1828     return False, ("File '%s' in %s is not a regular file" %
1829                    (constants.OS_API_FILE, os_dir))
1830
1831   try:
1832     api_versions = utils.ReadFile(api_file).splitlines()
1833   except EnvironmentError, err:
1834     return False, ("Error while reading the API version file at %s: %s" %
1835                    (api_file, _ErrnoOrStr(err)))
1836
1837   try:
1838     api_versions = [int(version.strip()) for version in api_versions]
1839   except (TypeError, ValueError), err:
1840     return False, ("API version(s) can't be converted to integer: %s" %
1841                    str(err))
1842
1843   return True, api_versions
1844
1845
1846 def DiagnoseOS(top_dirs=None):
1847   """Compute the validity for all OSes.
1848
1849   @type top_dirs: list
1850   @param top_dirs: the list of directories in which to
1851       search (if not given defaults to
1852       L{constants.OS_SEARCH_PATH})
1853   @rtype: list of L{objects.OS}
1854   @return: a list of tuples (name, path, status, diagnose, variants,
1855       parameters, api_version) for all (potential) OSes under all
1856       search paths, where:
1857           - name is the (potential) OS name
1858           - path is the full path to the OS
1859           - status True/False is the validity of the OS
1860           - diagnose is the error message for an invalid OS, otherwise empty
1861           - variants is a list of supported OS variants, if any
1862           - parameters is a list of (name, help) parameters, if any
1863           - api_version is a list of support OS API versions
1864
1865   """
1866   if top_dirs is None:
1867     top_dirs = constants.OS_SEARCH_PATH
1868
1869   result = []
1870   for dir_name in top_dirs:
1871     if os.path.isdir(dir_name):
1872       try:
1873         f_names = utils.ListVisibleFiles(dir_name)
1874       except EnvironmentError, err:
1875         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1876         break
1877       for name in f_names:
1878         os_path = utils.PathJoin(dir_name, name)
1879         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1880         if status:
1881           diagnose = ""
1882           variants = os_inst.supported_variants
1883           parameters = os_inst.supported_parameters
1884           api_versions = os_inst.api_versions
1885         else:
1886           diagnose = os_inst
1887           variants = parameters = api_versions = []
1888         result.append((name, os_path, status, diagnose, variants,
1889                        parameters, api_versions))
1890
1891   return result
1892
1893
1894 def _TryOSFromDisk(name, base_dir=None):
1895   """Create an OS instance from disk.
1896
1897   This function will return an OS instance if the given name is a
1898   valid OS name.
1899
1900   @type base_dir: string
1901   @keyword base_dir: Base directory containing OS installations.
1902                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1903   @rtype: tuple
1904   @return: success and either the OS instance if we find a valid one,
1905       or error message
1906
1907   """
1908   if base_dir is None:
1909     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1910   else:
1911     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1912
1913   if os_dir is None:
1914     return False, "Directory for OS %s not found in search path" % name
1915
1916   status, api_versions = _OSOndiskAPIVersion(os_dir)
1917   if not status:
1918     # push the error up
1919     return status, api_versions
1920
1921   if not constants.OS_API_VERSIONS.intersection(api_versions):
1922     return False, ("API version mismatch for path '%s': found %s, want %s." %
1923                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1924
1925   # OS Files dictionary, we will populate it with the absolute path names
1926   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1927
1928   if max(api_versions) >= constants.OS_API_V15:
1929     os_files[constants.OS_VARIANTS_FILE] = ''
1930
1931   if max(api_versions) >= constants.OS_API_V20:
1932     os_files[constants.OS_PARAMETERS_FILE] = ''
1933   else:
1934     del os_files[constants.OS_SCRIPT_VERIFY]
1935
1936   for filename in os_files:
1937     os_files[filename] = utils.PathJoin(os_dir, filename)
1938
1939     try:
1940       st = os.stat(os_files[filename])
1941     except EnvironmentError, err:
1942       return False, ("File '%s' under path '%s' is missing (%s)" %
1943                      (filename, os_dir, _ErrnoOrStr(err)))
1944
1945     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1946       return False, ("File '%s' under path '%s' is not a regular file" %
1947                      (filename, os_dir))
1948
1949     if filename in constants.OS_SCRIPTS:
1950       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1951         return False, ("File '%s' under path '%s' is not executable" %
1952                        (filename, os_dir))
1953
1954   variants = []
1955   if constants.OS_VARIANTS_FILE in os_files:
1956     variants_file = os_files[constants.OS_VARIANTS_FILE]
1957     try:
1958       variants = utils.ReadFile(variants_file).splitlines()
1959     except EnvironmentError, err:
1960       return False, ("Error while reading the OS variants file at %s: %s" %
1961                      (variants_file, _ErrnoOrStr(err)))
1962     if not variants:
1963       return False, ("No supported os variant found")
1964
1965   parameters = []
1966   if constants.OS_PARAMETERS_FILE in os_files:
1967     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1968     try:
1969       parameters = utils.ReadFile(parameters_file).splitlines()
1970     except EnvironmentError, err:
1971       return False, ("Error while reading the OS parameters file at %s: %s" %
1972                      (parameters_file, _ErrnoOrStr(err)))
1973     parameters = [v.split(None, 1) for v in parameters]
1974
1975   os_obj = objects.OS(name=name, path=os_dir,
1976                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1977                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1978                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1979                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1980                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1981                                                  None),
1982                       supported_variants=variants,
1983                       supported_parameters=parameters,
1984                       api_versions=api_versions)
1985   return True, os_obj
1986
1987
1988 def OSFromDisk(name, base_dir=None):
1989   """Create an OS instance from disk.
1990
1991   This function will return an OS instance if the given name is a
1992   valid OS name. Otherwise, it will raise an appropriate
1993   L{RPCFail} exception, detailing why this is not a valid OS.
1994
1995   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1996   an exception but returns true/false status data.
1997
1998   @type base_dir: string
1999   @keyword base_dir: Base directory containing OS installations.
2000                      Defaults to a search in all the OS_SEARCH_PATH dirs.
2001   @rtype: L{objects.OS}
2002   @return: the OS instance if we find a valid one
2003   @raise RPCFail: if we don't find a valid OS
2004
2005   """
2006   name_only = objects.OS.GetName(name)
2007   status, payload = _TryOSFromDisk(name_only, base_dir)
2008
2009   if not status:
2010     _Fail(payload)
2011
2012   return payload
2013
2014
2015 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
2016   """Calculate the basic environment for an os script.
2017
2018   @type os_name: str
2019   @param os_name: full operating system name (including variant)
2020   @type inst_os: L{objects.OS}
2021   @param inst_os: operating system for which the environment is being built
2022   @type os_params: dict
2023   @param os_params: the OS parameters
2024   @type debug: integer
2025   @param debug: debug level (0 or 1, for OS Api 10)
2026   @rtype: dict
2027   @return: dict of environment variables
2028   @raise errors.BlockDeviceError: if the block device
2029       cannot be found
2030
2031   """
2032   result = {}
2033   api_version = \
2034     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
2035   result['OS_API_VERSION'] = '%d' % api_version
2036   result['OS_NAME'] = inst_os.name
2037   result['DEBUG_LEVEL'] = '%d' % debug
2038
2039   # OS variants
2040   if api_version >= constants.OS_API_V15:
2041     variant = objects.OS.GetVariant(os_name)
2042     if not variant:
2043       variant = inst_os.supported_variants[0]
2044     result['OS_VARIANT'] = variant
2045
2046   # OS params
2047   for pname, pvalue in os_params.items():
2048     result['OSP_%s' % pname.upper()] = pvalue
2049
2050   return result
2051
2052
2053 def OSEnvironment(instance, inst_os, debug=0):
2054   """Calculate the environment for an os script.
2055
2056   @type instance: L{objects.Instance}
2057   @param instance: target instance for the os script run
2058   @type inst_os: L{objects.OS}
2059   @param inst_os: operating system for which the environment is being built
2060   @type debug: integer
2061   @param debug: debug level (0 or 1, for OS Api 10)
2062   @rtype: dict
2063   @return: dict of environment variables
2064   @raise errors.BlockDeviceError: if the block device
2065       cannot be found
2066
2067   """
2068   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
2069
2070   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
2071     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2072
2073   result['HYPERVISOR'] = instance.hypervisor
2074   result['DISK_COUNT'] = '%d' % len(instance.disks)
2075   result['NIC_COUNT'] = '%d' % len(instance.nics)
2076
2077   # Disks
2078   for idx, disk in enumerate(instance.disks):
2079     real_disk = _OpenRealBD(disk)
2080     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2081     result['DISK_%d_ACCESS' % idx] = disk.mode
2082     if constants.HV_DISK_TYPE in instance.hvparams:
2083       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2084         instance.hvparams[constants.HV_DISK_TYPE]
2085     if disk.dev_type in constants.LDS_BLOCK:
2086       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2087     elif disk.dev_type == constants.LD_FILE:
2088       result['DISK_%d_BACKEND_TYPE' % idx] = \
2089         'file:%s' % disk.physical_id[0]
2090
2091   # NICs
2092   for idx, nic in enumerate(instance.nics):
2093     result['NIC_%d_MAC' % idx] = nic.mac
2094     if nic.ip:
2095       result['NIC_%d_IP' % idx] = nic.ip
2096     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2097     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2098       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2099     if nic.nicparams[constants.NIC_LINK]:
2100       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2101     if constants.HV_NIC_TYPE in instance.hvparams:
2102       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2103         instance.hvparams[constants.HV_NIC_TYPE]
2104
2105   # HV/BE params
2106   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2107     for key, value in source.items():
2108       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2109
2110   return result
2111
2112
2113 def BlockdevGrow(disk, amount):
2114   """Grow a stack of block devices.
2115
2116   This function is called recursively, with the childrens being the
2117   first ones to resize.
2118
2119   @type disk: L{objects.Disk}
2120   @param disk: the disk to be grown
2121   @rtype: (status, result)
2122   @return: a tuple with the status of the operation
2123       (True/False), and the errors message if status
2124       is False
2125
2126   """
2127   r_dev = _RecursiveFindBD(disk)
2128   if r_dev is None:
2129     _Fail("Cannot find block device %s", disk)
2130
2131   try:
2132     r_dev.Grow(amount)
2133   except errors.BlockDeviceError, err:
2134     _Fail("Failed to grow block device: %s", err, exc=True)
2135
2136
2137 def BlockdevSnapshot(disk):
2138   """Create a snapshot copy of a block device.
2139
2140   This function is called recursively, and the snapshot is actually created
2141   just for the leaf lvm backend device.
2142
2143   @type disk: L{objects.Disk}
2144   @param disk: the disk to be snapshotted
2145   @rtype: string
2146   @return: snapshot disk ID as (vg, lv)
2147
2148   """
2149   if disk.dev_type == constants.LD_DRBD8:
2150     if not disk.children:
2151       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2152             disk.unique_id)
2153     return BlockdevSnapshot(disk.children[0])
2154   elif disk.dev_type == constants.LD_LV:
2155     r_dev = _RecursiveFindBD(disk)
2156     if r_dev is not None:
2157       # FIXME: choose a saner value for the snapshot size
2158       # let's stay on the safe side and ask for the full size, for now
2159       return r_dev.Snapshot(disk.size)
2160     else:
2161       _Fail("Cannot find block device %s", disk)
2162   else:
2163     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2164           disk.unique_id, disk.dev_type)
2165
2166
2167 def FinalizeExport(instance, snap_disks):
2168   """Write out the export configuration information.
2169
2170   @type instance: L{objects.Instance}
2171   @param instance: the instance which we export, used for
2172       saving configuration
2173   @type snap_disks: list of L{objects.Disk}
2174   @param snap_disks: list of snapshot block devices, which
2175       will be used to get the actual name of the dump file
2176
2177   @rtype: None
2178
2179   """
2180   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2181   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2182
2183   config = objects.SerializableConfigParser()
2184
2185   config.add_section(constants.INISECT_EXP)
2186   config.set(constants.INISECT_EXP, 'version', '0')
2187   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2188   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2189   config.set(constants.INISECT_EXP, 'os', instance.os)
2190   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2191
2192   config.add_section(constants.INISECT_INS)
2193   config.set(constants.INISECT_INS, 'name', instance.name)
2194   config.set(constants.INISECT_INS, 'memory', '%d' %
2195              instance.beparams[constants.BE_MEMORY])
2196   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2197              instance.beparams[constants.BE_VCPUS])
2198   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2199   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2200
2201   nic_total = 0
2202   for nic_count, nic in enumerate(instance.nics):
2203     nic_total += 1
2204     config.set(constants.INISECT_INS, 'nic%d_mac' %
2205                nic_count, '%s' % nic.mac)
2206     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2207     for param in constants.NICS_PARAMETER_TYPES:
2208       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2209                  '%s' % nic.nicparams.get(param, None))
2210   # TODO: redundant: on load can read nics until it doesn't exist
2211   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2212
2213   disk_total = 0
2214   for disk_count, disk in enumerate(snap_disks):
2215     if disk:
2216       disk_total += 1
2217       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2218                  ('%s' % disk.iv_name))
2219       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2220                  ('%s' % disk.physical_id[1]))
2221       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2222                  ('%d' % disk.size))
2223
2224   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2225
2226   # New-style hypervisor/backend parameters
2227
2228   config.add_section(constants.INISECT_HYP)
2229   for name, value in instance.hvparams.items():
2230     if name not in constants.HVC_GLOBALS:
2231       config.set(constants.INISECT_HYP, name, str(value))
2232
2233   config.add_section(constants.INISECT_BEP)
2234   for name, value in instance.beparams.items():
2235     config.set(constants.INISECT_BEP, name, str(value))
2236
2237   config.add_section(constants.INISECT_OSP)
2238   for name, value in instance.osparams.items():
2239     config.set(constants.INISECT_OSP, name, str(value))
2240
2241   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2242                   data=config.Dumps())
2243   shutil.rmtree(finaldestdir, ignore_errors=True)
2244   shutil.move(destdir, finaldestdir)
2245
2246
2247 def ExportInfo(dest):
2248   """Get export configuration information.
2249
2250   @type dest: str
2251   @param dest: directory containing the export
2252
2253   @rtype: L{objects.SerializableConfigParser}
2254   @return: a serializable config file containing the
2255       export info
2256
2257   """
2258   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2259
2260   config = objects.SerializableConfigParser()
2261   config.read(cff)
2262
2263   if (not config.has_section(constants.INISECT_EXP) or
2264       not config.has_section(constants.INISECT_INS)):
2265     _Fail("Export info file doesn't have the required fields")
2266
2267   return config.Dumps()
2268
2269
2270 def ListExports():
2271   """Return a list of exports currently available on this machine.
2272
2273   @rtype: list
2274   @return: list of the exports
2275
2276   """
2277   if os.path.isdir(constants.EXPORT_DIR):
2278     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2279   else:
2280     _Fail("No exports directory")
2281
2282
2283 def RemoveExport(export):
2284   """Remove an existing export from the node.
2285
2286   @type export: str
2287   @param export: the name of the export to remove
2288   @rtype: None
2289
2290   """
2291   target = utils.PathJoin(constants.EXPORT_DIR, export)
2292
2293   try:
2294     shutil.rmtree(target)
2295   except EnvironmentError, err:
2296     _Fail("Error while removing the export: %s", err, exc=True)
2297
2298
2299 def BlockdevRename(devlist):
2300   """Rename a list of block devices.
2301
2302   @type devlist: list of tuples
2303   @param devlist: list of tuples of the form  (disk,
2304       new_logical_id, new_physical_id); disk is an
2305       L{objects.Disk} object describing the current disk,
2306       and new logical_id/physical_id is the name we
2307       rename it to
2308   @rtype: boolean
2309   @return: True if all renames succeeded, False otherwise
2310
2311   """
2312   msgs = []
2313   result = True
2314   for disk, unique_id in devlist:
2315     dev = _RecursiveFindBD(disk)
2316     if dev is None:
2317       msgs.append("Can't find device %s in rename" % str(disk))
2318       result = False
2319       continue
2320     try:
2321       old_rpath = dev.dev_path
2322       dev.Rename(unique_id)
2323       new_rpath = dev.dev_path
2324       if old_rpath != new_rpath:
2325         DevCacheManager.RemoveCache(old_rpath)
2326         # FIXME: we should add the new cache information here, like:
2327         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2328         # but we don't have the owner here - maybe parse from existing
2329         # cache? for now, we only lose lvm data when we rename, which
2330         # is less critical than DRBD or MD
2331     except errors.BlockDeviceError, err:
2332       msgs.append("Can't rename device '%s' to '%s': %s" %
2333                   (dev, unique_id, err))
2334       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2335       result = False
2336   if not result:
2337     _Fail("; ".join(msgs))
2338
2339
2340 def _TransformFileStorageDir(file_storage_dir):
2341   """Checks whether given file_storage_dir is valid.
2342
2343   Checks wheter the given file_storage_dir is within the cluster-wide
2344   default file_storage_dir stored in SimpleStore. Only paths under that
2345   directory are allowed.
2346
2347   @type file_storage_dir: str
2348   @param file_storage_dir: the path to check
2349
2350   @return: the normalized path if valid, None otherwise
2351
2352   """
2353   if not constants.ENABLE_FILE_STORAGE:
2354     _Fail("File storage disabled at configure time")
2355   cfg = _GetConfig()
2356   file_storage_dir = os.path.normpath(file_storage_dir)
2357   base_file_storage_dir = cfg.GetFileStorageDir()
2358   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2359       base_file_storage_dir):
2360     _Fail("File storage directory '%s' is not under base file"
2361           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2362   return file_storage_dir
2363
2364
2365 def CreateFileStorageDir(file_storage_dir):
2366   """Create file storage directory.
2367
2368   @type file_storage_dir: str
2369   @param file_storage_dir: directory to create
2370
2371   @rtype: tuple
2372   @return: tuple with first element a boolean indicating wheter dir
2373       creation was successful or not
2374
2375   """
2376   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2377   if os.path.exists(file_storage_dir):
2378     if not os.path.isdir(file_storage_dir):
2379       _Fail("Specified storage dir '%s' is not a directory",
2380             file_storage_dir)
2381   else:
2382     try:
2383       os.makedirs(file_storage_dir, 0750)
2384     except OSError, err:
2385       _Fail("Cannot create file storage directory '%s': %s",
2386             file_storage_dir, err, exc=True)
2387
2388
2389 def RemoveFileStorageDir(file_storage_dir):
2390   """Remove file storage directory.
2391
2392   Remove it only if it's empty. If not log an error and return.
2393
2394   @type file_storage_dir: str
2395   @param file_storage_dir: the directory we should cleanup
2396   @rtype: tuple (success,)
2397   @return: tuple of one element, C{success}, denoting
2398       whether the operation was successful
2399
2400   """
2401   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2402   if os.path.exists(file_storage_dir):
2403     if not os.path.isdir(file_storage_dir):
2404       _Fail("Specified Storage directory '%s' is not a directory",
2405             file_storage_dir)
2406     # deletes dir only if empty, otherwise we want to fail the rpc call
2407     try:
2408       os.rmdir(file_storage_dir)
2409     except OSError, err:
2410       _Fail("Cannot remove file storage directory '%s': %s",
2411             file_storage_dir, err)
2412
2413
2414 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2415   """Rename the file storage directory.
2416
2417   @type old_file_storage_dir: str
2418   @param old_file_storage_dir: the current path
2419   @type new_file_storage_dir: str
2420   @param new_file_storage_dir: the name we should rename to
2421   @rtype: tuple (success,)
2422   @return: tuple of one element, C{success}, denoting
2423       whether the operation was successful
2424
2425   """
2426   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2427   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2428   if not os.path.exists(new_file_storage_dir):
2429     if os.path.isdir(old_file_storage_dir):
2430       try:
2431         os.rename(old_file_storage_dir, new_file_storage_dir)
2432       except OSError, err:
2433         _Fail("Cannot rename '%s' to '%s': %s",
2434               old_file_storage_dir, new_file_storage_dir, err)
2435     else:
2436       _Fail("Specified storage dir '%s' is not a directory",
2437             old_file_storage_dir)
2438   else:
2439     if os.path.exists(old_file_storage_dir):
2440       _Fail("Cannot rename '%s' to '%s': both locations exist",
2441             old_file_storage_dir, new_file_storage_dir)
2442
2443
2444 def _EnsureJobQueueFile(file_name):
2445   """Checks whether the given filename is in the queue directory.
2446
2447   @type file_name: str
2448   @param file_name: the file name we should check
2449   @rtype: None
2450   @raises RPCFail: if the file is not valid
2451
2452   """
2453   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2454   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2455
2456   if not result:
2457     _Fail("Passed job queue file '%s' does not belong to"
2458           " the queue directory '%s'", file_name, queue_dir)
2459
2460
2461 def JobQueueUpdate(file_name, content):
2462   """Updates a file in the queue directory.
2463
2464   This is just a wrapper over L{utils.WriteFile}, with proper
2465   checking.
2466
2467   @type file_name: str
2468   @param file_name: the job file name
2469   @type content: str
2470   @param content: the new job contents
2471   @rtype: boolean
2472   @return: the success of the operation
2473
2474   """
2475   _EnsureJobQueueFile(file_name)
2476   getents = runtime.GetEnts()
2477
2478   # Write and replace the file atomically
2479   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2480                   gid=getents.masterd_gid)
2481
2482
2483 def JobQueueRename(old, new):
2484   """Renames a job queue file.
2485
2486   This is just a wrapper over os.rename with proper checking.
2487
2488   @type old: str
2489   @param old: the old (actual) file name
2490   @type new: str
2491   @param new: the desired file name
2492   @rtype: tuple
2493   @return: the success of the operation and payload
2494
2495   """
2496   _EnsureJobQueueFile(old)
2497   _EnsureJobQueueFile(new)
2498
2499   utils.RenameFile(old, new, mkdir=True)
2500
2501
2502 def BlockdevClose(instance_name, disks):
2503   """Closes the given block devices.
2504
2505   This means they will be switched to secondary mode (in case of
2506   DRBD).
2507
2508   @param instance_name: if the argument is not empty, the symlinks
2509       of this instance will be removed
2510   @type disks: list of L{objects.Disk}
2511   @param disks: the list of disks to be closed
2512   @rtype: tuple (success, message)
2513   @return: a tuple of success and message, where success
2514       indicates the succes of the operation, and message
2515       which will contain the error details in case we
2516       failed
2517
2518   """
2519   bdevs = []
2520   for cf in disks:
2521     rd = _RecursiveFindBD(cf)
2522     if rd is None:
2523       _Fail("Can't find device %s", cf)
2524     bdevs.append(rd)
2525
2526   msg = []
2527   for rd in bdevs:
2528     try:
2529       rd.Close()
2530     except errors.BlockDeviceError, err:
2531       msg.append(str(err))
2532   if msg:
2533     _Fail("Can't make devices secondary: %s", ",".join(msg))
2534   else:
2535     if instance_name:
2536       _RemoveBlockDevLinks(instance_name, disks)
2537
2538
2539 def ValidateHVParams(hvname, hvparams):
2540   """Validates the given hypervisor parameters.
2541
2542   @type hvname: string
2543   @param hvname: the hypervisor name
2544   @type hvparams: dict
2545   @param hvparams: the hypervisor parameters to be validated
2546   @rtype: None
2547
2548   """
2549   try:
2550     hv_type = hypervisor.GetHypervisor(hvname)
2551     hv_type.ValidateParameters(hvparams)
2552   except errors.HypervisorError, err:
2553     _Fail(str(err), log=False)
2554
2555
2556 def _CheckOSPList(os_obj, parameters):
2557   """Check whether a list of parameters is supported by the OS.
2558
2559   @type os_obj: L{objects.OS}
2560   @param os_obj: OS object to check
2561   @type parameters: list
2562   @param parameters: the list of parameters to check
2563
2564   """
2565   supported = [v[0] for v in os_obj.supported_parameters]
2566   delta = frozenset(parameters).difference(supported)
2567   if delta:
2568     _Fail("The following parameters are not supported"
2569           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2570
2571
2572 def ValidateOS(required, osname, checks, osparams):
2573   """Validate the given OS' parameters.
2574
2575   @type required: boolean
2576   @param required: whether absence of the OS should translate into
2577       failure or not
2578   @type osname: string
2579   @param osname: the OS to be validated
2580   @type checks: list
2581   @param checks: list of the checks to run (currently only 'parameters')
2582   @type osparams: dict
2583   @param osparams: dictionary with OS parameters
2584   @rtype: boolean
2585   @return: True if the validation passed, or False if the OS was not
2586       found and L{required} was false
2587
2588   """
2589   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2590     _Fail("Unknown checks required for OS %s: %s", osname,
2591           set(checks).difference(constants.OS_VALIDATE_CALLS))
2592
2593   name_only = objects.OS.GetName(osname)
2594   status, tbv = _TryOSFromDisk(name_only, None)
2595
2596   if not status:
2597     if required:
2598       _Fail(tbv)
2599     else:
2600       return False
2601
2602   if max(tbv.api_versions) < constants.OS_API_V20:
2603     return True
2604
2605   if constants.OS_VALIDATE_PARAMETERS in checks:
2606     _CheckOSPList(tbv, osparams.keys())
2607
2608   validate_env = OSCoreEnv(osname, tbv, osparams)
2609   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2610                         cwd=tbv.path)
2611   if result.failed:
2612     logging.error("os validate command '%s' returned error: %s output: %s",
2613                   result.cmd, result.fail_reason, result.output)
2614     _Fail("OS validation script failed (%s), output: %s",
2615           result.fail_reason, result.output, log=False)
2616
2617   return True
2618
2619
2620 def DemoteFromMC():
2621   """Demotes the current node from master candidate role.
2622
2623   """
2624   # try to ensure we're not the master by mistake
2625   master, myself = ssconf.GetMasterAndMyself()
2626   if master == myself:
2627     _Fail("ssconf status shows I'm the master node, will not demote")
2628
2629   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2630   if not result.failed:
2631     _Fail("The master daemon is running, will not demote")
2632
2633   try:
2634     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2635       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2636   except EnvironmentError, err:
2637     if err.errno != errno.ENOENT:
2638       _Fail("Error while backing up cluster file: %s", err, exc=True)
2639
2640   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2641
2642
2643 def _GetX509Filenames(cryptodir, name):
2644   """Returns the full paths for the private key and certificate.
2645
2646   """
2647   return (utils.PathJoin(cryptodir, name),
2648           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2649           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2650
2651
2652 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2653   """Creates a new X509 certificate for SSL/TLS.
2654
2655   @type validity: int
2656   @param validity: Validity in seconds
2657   @rtype: tuple; (string, string)
2658   @return: Certificate name and public part
2659
2660   """
2661   (key_pem, cert_pem) = \
2662     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2663                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2664
2665   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2666                               prefix="x509-%s-" % utils.TimestampForFilename())
2667   try:
2668     name = os.path.basename(cert_dir)
2669     assert len(name) > 5
2670
2671     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2672
2673     utils.WriteFile(key_file, mode=0400, data=key_pem)
2674     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2675
2676     # Never return private key as it shouldn't leave the node
2677     return (name, cert_pem)
2678   except Exception:
2679     shutil.rmtree(cert_dir, ignore_errors=True)
2680     raise
2681
2682
2683 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2684   """Removes a X509 certificate.
2685
2686   @type name: string
2687   @param name: Certificate name
2688
2689   """
2690   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2691
2692   utils.RemoveFile(key_file)
2693   utils.RemoveFile(cert_file)
2694
2695   try:
2696     os.rmdir(cert_dir)
2697   except EnvironmentError, err:
2698     _Fail("Cannot remove certificate directory '%s': %s",
2699           cert_dir, err)
2700
2701
2702 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2703   """Returns the command for the requested input/output.
2704
2705   @type instance: L{objects.Instance}
2706   @param instance: The instance object
2707   @param mode: Import/export mode
2708   @param ieio: Input/output type
2709   @param ieargs: Input/output arguments
2710
2711   """
2712   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2713
2714   env = None
2715   prefix = None
2716   suffix = None
2717   exp_size = None
2718
2719   if ieio == constants.IEIO_FILE:
2720     (filename, ) = ieargs
2721
2722     if not utils.IsNormAbsPath(filename):
2723       _Fail("Path '%s' is not normalized or absolute", filename)
2724
2725     directory = os.path.normpath(os.path.dirname(filename))
2726
2727     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2728         constants.EXPORT_DIR):
2729       _Fail("File '%s' is not under exports directory '%s'",
2730             filename, constants.EXPORT_DIR)
2731
2732     # Create directory
2733     utils.Makedirs(directory, mode=0750)
2734
2735     quoted_filename = utils.ShellQuote(filename)
2736
2737     if mode == constants.IEM_IMPORT:
2738       suffix = "> %s" % quoted_filename
2739     elif mode == constants.IEM_EXPORT:
2740       suffix = "< %s" % quoted_filename
2741
2742       # Retrieve file size
2743       try:
2744         st = os.stat(filename)
2745       except EnvironmentError, err:
2746         logging.error("Can't stat(2) %s: %s", filename, err)
2747       else:
2748         exp_size = utils.BytesToMebibyte(st.st_size)
2749
2750   elif ieio == constants.IEIO_RAW_DISK:
2751     (disk, ) = ieargs
2752
2753     real_disk = _OpenRealBD(disk)
2754
2755     if mode == constants.IEM_IMPORT:
2756       # we set here a smaller block size as, due to transport buffering, more
2757       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2758       # is not already there or we pass a wrong path; we use notrunc to no
2759       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2760       # much memory; this means that at best, we flush every 64k, which will
2761       # not be very fast
2762       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2763                                     " bs=%s oflag=dsync"),
2764                                     real_disk.dev_path,
2765                                     str(64 * 1024))
2766
2767     elif mode == constants.IEM_EXPORT:
2768       # the block size on the read dd is 1MiB to match our units
2769       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2770                                    real_disk.dev_path,
2771                                    str(1024 * 1024), # 1 MB
2772                                    str(disk.size))
2773       exp_size = disk.size
2774
2775   elif ieio == constants.IEIO_SCRIPT:
2776     (disk, disk_index, ) = ieargs
2777
2778     assert isinstance(disk_index, (int, long))
2779
2780     real_disk = _OpenRealBD(disk)
2781
2782     inst_os = OSFromDisk(instance.os)
2783     env = OSEnvironment(instance, inst_os)
2784
2785     if mode == constants.IEM_IMPORT:
2786       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2787       env["IMPORT_INDEX"] = str(disk_index)
2788       script = inst_os.import_script
2789
2790     elif mode == constants.IEM_EXPORT:
2791       env["EXPORT_DEVICE"] = real_disk.dev_path
2792       env["EXPORT_INDEX"] = str(disk_index)
2793       script = inst_os.export_script
2794
2795     # TODO: Pass special environment only to script
2796     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2797
2798     if mode == constants.IEM_IMPORT:
2799       suffix = "| %s" % script_cmd
2800
2801     elif mode == constants.IEM_EXPORT:
2802       prefix = "%s |" % script_cmd
2803
2804     # Let script predict size
2805     exp_size = constants.IE_CUSTOM_SIZE
2806
2807   else:
2808     _Fail("Invalid %s I/O mode %r", mode, ieio)
2809
2810   return (env, prefix, suffix, exp_size)
2811
2812
2813 def _CreateImportExportStatusDir(prefix):
2814   """Creates status directory for import/export.
2815
2816   """
2817   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2818                           prefix=("%s-%s-" %
2819                                   (prefix, utils.TimestampForFilename())))
2820
2821
2822 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2823   """Starts an import or export daemon.
2824
2825   @param mode: Import/output mode
2826   @type opts: L{objects.ImportExportOptions}
2827   @param opts: Daemon options
2828   @type host: string
2829   @param host: Remote host for export (None for import)
2830   @type port: int
2831   @param port: Remote port for export (None for import)
2832   @type instance: L{objects.Instance}
2833   @param instance: Instance object
2834   @param ieio: Input/output type
2835   @param ieioargs: Input/output arguments
2836
2837   """
2838   if mode == constants.IEM_IMPORT:
2839     prefix = "import"
2840
2841     if not (host is None and port is None):
2842       _Fail("Can not specify host or port on import")
2843
2844   elif mode == constants.IEM_EXPORT:
2845     prefix = "export"
2846
2847     if host is None or port is None:
2848       _Fail("Host and port must be specified for an export")
2849
2850   else:
2851     _Fail("Invalid mode %r", mode)
2852
2853   if (opts.key_name is None) ^ (opts.ca_pem is None):
2854     _Fail("Cluster certificate can only be used for both key and CA")
2855
2856   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2857     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2858
2859   if opts.key_name is None:
2860     # Use server.pem
2861     key_path = constants.NODED_CERT_FILE
2862     cert_path = constants.NODED_CERT_FILE
2863     assert opts.ca_pem is None
2864   else:
2865     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2866                                                  opts.key_name)
2867     assert opts.ca_pem is not None
2868
2869   for i in [key_path, cert_path]:
2870     if not os.path.exists(i):
2871       _Fail("File '%s' does not exist" % i)
2872
2873   status_dir = _CreateImportExportStatusDir(prefix)
2874   try:
2875     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2876     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2877     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2878
2879     if opts.ca_pem is None:
2880       # Use server.pem
2881       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2882     else:
2883       ca = opts.ca_pem
2884
2885     # Write CA file
2886     utils.WriteFile(ca_file, data=ca, mode=0400)
2887
2888     cmd = [
2889       constants.IMPORT_EXPORT_DAEMON,
2890       status_file, mode,
2891       "--key=%s" % key_path,
2892       "--cert=%s" % cert_path,
2893       "--ca=%s" % ca_file,
2894       ]
2895
2896     if host:
2897       cmd.append("--host=%s" % host)
2898
2899     if port:
2900       cmd.append("--port=%s" % port)
2901
2902     if opts.ipv6:
2903       cmd.append("--ipv6")
2904     else:
2905       cmd.append("--ipv4")
2906
2907     if opts.compress:
2908       cmd.append("--compress=%s" % opts.compress)
2909
2910     if opts.magic:
2911       cmd.append("--magic=%s" % opts.magic)
2912
2913     if exp_size is not None:
2914       cmd.append("--expected-size=%s" % exp_size)
2915
2916     if cmd_prefix:
2917       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2918
2919     if cmd_suffix:
2920       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2921
2922     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2923
2924     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2925     # support for receiving a file descriptor for output
2926     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2927                       output=logfile)
2928
2929     # The import/export name is simply the status directory name
2930     return os.path.basename(status_dir)
2931
2932   except Exception:
2933     shutil.rmtree(status_dir, ignore_errors=True)
2934     raise
2935
2936
2937 def GetImportExportStatus(names):
2938   """Returns import/export daemon status.
2939
2940   @type names: sequence
2941   @param names: List of names
2942   @rtype: List of dicts
2943   @return: Returns a list of the state of each named import/export or None if a
2944            status couldn't be read
2945
2946   """
2947   result = []
2948
2949   for name in names:
2950     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2951                                  _IES_STATUS_FILE)
2952
2953     try:
2954       data = utils.ReadFile(status_file)
2955     except EnvironmentError, err:
2956       if err.errno != errno.ENOENT:
2957         raise
2958       data = None
2959
2960     if not data:
2961       result.append(None)
2962       continue
2963
2964     result.append(serializer.LoadJson(data))
2965
2966   return result
2967
2968
2969 def AbortImportExport(name):
2970   """Sends SIGTERM to a running import/export daemon.
2971
2972   """
2973   logging.info("Abort import/export %s", name)
2974
2975   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2976   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2977
2978   if pid:
2979     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2980                  name, pid)
2981     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2982
2983
2984 def CleanupImportExport(name):
2985   """Cleanup after an import or export.
2986
2987   If the import/export daemon is still running it's killed. Afterwards the
2988   whole status directory is removed.
2989
2990   """
2991   logging.info("Finalizing import/export %s", name)
2992
2993   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2994
2995   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2996
2997   if pid:
2998     logging.info("Import/export %s is still running with PID %s",
2999                  name, pid)
3000     utils.KillProcess(pid, waitpid=False)
3001
3002   shutil.rmtree(status_dir, ignore_errors=True)
3003
3004
3005 def _FindDisks(nodes_ip, disks):
3006   """Sets the physical ID on disks and returns the block devices.
3007
3008   """
3009   # set the correct physical ID
3010   my_name = netutils.Hostname.GetSysName()
3011   for cf in disks:
3012     cf.SetPhysicalID(my_name, nodes_ip)
3013
3014   bdevs = []
3015
3016   for cf in disks:
3017     rd = _RecursiveFindBD(cf)
3018     if rd is None:
3019       _Fail("Can't find device %s", cf)
3020     bdevs.append(rd)
3021   return bdevs
3022
3023
3024 def DrbdDisconnectNet(nodes_ip, disks):
3025   """Disconnects the network on a list of drbd devices.
3026
3027   """
3028   bdevs = _FindDisks(nodes_ip, disks)
3029
3030   # disconnect disks
3031   for rd in bdevs:
3032     try:
3033       rd.DisconnectNet()
3034     except errors.BlockDeviceError, err:
3035       _Fail("Can't change network configuration to standalone mode: %s",
3036             err, exc=True)
3037
3038
3039 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
3040   """Attaches the network on a list of drbd devices.
3041
3042   """
3043   bdevs = _FindDisks(nodes_ip, disks)
3044
3045   if multimaster:
3046     for idx, rd in enumerate(bdevs):
3047       try:
3048         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
3049       except EnvironmentError, err:
3050         _Fail("Can't create symlink: %s", err)
3051   # reconnect disks, switch to new master configuration and if
3052   # needed primary mode
3053   for rd in bdevs:
3054     try:
3055       rd.AttachNet(multimaster)
3056     except errors.BlockDeviceError, err:
3057       _Fail("Can't change network configuration: %s", err)
3058
3059   # wait until the disks are connected; we need to retry the re-attach
3060   # if the device becomes standalone, as this might happen if the one
3061   # node disconnects and reconnects in a different mode before the
3062   # other node reconnects; in this case, one or both of the nodes will
3063   # decide it has wrong configuration and switch to standalone
3064
3065   def _Attach():
3066     all_connected = True
3067
3068     for rd in bdevs:
3069       stats = rd.GetProcStatus()
3070
3071       all_connected = (all_connected and
3072                        (stats.is_connected or stats.is_in_resync))
3073
3074       if stats.is_standalone:
3075         # peer had different config info and this node became
3076         # standalone, even though this should not happen with the
3077         # new staged way of changing disk configs
3078         try:
3079           rd.AttachNet(multimaster)
3080         except errors.BlockDeviceError, err:
3081           _Fail("Can't change network configuration: %s", err)
3082
3083     if not all_connected:
3084       raise utils.RetryAgain()
3085
3086   try:
3087     # Start with a delay of 100 miliseconds and go up to 5 seconds
3088     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3089   except utils.RetryTimeout:
3090     _Fail("Timeout in disk reconnecting")
3091
3092   if multimaster:
3093     # change to primary mode
3094     for rd in bdevs:
3095       try:
3096         rd.Open()
3097       except errors.BlockDeviceError, err:
3098         _Fail("Can't change to primary mode: %s", err)
3099
3100
3101 def DrbdWaitSync(nodes_ip, disks):
3102   """Wait until DRBDs have synchronized.
3103
3104   """
3105   def _helper(rd):
3106     stats = rd.GetProcStatus()
3107     if not (stats.is_connected or stats.is_in_resync):
3108       raise utils.RetryAgain()
3109     return stats
3110
3111   bdevs = _FindDisks(nodes_ip, disks)
3112
3113   min_resync = 100
3114   alldone = True
3115   for rd in bdevs:
3116     try:
3117       # poll each second for 15 seconds
3118       stats = utils.Retry(_helper, 1, 15, args=[rd])
3119     except utils.RetryTimeout:
3120       stats = rd.GetProcStatus()
3121       # last check
3122       if not (stats.is_connected or stats.is_in_resync):
3123         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3124     alldone = alldone and (not stats.is_in_resync)
3125     if stats.sync_percent is not None:
3126       min_resync = min(min_resync, stats.sync_percent)
3127
3128   return (alldone, min_resync)
3129
3130
3131 def GetDrbdUsermodeHelper():
3132   """Returns DRBD usermode helper currently configured.
3133
3134   """
3135   try:
3136     return bdev.BaseDRBD.GetUsermodeHelper()
3137   except errors.BlockDeviceError, err:
3138     _Fail(str(err))
3139
3140
3141 def PowercycleNode(hypervisor_type):
3142   """Hard-powercycle the node.
3143
3144   Because we need to return first, and schedule the powercycle in the
3145   background, we won't be able to report failures nicely.
3146
3147   """
3148   hyper = hypervisor.GetHypervisor(hypervisor_type)
3149   try:
3150     pid = os.fork()
3151   except OSError:
3152     # if we can't fork, we'll pretend that we're in the child process
3153     pid = 0
3154   if pid > 0:
3155     return "Reboot scheduled in 5 seconds"
3156   # ensure the child is running on ram
3157   try:
3158     utils.Mlockall()
3159   except Exception: # pylint: disable-msg=W0703
3160     pass
3161   time.sleep(5)
3162   hyper.PowercycleNode()
3163
3164
3165 class HooksRunner(object):
3166   """Hook runner.
3167
3168   This class is instantiated on the node side (ganeti-noded) and not
3169   on the master side.
3170
3171   """
3172   def __init__(self, hooks_base_dir=None):
3173     """Constructor for hooks runner.
3174
3175     @type hooks_base_dir: str or None
3176     @param hooks_base_dir: if not None, this overrides the
3177         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3178
3179     """
3180     if hooks_base_dir is None:
3181       hooks_base_dir = constants.HOOKS_BASE_DIR
3182     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3183     # constant
3184     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3185
3186   def RunHooks(self, hpath, phase, env):
3187     """Run the scripts in the hooks directory.
3188
3189     @type hpath: str
3190     @param hpath: the path to the hooks directory which
3191         holds the scripts
3192     @type phase: str
3193     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3194         L{constants.HOOKS_PHASE_POST}
3195     @type env: dict
3196     @param env: dictionary with the environment for the hook
3197     @rtype: list
3198     @return: list of 3-element tuples:
3199       - script path
3200       - script result, either L{constants.HKR_SUCCESS} or
3201         L{constants.HKR_FAIL}
3202       - output of the script
3203
3204     @raise errors.ProgrammerError: for invalid input
3205         parameters
3206
3207     """
3208     if phase == constants.HOOKS_PHASE_PRE:
3209       suffix = "pre"
3210     elif phase == constants.HOOKS_PHASE_POST:
3211       suffix = "post"
3212     else:
3213       _Fail("Unknown hooks phase '%s'", phase)
3214
3215
3216     subdir = "%s-%s.d" % (hpath, suffix)
3217     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3218
3219     results = []
3220
3221     if not os.path.isdir(dir_name):
3222       # for non-existing/non-dirs, we simply exit instead of logging a
3223       # warning at every operation
3224       return results
3225
3226     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3227
3228     for (relname, relstatus, runresult)  in runparts_results:
3229       if relstatus == constants.RUNPARTS_SKIP:
3230         rrval = constants.HKR_SKIP
3231         output = ""
3232       elif relstatus == constants.RUNPARTS_ERR:
3233         rrval = constants.HKR_FAIL
3234         output = "Hook script execution error: %s" % runresult
3235       elif relstatus == constants.RUNPARTS_RUN:
3236         if runresult.failed:
3237           rrval = constants.HKR_FAIL
3238         else:
3239           rrval = constants.HKR_SUCCESS
3240         output = utils.SafeEncode(runresult.output.strip())
3241       results.append(("%s/%s" % (subdir, relname), rrval, output))
3242
3243     return results
3244
3245
3246 class IAllocatorRunner(object):
3247   """IAllocator runner.
3248
3249   This class is instantiated on the node side (ganeti-noded) and not on
3250   the master side.
3251
3252   """
3253   @staticmethod
3254   def Run(name, idata):
3255     """Run an iallocator script.
3256
3257     @type name: str
3258     @param name: the iallocator script name
3259     @type idata: str
3260     @param idata: the allocator input data
3261
3262     @rtype: tuple
3263     @return: two element tuple of:
3264        - status
3265        - either error message or stdout of allocator (for success)
3266
3267     """
3268     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3269                                   os.path.isfile)
3270     if alloc_script is None:
3271       _Fail("iallocator module '%s' not found in the search path", name)
3272
3273     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3274     try:
3275       os.write(fd, idata)
3276       os.close(fd)
3277       result = utils.RunCmd([alloc_script, fin_name])
3278       if result.failed:
3279         _Fail("iallocator module '%s' failed: %s, output '%s'",
3280               name, result.fail_reason, result.output)
3281     finally:
3282       os.unlink(fin_name)
3283
3284     return result.stdout
3285
3286
3287 class DevCacheManager(object):
3288   """Simple class for managing a cache of block device information.
3289
3290   """
3291   _DEV_PREFIX = "/dev/"
3292   _ROOT_DIR = constants.BDEV_CACHE_DIR
3293
3294   @classmethod
3295   def _ConvertPath(cls, dev_path):
3296     """Converts a /dev/name path to the cache file name.
3297
3298     This replaces slashes with underscores and strips the /dev
3299     prefix. It then returns the full path to the cache file.
3300
3301     @type dev_path: str
3302     @param dev_path: the C{/dev/} path name
3303     @rtype: str
3304     @return: the converted path name
3305
3306     """
3307     if dev_path.startswith(cls._DEV_PREFIX):
3308       dev_path = dev_path[len(cls._DEV_PREFIX):]
3309     dev_path = dev_path.replace("/", "_")
3310     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3311     return fpath
3312
3313   @classmethod
3314   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3315     """Updates the cache information for a given device.
3316
3317     @type dev_path: str
3318     @param dev_path: the pathname of the device
3319     @type owner: str
3320     @param owner: the owner (instance name) of the device
3321     @type on_primary: bool
3322     @param on_primary: whether this is the primary
3323         node nor not
3324     @type iv_name: str
3325     @param iv_name: the instance-visible name of the
3326         device, as in objects.Disk.iv_name
3327
3328     @rtype: None
3329
3330     """
3331     if dev_path is None:
3332       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3333       return
3334     fpath = cls._ConvertPath(dev_path)
3335     if on_primary:
3336       state = "primary"
3337     else:
3338       state = "secondary"
3339     if iv_name is None:
3340       iv_name = "not_visible"
3341     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3342     try:
3343       utils.WriteFile(fpath, data=fdata)
3344     except EnvironmentError, err:
3345       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3346
3347   @classmethod
3348   def RemoveCache(cls, dev_path):
3349     """Remove data for a dev_path.
3350
3351     This is just a wrapper over L{utils.RemoveFile} with a converted
3352     path name and logging.
3353
3354     @type dev_path: str
3355     @param dev_path: the pathname of the device
3356
3357     @rtype: None
3358
3359     """
3360     if dev_path is None:
3361       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3362       return
3363     fpath = cls._ConvertPath(dev_path)
3364     try:
3365       utils.RemoveFile(fpath)
3366     except EnvironmentError, err:
3367       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)