Add drbd_helper rpc call
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61
62
63 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
64 _ALLOWED_CLEAN_DIRS = frozenset([
65   constants.DATA_DIR,
66   constants.JOB_QUEUE_ARCHIVE_DIR,
67   constants.QUEUE_DIR,
68   constants.CRYPTO_KEYS_DIR,
69   ])
70 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
71 _X509_KEY_FILE = "key"
72 _X509_CERT_FILE = "cert"
73 _IES_STATUS_FILE = "status"
74 _IES_PID_FILE = "pid"
75 _IES_CA_FILE = "ca"
76
77
78 class RPCFail(Exception):
79   """Class denoting RPC failure.
80
81   Its argument is the error message.
82
83   """
84
85
86 def _Fail(msg, *args, **kwargs):
87   """Log an error and the raise an RPCFail exception.
88
89   This exception is then handled specially in the ganeti daemon and
90   turned into a 'failed' return type. As such, this function is a
91   useful shortcut for logging the error and returning it to the master
92   daemon.
93
94   @type msg: string
95   @param msg: the text of the exception
96   @raise RPCFail
97
98   """
99   if args:
100     msg = msg % args
101   if "log" not in kwargs or kwargs["log"]: # if we should log this error
102     if "exc" in kwargs and kwargs["exc"]:
103       logging.exception(msg)
104     else:
105       logging.error(msg)
106   raise RPCFail(msg)
107
108
109 def _GetConfig():
110   """Simple wrapper to return a SimpleStore.
111
112   @rtype: L{ssconf.SimpleStore}
113   @return: a SimpleStore instance
114
115   """
116   return ssconf.SimpleStore()
117
118
119 def _GetSshRunner(cluster_name):
120   """Simple wrapper to return an SshRunner.
121
122   @type cluster_name: str
123   @param cluster_name: the cluster name, which is needed
124       by the SshRunner constructor
125   @rtype: L{ssh.SshRunner}
126   @return: an SshRunner instance
127
128   """
129   return ssh.SshRunner(cluster_name)
130
131
132 def _Decompress(data):
133   """Unpacks data compressed by the RPC client.
134
135   @type data: list or tuple
136   @param data: Data sent by RPC client
137   @rtype: str
138   @return: Decompressed data
139
140   """
141   assert isinstance(data, (list, tuple))
142   assert len(data) == 2
143   (encoding, content) = data
144   if encoding == constants.RPC_ENCODING_NONE:
145     return content
146   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
147     return zlib.decompress(base64.b64decode(content))
148   else:
149     raise AssertionError("Unknown data encoding")
150
151
152 def _CleanDirectory(path, exclude=None):
153   """Removes all regular files in a directory.
154
155   @type path: str
156   @param path: the directory to clean
157   @type exclude: list
158   @param exclude: list of files to be excluded, defaults
159       to the empty list
160
161   """
162   if path not in _ALLOWED_CLEAN_DIRS:
163     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
164           path)
165
166   if not os.path.isdir(path):
167     return
168   if exclude is None:
169     exclude = []
170   else:
171     # Normalize excluded paths
172     exclude = [os.path.normpath(i) for i in exclude]
173
174   for rel_name in utils.ListVisibleFiles(path):
175     full_name = utils.PathJoin(path, rel_name)
176     if full_name in exclude:
177       continue
178     if os.path.isfile(full_name) and not os.path.islink(full_name):
179       utils.RemoveFile(full_name)
180
181
182 def _BuildUploadFileList():
183   """Build the list of allowed upload files.
184
185   This is abstracted so that it's built only once at module import time.
186
187   """
188   allowed_files = set([
189     constants.CLUSTER_CONF_FILE,
190     constants.ETC_HOSTS,
191     constants.SSH_KNOWN_HOSTS_FILE,
192     constants.VNC_PASSWORD_FILE,
193     constants.RAPI_CERT_FILE,
194     constants.RAPI_USERS_FILE,
195     constants.CONFD_HMAC_KEY,
196     constants.CLUSTER_DOMAIN_SECRET_FILE,
197     ])
198
199   for hv_name in constants.HYPER_TYPES:
200     hv_class = hypervisor.GetHypervisorClass(hv_name)
201     allowed_files.update(hv_class.GetAncillaryFiles())
202
203   return frozenset(allowed_files)
204
205
206 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
207
208
209 def JobQueuePurge():
210   """Removes job queue files and archived jobs.
211
212   @rtype: tuple
213   @return: True, None
214
215   """
216   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
217   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
218
219
220 def GetMasterInfo():
221   """Returns master information.
222
223   This is an utility function to compute master information, either
224   for consumption here or from the node daemon.
225
226   @rtype: tuple
227   @return: master_netdev, master_ip, master_name
228   @raise RPCFail: in case of errors
229
230   """
231   try:
232     cfg = _GetConfig()
233     master_netdev = cfg.GetMasterNetdev()
234     master_ip = cfg.GetMasterIP()
235     master_node = cfg.GetMasterNode()
236   except errors.ConfigurationError, err:
237     _Fail("Cluster configuration incomplete: %s", err, exc=True)
238   return (master_netdev, master_ip, master_node)
239
240
241 def StartMaster(start_daemons, no_voting):
242   """Activate local node as master node.
243
244   The function will always try activate the IP address of the master
245   (unless someone else has it). It will also start the master daemons,
246   based on the start_daemons parameter.
247
248   @type start_daemons: boolean
249   @param start_daemons: whether to also start the master
250       daemons (ganeti-masterd and ganeti-rapi)
251   @type no_voting: boolean
252   @param no_voting: whether to start ganeti-masterd without a node vote
253       (if start_daemons is True), but still non-interactively
254   @rtype: None
255
256   """
257   # GetMasterInfo will raise an exception if not able to return data
258   master_netdev, master_ip, _ = GetMasterInfo()
259
260   err_msgs = []
261   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
262     if utils.OwnIpAddress(master_ip):
263       # we already have the ip:
264       logging.debug("Master IP already configured, doing nothing")
265     else:
266       msg = "Someone else has the master ip, not activating"
267       logging.error(msg)
268       err_msgs.append(msg)
269   else:
270     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
271                            "dev", master_netdev, "label",
272                            "%s:0" % master_netdev])
273     if result.failed:
274       msg = "Can't activate master IP: %s" % result.output
275       logging.error(msg)
276       err_msgs.append(msg)
277
278     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
279                            "-s", master_ip, master_ip])
280     # we'll ignore the exit code of arping
281
282   # and now start the master and rapi daemons
283   if start_daemons:
284     if no_voting:
285       masterd_args = "--no-voting --yes-do-it"
286     else:
287       masterd_args = ""
288
289     env = {
290       "EXTRA_MASTERD_ARGS": masterd_args,
291       }
292
293     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
294     if result.failed:
295       msg = "Can't start Ganeti master: %s" % result.output
296       logging.error(msg)
297       err_msgs.append(msg)
298
299   if err_msgs:
300     _Fail("; ".join(err_msgs))
301
302
303 def StopMaster(stop_daemons):
304   """Deactivate this node as master.
305
306   The function will always try to deactivate the IP address of the
307   master. It will also stop the master daemons depending on the
308   stop_daemons parameter.
309
310   @type stop_daemons: boolean
311   @param stop_daemons: whether to also stop the master daemons
312       (ganeti-masterd and ganeti-rapi)
313   @rtype: None
314
315   """
316   # TODO: log and report back to the caller the error failures; we
317   # need to decide in which case we fail the RPC for this
318
319   # GetMasterInfo will raise an exception if not able to return data
320   master_netdev, master_ip, _ = GetMasterInfo()
321
322   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
323                          "dev", master_netdev])
324   if result.failed:
325     logging.error("Can't remove the master IP, error: %s", result.output)
326     # but otherwise ignore the failure
327
328   if stop_daemons:
329     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
330     if result.failed:
331       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
332                     " and error %s",
333                     result.cmd, result.exit_code, result.output)
334
335
336 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
337   """Joins this node to the cluster.
338
339   This does the following:
340       - updates the hostkeys of the machine (rsa and dsa)
341       - adds the ssh private key to the user
342       - adds the ssh public key to the users' authorized_keys file
343
344   @type dsa: str
345   @param dsa: the DSA private key to write
346   @type dsapub: str
347   @param dsapub: the DSA public key to write
348   @type rsa: str
349   @param rsa: the RSA private key to write
350   @type rsapub: str
351   @param rsapub: the RSA public key to write
352   @type sshkey: str
353   @param sshkey: the SSH private key to write
354   @type sshpub: str
355   @param sshpub: the SSH public key to write
356   @rtype: boolean
357   @return: the success of the operation
358
359   """
360   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
361                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
362                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
363                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
364   for name, content, mode in sshd_keys:
365     utils.WriteFile(name, data=content, mode=mode)
366
367   try:
368     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
369                                                     mkdir=True)
370   except errors.OpExecError, err:
371     _Fail("Error while processing user ssh files: %s", err, exc=True)
372
373   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
374     utils.WriteFile(name, data=content, mode=0600)
375
376   utils.AddAuthorizedKey(auth_keys, sshpub)
377
378   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
379   if result.failed:
380     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
381           result.cmd, result.exit_code, result.output)
382
383
384 def LeaveCluster(modify_ssh_setup):
385   """Cleans up and remove the current node.
386
387   This function cleans up and prepares the current node to be removed
388   from the cluster.
389
390   If processing is successful, then it raises an
391   L{errors.QuitGanetiException} which is used as a special case to
392   shutdown the node daemon.
393
394   @param modify_ssh_setup: boolean
395
396   """
397   _CleanDirectory(constants.DATA_DIR)
398   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
399   JobQueuePurge()
400
401   if modify_ssh_setup:
402     try:
403       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
404
405       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
406
407       utils.RemoveFile(priv_key)
408       utils.RemoveFile(pub_key)
409     except errors.OpExecError:
410       logging.exception("Error while processing ssh files")
411
412   try:
413     utils.RemoveFile(constants.CONFD_HMAC_KEY)
414     utils.RemoveFile(constants.RAPI_CERT_FILE)
415     utils.RemoveFile(constants.NODED_CERT_FILE)
416   except: # pylint: disable-msg=W0702
417     logging.exception("Error while removing cluster secrets")
418
419   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
420   if result.failed:
421     logging.error("Command %s failed with exitcode %s and error %s",
422                   result.cmd, result.exit_code, result.output)
423
424   # Raise a custom exception (handled in ganeti-noded)
425   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
426
427
428 def GetNodeInfo(vgname, hypervisor_type):
429   """Gives back a hash with different information about the node.
430
431   @type vgname: C{string}
432   @param vgname: the name of the volume group to ask for disk space information
433   @type hypervisor_type: C{str}
434   @param hypervisor_type: the name of the hypervisor to ask for
435       memory information
436   @rtype: C{dict}
437   @return: dictionary with the following keys:
438       - vg_size is the size of the configured volume group in MiB
439       - vg_free is the free size of the volume group in MiB
440       - memory_dom0 is the memory allocated for domain0 in MiB
441       - memory_free is the currently available (free) ram in MiB
442       - memory_total is the total number of ram in MiB
443
444   """
445   outputarray = {}
446   vginfo = _GetVGInfo(vgname)
447   outputarray['vg_size'] = vginfo['vg_size']
448   outputarray['vg_free'] = vginfo['vg_free']
449
450   hyper = hypervisor.GetHypervisor(hypervisor_type)
451   hyp_info = hyper.GetNodeInfo()
452   if hyp_info is not None:
453     outputarray.update(hyp_info)
454
455   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
456
457   return outputarray
458
459
460 def VerifyNode(what, cluster_name):
461   """Verify the status of the local node.
462
463   Based on the input L{what} parameter, various checks are done on the
464   local node.
465
466   If the I{filelist} key is present, this list of
467   files is checksummed and the file/checksum pairs are returned.
468
469   If the I{nodelist} key is present, we check that we have
470   connectivity via ssh with the target nodes (and check the hostname
471   report).
472
473   If the I{node-net-test} key is present, we check that we have
474   connectivity to the given nodes via both primary IP and, if
475   applicable, secondary IPs.
476
477   @type what: C{dict}
478   @param what: a dictionary of things to check:
479       - filelist: list of files for which to compute checksums
480       - nodelist: list of nodes we should check ssh communication with
481       - node-net-test: list of nodes we should check node daemon port
482         connectivity with
483       - hypervisor: list with hypervisors to run the verify for
484   @rtype: dict
485   @return: a dictionary with the same keys as the input dict, and
486       values representing the result of the checks
487
488   """
489   result = {}
490   my_name = utils.HostInfo().name
491   port = utils.GetDaemonPort(constants.NODED)
492
493   if constants.NV_HYPERVISOR in what:
494     result[constants.NV_HYPERVISOR] = tmp = {}
495     for hv_name in what[constants.NV_HYPERVISOR]:
496       try:
497         val = hypervisor.GetHypervisor(hv_name).Verify()
498       except errors.HypervisorError, err:
499         val = "Error while checking hypervisor: %s" % str(err)
500       tmp[hv_name] = val
501
502   if constants.NV_FILELIST in what:
503     result[constants.NV_FILELIST] = utils.FingerprintFiles(
504       what[constants.NV_FILELIST])
505
506   if constants.NV_NODELIST in what:
507     result[constants.NV_NODELIST] = tmp = {}
508     random.shuffle(what[constants.NV_NODELIST])
509     for node in what[constants.NV_NODELIST]:
510       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
511       if not success:
512         tmp[node] = message
513
514   if constants.NV_NODENETTEST in what:
515     result[constants.NV_NODENETTEST] = tmp = {}
516     my_pip = my_sip = None
517     for name, pip, sip in what[constants.NV_NODENETTEST]:
518       if name == my_name:
519         my_pip = pip
520         my_sip = sip
521         break
522     if not my_pip:
523       tmp[my_name] = ("Can't find my own primary/secondary IP"
524                       " in the node list")
525     else:
526       for name, pip, sip in what[constants.NV_NODENETTEST]:
527         fail = []
528         if not utils.TcpPing(pip, port, source=my_pip):
529           fail.append("primary")
530         if sip != pip:
531           if not utils.TcpPing(sip, port, source=my_sip):
532             fail.append("secondary")
533         if fail:
534           tmp[name] = ("failure using the %s interface(s)" %
535                        " and ".join(fail))
536
537   if constants.NV_MASTERIP in what:
538     # FIXME: add checks on incoming data structures (here and in the
539     # rest of the function)
540     master_name, master_ip = what[constants.NV_MASTERIP]
541     if master_name == my_name:
542       source = constants.IP4_ADDRESS_LOCALHOST
543     else:
544       source = None
545     result[constants.NV_MASTERIP] = utils.TcpPing(master_ip, port,
546                                                   source=source)
547
548   if constants.NV_LVLIST in what:
549     try:
550       val = GetVolumeList(what[constants.NV_LVLIST])
551     except RPCFail, err:
552       val = str(err)
553     result[constants.NV_LVLIST] = val
554
555   if constants.NV_INSTANCELIST in what:
556     # GetInstanceList can fail
557     try:
558       val = GetInstanceList(what[constants.NV_INSTANCELIST])
559     except RPCFail, err:
560       val = str(err)
561     result[constants.NV_INSTANCELIST] = val
562
563   if constants.NV_VGLIST in what:
564     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
565
566   if constants.NV_PVLIST in what:
567     result[constants.NV_PVLIST] = \
568       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
569                                    filter_allocatable=False)
570
571   if constants.NV_VERSION in what:
572     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
573                                     constants.RELEASE_VERSION)
574
575   if constants.NV_HVINFO in what:
576     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
577     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
578
579   if constants.NV_DRBDLIST in what:
580     try:
581       used_minors = bdev.DRBD8.GetUsedDevs().keys()
582     except errors.BlockDeviceError, err:
583       logging.warning("Can't get used minors list", exc_info=True)
584       used_minors = str(err)
585     result[constants.NV_DRBDLIST] = used_minors
586
587   if constants.NV_DRBDHELPER in what:
588     status = True
589     try:
590       payload = bdev.BaseDRBD.GetUsermodeHelper()
591     except errors.BlockDeviceError, err:
592       logging.error("Can't get DRBD usermode helper: %s", str(err))
593       status = False
594       payload = str(err)
595     result[constants.NV_DRBDHELPER] = (status, payload)
596
597   if constants.NV_NODESETUP in what:
598     result[constants.NV_NODESETUP] = tmpr = []
599     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
600       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
601                   " under /sys, missing required directories /sys/block"
602                   " and /sys/class/net")
603     if (not os.path.isdir("/proc/sys") or
604         not os.path.isfile("/proc/sysrq-trigger")):
605       tmpr.append("The procfs filesystem doesn't seem to be mounted"
606                   " under /proc, missing required directory /proc/sys and"
607                   " the file /proc/sysrq-trigger")
608
609   if constants.NV_TIME in what:
610     result[constants.NV_TIME] = utils.SplitTime(time.time())
611
612   if constants.NV_OSLIST in what:
613     result[constants.NV_OSLIST] = DiagnoseOS()
614
615   return result
616
617
618 def GetVolumeList(vg_name):
619   """Compute list of logical volumes and their size.
620
621   @type vg_name: str
622   @param vg_name: the volume group whose LVs we should list
623   @rtype: dict
624   @return:
625       dictionary of all partions (key) with value being a tuple of
626       their size (in MiB), inactive and online status::
627
628         {'test1': ('20.06', True, True)}
629
630       in case of errors, a string is returned with the error
631       details.
632
633   """
634   lvs = {}
635   sep = '|'
636   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
637                          "--separator=%s" % sep,
638                          "-olv_name,lv_size,lv_attr", vg_name])
639   if result.failed:
640     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
641
642   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
643   for line in result.stdout.splitlines():
644     line = line.strip()
645     match = valid_line_re.match(line)
646     if not match:
647       logging.error("Invalid line returned from lvs output: '%s'", line)
648       continue
649     name, size, attr = match.groups()
650     inactive = attr[4] == '-'
651     online = attr[5] == 'o'
652     virtual = attr[0] == 'v'
653     if virtual:
654       # we don't want to report such volumes as existing, since they
655       # don't really hold data
656       continue
657     lvs[name] = (size, inactive, online)
658
659   return lvs
660
661
662 def ListVolumeGroups():
663   """List the volume groups and their size.
664
665   @rtype: dict
666   @return: dictionary with keys volume name and values the
667       size of the volume
668
669   """
670   return utils.ListVolumeGroups()
671
672
673 def NodeVolumes():
674   """List all volumes on this node.
675
676   @rtype: list
677   @return:
678     A list of dictionaries, each having four keys:
679       - name: the logical volume name,
680       - size: the size of the logical volume
681       - dev: the physical device on which the LV lives
682       - vg: the volume group to which it belongs
683
684     In case of errors, we return an empty list and log the
685     error.
686
687     Note that since a logical volume can live on multiple physical
688     volumes, the resulting list might include a logical volume
689     multiple times.
690
691   """
692   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
693                          "--separator=|",
694                          "--options=lv_name,lv_size,devices,vg_name"])
695   if result.failed:
696     _Fail("Failed to list logical volumes, lvs output: %s",
697           result.output)
698
699   def parse_dev(dev):
700     return dev.split('(')[0]
701
702   def handle_dev(dev):
703     return [parse_dev(x) for x in dev.split(",")]
704
705   def map_line(line):
706     line = [v.strip() for v in line]
707     return [{'name': line[0], 'size': line[1],
708              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
709
710   all_devs = []
711   for line in result.stdout.splitlines():
712     if line.count('|') >= 3:
713       all_devs.extend(map_line(line.split('|')))
714     else:
715       logging.warning("Strange line in the output from lvs: '%s'", line)
716   return all_devs
717
718
719 def BridgesExist(bridges_list):
720   """Check if a list of bridges exist on the current node.
721
722   @rtype: boolean
723   @return: C{True} if all of them exist, C{False} otherwise
724
725   """
726   missing = []
727   for bridge in bridges_list:
728     if not utils.BridgeExists(bridge):
729       missing.append(bridge)
730
731   if missing:
732     _Fail("Missing bridges %s", utils.CommaJoin(missing))
733
734
735 def GetInstanceList(hypervisor_list):
736   """Provides a list of instances.
737
738   @type hypervisor_list: list
739   @param hypervisor_list: the list of hypervisors to query information
740
741   @rtype: list
742   @return: a list of all running instances on the current node
743     - instance1.example.com
744     - instance2.example.com
745
746   """
747   results = []
748   for hname in hypervisor_list:
749     try:
750       names = hypervisor.GetHypervisor(hname).ListInstances()
751       results.extend(names)
752     except errors.HypervisorError, err:
753       _Fail("Error enumerating instances (hypervisor %s): %s",
754             hname, err, exc=True)
755
756   return results
757
758
759 def GetInstanceInfo(instance, hname):
760   """Gives back the information about an instance as a dictionary.
761
762   @type instance: string
763   @param instance: the instance name
764   @type hname: string
765   @param hname: the hypervisor type of the instance
766
767   @rtype: dict
768   @return: dictionary with the following keys:
769       - memory: memory size of instance (int)
770       - state: xen state of instance (string)
771       - time: cpu time of instance (float)
772
773   """
774   output = {}
775
776   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
777   if iinfo is not None:
778     output['memory'] = iinfo[2]
779     output['state'] = iinfo[4]
780     output['time'] = iinfo[5]
781
782   return output
783
784
785 def GetInstanceMigratable(instance):
786   """Gives whether an instance can be migrated.
787
788   @type instance: L{objects.Instance}
789   @param instance: object representing the instance to be checked.
790
791   @rtype: tuple
792   @return: tuple of (result, description) where:
793       - result: whether the instance can be migrated or not
794       - description: a description of the issue, if relevant
795
796   """
797   hyper = hypervisor.GetHypervisor(instance.hypervisor)
798   iname = instance.name
799   if iname not in hyper.ListInstances():
800     _Fail("Instance %s is not running", iname)
801
802   for idx in range(len(instance.disks)):
803     link_name = _GetBlockDevSymlinkPath(iname, idx)
804     if not os.path.islink(link_name):
805       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
806
807
808 def GetAllInstancesInfo(hypervisor_list):
809   """Gather data about all instances.
810
811   This is the equivalent of L{GetInstanceInfo}, except that it
812   computes data for all instances at once, thus being faster if one
813   needs data about more than one instance.
814
815   @type hypervisor_list: list
816   @param hypervisor_list: list of hypervisors to query for instance data
817
818   @rtype: dict
819   @return: dictionary of instance: data, with data having the following keys:
820       - memory: memory size of instance (int)
821       - state: xen state of instance (string)
822       - time: cpu time of instance (float)
823       - vcpus: the number of vcpus
824
825   """
826   output = {}
827
828   for hname in hypervisor_list:
829     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
830     if iinfo:
831       for name, _, memory, vcpus, state, times in iinfo:
832         value = {
833           'memory': memory,
834           'vcpus': vcpus,
835           'state': state,
836           'time': times,
837           }
838         if name in output:
839           # we only check static parameters, like memory and vcpus,
840           # and not state and time which can change between the
841           # invocations of the different hypervisors
842           for key in 'memory', 'vcpus':
843             if value[key] != output[name][key]:
844               _Fail("Instance %s is running twice"
845                     " with different parameters", name)
846         output[name] = value
847
848   return output
849
850
851 def _InstanceLogName(kind, os_name, instance):
852   """Compute the OS log filename for a given instance and operation.
853
854   The instance name and os name are passed in as strings since not all
855   operations have these as part of an instance object.
856
857   @type kind: string
858   @param kind: the operation type (e.g. add, import, etc.)
859   @type os_name: string
860   @param os_name: the os name
861   @type instance: string
862   @param instance: the name of the instance being imported/added/etc.
863
864   """
865   # TODO: Use tempfile.mkstemp to create unique filename
866   base = ("%s-%s-%s-%s.log" %
867           (kind, os_name, instance, utils.TimestampForFilename()))
868   return utils.PathJoin(constants.LOG_OS_DIR, base)
869
870
871 def InstanceOsAdd(instance, reinstall, debug):
872   """Add an OS to an instance.
873
874   @type instance: L{objects.Instance}
875   @param instance: Instance whose OS is to be installed
876   @type reinstall: boolean
877   @param reinstall: whether this is an instance reinstall
878   @type debug: integer
879   @param debug: debug level, passed to the OS scripts
880   @rtype: None
881
882   """
883   inst_os = OSFromDisk(instance.os)
884
885   create_env = OSEnvironment(instance, inst_os, debug)
886   if reinstall:
887     create_env['INSTANCE_REINSTALL'] = "1"
888
889   logfile = _InstanceLogName("add", instance.os, instance.name)
890
891   result = utils.RunCmd([inst_os.create_script], env=create_env,
892                         cwd=inst_os.path, output=logfile,)
893   if result.failed:
894     logging.error("os create command '%s' returned error: %s, logfile: %s,"
895                   " output: %s", result.cmd, result.fail_reason, logfile,
896                   result.output)
897     lines = [utils.SafeEncode(val)
898              for val in utils.TailFile(logfile, lines=20)]
899     _Fail("OS create script failed (%s), last lines in the"
900           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
901
902
903 def RunRenameInstance(instance, old_name, debug):
904   """Run the OS rename script for an instance.
905
906   @type instance: L{objects.Instance}
907   @param instance: Instance whose OS is to be installed
908   @type old_name: string
909   @param old_name: previous instance name
910   @type debug: integer
911   @param debug: debug level, passed to the OS scripts
912   @rtype: boolean
913   @return: the success of the operation
914
915   """
916   inst_os = OSFromDisk(instance.os)
917
918   rename_env = OSEnvironment(instance, inst_os, debug)
919   rename_env['OLD_INSTANCE_NAME'] = old_name
920
921   logfile = _InstanceLogName("rename", instance.os,
922                              "%s-%s" % (old_name, instance.name))
923
924   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
925                         cwd=inst_os.path, output=logfile)
926
927   if result.failed:
928     logging.error("os create command '%s' returned error: %s output: %s",
929                   result.cmd, result.fail_reason, result.output)
930     lines = [utils.SafeEncode(val)
931              for val in utils.TailFile(logfile, lines=20)]
932     _Fail("OS rename script failed (%s), last lines in the"
933           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
934
935
936 def _GetVGInfo(vg_name):
937   """Get information about the volume group.
938
939   @type vg_name: str
940   @param vg_name: the volume group which we query
941   @rtype: dict
942   @return:
943     A dictionary with the following keys:
944       - C{vg_size} is the total size of the volume group in MiB
945       - C{vg_free} is the free size of the volume group in MiB
946       - C{pv_count} are the number of physical disks in that VG
947
948     If an error occurs during gathering of data, we return the same dict
949     with keys all set to None.
950
951   """
952   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
953
954   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
955                          "--nosuffix", "--units=m", "--separator=:", vg_name])
956
957   if retval.failed:
958     logging.error("volume group %s not present", vg_name)
959     return retdic
960   valarr = retval.stdout.strip().rstrip(':').split(':')
961   if len(valarr) == 3:
962     try:
963       retdic = {
964         "vg_size": int(round(float(valarr[0]), 0)),
965         "vg_free": int(round(float(valarr[1]), 0)),
966         "pv_count": int(valarr[2]),
967         }
968     except (TypeError, ValueError), err:
969       logging.exception("Fail to parse vgs output: %s", err)
970   else:
971     logging.error("vgs output has the wrong number of fields (expected"
972                   " three): %s", str(valarr))
973   return retdic
974
975
976 def _GetBlockDevSymlinkPath(instance_name, idx):
977   return utils.PathJoin(constants.DISK_LINKS_DIR,
978                         "%s:%d" % (instance_name, idx))
979
980
981 def _SymlinkBlockDev(instance_name, device_path, idx):
982   """Set up symlinks to a instance's block device.
983
984   This is an auxiliary function run when an instance is start (on the primary
985   node) or when an instance is migrated (on the target node).
986
987
988   @param instance_name: the name of the target instance
989   @param device_path: path of the physical block device, on the node
990   @param idx: the disk index
991   @return: absolute path to the disk's symlink
992
993   """
994   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
995   try:
996     os.symlink(device_path, link_name)
997   except OSError, err:
998     if err.errno == errno.EEXIST:
999       if (not os.path.islink(link_name) or
1000           os.readlink(link_name) != device_path):
1001         os.remove(link_name)
1002         os.symlink(device_path, link_name)
1003     else:
1004       raise
1005
1006   return link_name
1007
1008
1009 def _RemoveBlockDevLinks(instance_name, disks):
1010   """Remove the block device symlinks belonging to the given instance.
1011
1012   """
1013   for idx, _ in enumerate(disks):
1014     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
1015     if os.path.islink(link_name):
1016       try:
1017         os.remove(link_name)
1018       except OSError:
1019         logging.exception("Can't remove symlink '%s'", link_name)
1020
1021
1022 def _GatherAndLinkBlockDevs(instance):
1023   """Set up an instance's block device(s).
1024
1025   This is run on the primary node at instance startup. The block
1026   devices must be already assembled.
1027
1028   @type instance: L{objects.Instance}
1029   @param instance: the instance whose disks we shoul assemble
1030   @rtype: list
1031   @return: list of (disk_object, device_path)
1032
1033   """
1034   block_devices = []
1035   for idx, disk in enumerate(instance.disks):
1036     device = _RecursiveFindBD(disk)
1037     if device is None:
1038       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1039                                     str(disk))
1040     device.Open()
1041     try:
1042       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1043     except OSError, e:
1044       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1045                                     e.strerror)
1046
1047     block_devices.append((disk, link_name))
1048
1049   return block_devices
1050
1051
1052 def StartInstance(instance):
1053   """Start an instance.
1054
1055   @type instance: L{objects.Instance}
1056   @param instance: the instance object
1057   @rtype: None
1058
1059   """
1060   running_instances = GetInstanceList([instance.hypervisor])
1061
1062   if instance.name in running_instances:
1063     logging.info("Instance %s already running, not starting", instance.name)
1064     return
1065
1066   try:
1067     block_devices = _GatherAndLinkBlockDevs(instance)
1068     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1069     hyper.StartInstance(instance, block_devices)
1070   except errors.BlockDeviceError, err:
1071     _Fail("Block device error: %s", err, exc=True)
1072   except errors.HypervisorError, err:
1073     _RemoveBlockDevLinks(instance.name, instance.disks)
1074     _Fail("Hypervisor error: %s", err, exc=True)
1075
1076
1077 def InstanceShutdown(instance, timeout):
1078   """Shut an instance down.
1079
1080   @note: this functions uses polling with a hardcoded timeout.
1081
1082   @type instance: L{objects.Instance}
1083   @param instance: the instance object
1084   @type timeout: integer
1085   @param timeout: maximum timeout for soft shutdown
1086   @rtype: None
1087
1088   """
1089   hv_name = instance.hypervisor
1090   hyper = hypervisor.GetHypervisor(hv_name)
1091   iname = instance.name
1092
1093   if instance.name not in hyper.ListInstances():
1094     logging.info("Instance %s not running, doing nothing", iname)
1095     return
1096
1097   class _TryShutdown:
1098     def __init__(self):
1099       self.tried_once = False
1100
1101     def __call__(self):
1102       if iname not in hyper.ListInstances():
1103         return
1104
1105       try:
1106         hyper.StopInstance(instance, retry=self.tried_once)
1107       except errors.HypervisorError, err:
1108         if iname not in hyper.ListInstances():
1109           # if the instance is no longer existing, consider this a
1110           # success and go to cleanup
1111           return
1112
1113         _Fail("Failed to stop instance %s: %s", iname, err)
1114
1115       self.tried_once = True
1116
1117       raise utils.RetryAgain()
1118
1119   try:
1120     utils.Retry(_TryShutdown(), 5, timeout)
1121   except utils.RetryTimeout:
1122     # the shutdown did not succeed
1123     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1124
1125     try:
1126       hyper.StopInstance(instance, force=True)
1127     except errors.HypervisorError, err:
1128       if iname in hyper.ListInstances():
1129         # only raise an error if the instance still exists, otherwise
1130         # the error could simply be "instance ... unknown"!
1131         _Fail("Failed to force stop instance %s: %s", iname, err)
1132
1133     time.sleep(1)
1134
1135     if iname in hyper.ListInstances():
1136       _Fail("Could not shutdown instance %s even by destroy", iname)
1137
1138   try:
1139     hyper.CleanupInstance(instance.name)
1140   except errors.HypervisorError, err:
1141     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1142
1143   _RemoveBlockDevLinks(iname, instance.disks)
1144
1145
1146 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1147   """Reboot an instance.
1148
1149   @type instance: L{objects.Instance}
1150   @param instance: the instance object to reboot
1151   @type reboot_type: str
1152   @param reboot_type: the type of reboot, one the following
1153     constants:
1154       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1155         instance OS, do not recreate the VM
1156       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1157         restart the VM (at the hypervisor level)
1158       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1159         not accepted here, since that mode is handled differently, in
1160         cmdlib, and translates into full stop and start of the
1161         instance (instead of a call_instance_reboot RPC)
1162   @type shutdown_timeout: integer
1163   @param shutdown_timeout: maximum timeout for soft shutdown
1164   @rtype: None
1165
1166   """
1167   running_instances = GetInstanceList([instance.hypervisor])
1168
1169   if instance.name not in running_instances:
1170     _Fail("Cannot reboot instance %s that is not running", instance.name)
1171
1172   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1173   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1174     try:
1175       hyper.RebootInstance(instance)
1176     except errors.HypervisorError, err:
1177       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1178   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1179     try:
1180       InstanceShutdown(instance, shutdown_timeout)
1181       return StartInstance(instance)
1182     except errors.HypervisorError, err:
1183       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1184   else:
1185     _Fail("Invalid reboot_type received: %s", reboot_type)
1186
1187
1188 def MigrationInfo(instance):
1189   """Gather information about an instance to be migrated.
1190
1191   @type instance: L{objects.Instance}
1192   @param instance: the instance definition
1193
1194   """
1195   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1196   try:
1197     info = hyper.MigrationInfo(instance)
1198   except errors.HypervisorError, err:
1199     _Fail("Failed to fetch migration information: %s", err, exc=True)
1200   return info
1201
1202
1203 def AcceptInstance(instance, info, target):
1204   """Prepare the node to accept an instance.
1205
1206   @type instance: L{objects.Instance}
1207   @param instance: the instance definition
1208   @type info: string/data (opaque)
1209   @param info: migration information, from the source node
1210   @type target: string
1211   @param target: target host (usually ip), on this node
1212
1213   """
1214   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1215   try:
1216     hyper.AcceptInstance(instance, info, target)
1217   except errors.HypervisorError, err:
1218     _Fail("Failed to accept instance: %s", err, exc=True)
1219
1220
1221 def FinalizeMigration(instance, info, success):
1222   """Finalize any preparation to accept an instance.
1223
1224   @type instance: L{objects.Instance}
1225   @param instance: the instance definition
1226   @type info: string/data (opaque)
1227   @param info: migration information, from the source node
1228   @type success: boolean
1229   @param success: whether the migration was a success or a failure
1230
1231   """
1232   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1233   try:
1234     hyper.FinalizeMigration(instance, info, success)
1235   except errors.HypervisorError, err:
1236     _Fail("Failed to finalize migration: %s", err, exc=True)
1237
1238
1239 def MigrateInstance(instance, target, live):
1240   """Migrates an instance to another node.
1241
1242   @type instance: L{objects.Instance}
1243   @param instance: the instance definition
1244   @type target: string
1245   @param target: the target node name
1246   @type live: boolean
1247   @param live: whether the migration should be done live or not (the
1248       interpretation of this parameter is left to the hypervisor)
1249   @rtype: tuple
1250   @return: a tuple of (success, msg) where:
1251       - succes is a boolean denoting the success/failure of the operation
1252       - msg is a string with details in case of failure
1253
1254   """
1255   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1256
1257   try:
1258     hyper.MigrateInstance(instance, target, live)
1259   except errors.HypervisorError, err:
1260     _Fail("Failed to migrate instance: %s", err, exc=True)
1261
1262
1263 def BlockdevCreate(disk, size, owner, on_primary, info):
1264   """Creates a block device for an instance.
1265
1266   @type disk: L{objects.Disk}
1267   @param disk: the object describing the disk we should create
1268   @type size: int
1269   @param size: the size of the physical underlying device, in MiB
1270   @type owner: str
1271   @param owner: the name of the instance for which disk is created,
1272       used for device cache data
1273   @type on_primary: boolean
1274   @param on_primary:  indicates if it is the primary node or not
1275   @type info: string
1276   @param info: string that will be sent to the physical device
1277       creation, used for example to set (LVM) tags on LVs
1278
1279   @return: the new unique_id of the device (this can sometime be
1280       computed only after creation), or None. On secondary nodes,
1281       it's not required to return anything.
1282
1283   """
1284   # TODO: remove the obsolete 'size' argument
1285   # pylint: disable-msg=W0613
1286   clist = []
1287   if disk.children:
1288     for child in disk.children:
1289       try:
1290         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1291       except errors.BlockDeviceError, err:
1292         _Fail("Can't assemble device %s: %s", child, err)
1293       if on_primary or disk.AssembleOnSecondary():
1294         # we need the children open in case the device itself has to
1295         # be assembled
1296         try:
1297           # pylint: disable-msg=E1103
1298           crdev.Open()
1299         except errors.BlockDeviceError, err:
1300           _Fail("Can't make child '%s' read-write: %s", child, err)
1301       clist.append(crdev)
1302
1303   try:
1304     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1305   except errors.BlockDeviceError, err:
1306     _Fail("Can't create block device: %s", err)
1307
1308   if on_primary or disk.AssembleOnSecondary():
1309     try:
1310       device.Assemble()
1311     except errors.BlockDeviceError, err:
1312       _Fail("Can't assemble device after creation, unusual event: %s", err)
1313     device.SetSyncSpeed(constants.SYNC_SPEED)
1314     if on_primary or disk.OpenOnSecondary():
1315       try:
1316         device.Open(force=True)
1317       except errors.BlockDeviceError, err:
1318         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1319     DevCacheManager.UpdateCache(device.dev_path, owner,
1320                                 on_primary, disk.iv_name)
1321
1322   device.SetInfo(info)
1323
1324   return device.unique_id
1325
1326
1327 def BlockdevRemove(disk):
1328   """Remove a block device.
1329
1330   @note: This is intended to be called recursively.
1331
1332   @type disk: L{objects.Disk}
1333   @param disk: the disk object we should remove
1334   @rtype: boolean
1335   @return: the success of the operation
1336
1337   """
1338   msgs = []
1339   try:
1340     rdev = _RecursiveFindBD(disk)
1341   except errors.BlockDeviceError, err:
1342     # probably can't attach
1343     logging.info("Can't attach to device %s in remove", disk)
1344     rdev = None
1345   if rdev is not None:
1346     r_path = rdev.dev_path
1347     try:
1348       rdev.Remove()
1349     except errors.BlockDeviceError, err:
1350       msgs.append(str(err))
1351     if not msgs:
1352       DevCacheManager.RemoveCache(r_path)
1353
1354   if disk.children:
1355     for child in disk.children:
1356       try:
1357         BlockdevRemove(child)
1358       except RPCFail, err:
1359         msgs.append(str(err))
1360
1361   if msgs:
1362     _Fail("; ".join(msgs))
1363
1364
1365 def _RecursiveAssembleBD(disk, owner, as_primary):
1366   """Activate a block device for an instance.
1367
1368   This is run on the primary and secondary nodes for an instance.
1369
1370   @note: this function is called recursively.
1371
1372   @type disk: L{objects.Disk}
1373   @param disk: the disk we try to assemble
1374   @type owner: str
1375   @param owner: the name of the instance which owns the disk
1376   @type as_primary: boolean
1377   @param as_primary: if we should make the block device
1378       read/write
1379
1380   @return: the assembled device or None (in case no device
1381       was assembled)
1382   @raise errors.BlockDeviceError: in case there is an error
1383       during the activation of the children or the device
1384       itself
1385
1386   """
1387   children = []
1388   if disk.children:
1389     mcn = disk.ChildrenNeeded()
1390     if mcn == -1:
1391       mcn = 0 # max number of Nones allowed
1392     else:
1393       mcn = len(disk.children) - mcn # max number of Nones
1394     for chld_disk in disk.children:
1395       try:
1396         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1397       except errors.BlockDeviceError, err:
1398         if children.count(None) >= mcn:
1399           raise
1400         cdev = None
1401         logging.error("Error in child activation (but continuing): %s",
1402                       str(err))
1403       children.append(cdev)
1404
1405   if as_primary or disk.AssembleOnSecondary():
1406     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1407     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1408     result = r_dev
1409     if as_primary or disk.OpenOnSecondary():
1410       r_dev.Open()
1411     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1412                                 as_primary, disk.iv_name)
1413
1414   else:
1415     result = True
1416   return result
1417
1418
1419 def BlockdevAssemble(disk, owner, as_primary):
1420   """Activate a block device for an instance.
1421
1422   This is a wrapper over _RecursiveAssembleBD.
1423
1424   @rtype: str or boolean
1425   @return: a C{/dev/...} path for primary nodes, and
1426       C{True} for secondary nodes
1427
1428   """
1429   try:
1430     result = _RecursiveAssembleBD(disk, owner, as_primary)
1431     if isinstance(result, bdev.BlockDev):
1432       # pylint: disable-msg=E1103
1433       result = result.dev_path
1434   except errors.BlockDeviceError, err:
1435     _Fail("Error while assembling disk: %s", err, exc=True)
1436
1437   return result
1438
1439
1440 def BlockdevShutdown(disk):
1441   """Shut down a block device.
1442
1443   First, if the device is assembled (Attach() is successful), then
1444   the device is shutdown. Then the children of the device are
1445   shutdown.
1446
1447   This function is called recursively. Note that we don't cache the
1448   children or such, as oppossed to assemble, shutdown of different
1449   devices doesn't require that the upper device was active.
1450
1451   @type disk: L{objects.Disk}
1452   @param disk: the description of the disk we should
1453       shutdown
1454   @rtype: None
1455
1456   """
1457   msgs = []
1458   r_dev = _RecursiveFindBD(disk)
1459   if r_dev is not None:
1460     r_path = r_dev.dev_path
1461     try:
1462       r_dev.Shutdown()
1463       DevCacheManager.RemoveCache(r_path)
1464     except errors.BlockDeviceError, err:
1465       msgs.append(str(err))
1466
1467   if disk.children:
1468     for child in disk.children:
1469       try:
1470         BlockdevShutdown(child)
1471       except RPCFail, err:
1472         msgs.append(str(err))
1473
1474   if msgs:
1475     _Fail("; ".join(msgs))
1476
1477
1478 def BlockdevAddchildren(parent_cdev, new_cdevs):
1479   """Extend a mirrored block device.
1480
1481   @type parent_cdev: L{objects.Disk}
1482   @param parent_cdev: the disk to which we should add children
1483   @type new_cdevs: list of L{objects.Disk}
1484   @param new_cdevs: the list of children which we should add
1485   @rtype: None
1486
1487   """
1488   parent_bdev = _RecursiveFindBD(parent_cdev)
1489   if parent_bdev is None:
1490     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1491   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1492   if new_bdevs.count(None) > 0:
1493     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1494   parent_bdev.AddChildren(new_bdevs)
1495
1496
1497 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1498   """Shrink a mirrored block device.
1499
1500   @type parent_cdev: L{objects.Disk}
1501   @param parent_cdev: the disk from which we should remove children
1502   @type new_cdevs: list of L{objects.Disk}
1503   @param new_cdevs: the list of children which we should remove
1504   @rtype: None
1505
1506   """
1507   parent_bdev = _RecursiveFindBD(parent_cdev)
1508   if parent_bdev is None:
1509     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1510   devs = []
1511   for disk in new_cdevs:
1512     rpath = disk.StaticDevPath()
1513     if rpath is None:
1514       bd = _RecursiveFindBD(disk)
1515       if bd is None:
1516         _Fail("Can't find device %s while removing children", disk)
1517       else:
1518         devs.append(bd.dev_path)
1519     else:
1520       if not utils.IsNormAbsPath(rpath):
1521         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1522       devs.append(rpath)
1523   parent_bdev.RemoveChildren(devs)
1524
1525
1526 def BlockdevGetmirrorstatus(disks):
1527   """Get the mirroring status of a list of devices.
1528
1529   @type disks: list of L{objects.Disk}
1530   @param disks: the list of disks which we should query
1531   @rtype: disk
1532   @return:
1533       a list of (mirror_done, estimated_time) tuples, which
1534       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1535   @raise errors.BlockDeviceError: if any of the disks cannot be
1536       found
1537
1538   """
1539   stats = []
1540   for dsk in disks:
1541     rbd = _RecursiveFindBD(dsk)
1542     if rbd is None:
1543       _Fail("Can't find device %s", dsk)
1544
1545     stats.append(rbd.CombinedSyncStatus())
1546
1547   return stats
1548
1549
1550 def _RecursiveFindBD(disk):
1551   """Check if a device is activated.
1552
1553   If so, return information about the real device.
1554
1555   @type disk: L{objects.Disk}
1556   @param disk: the disk object we need to find
1557
1558   @return: None if the device can't be found,
1559       otherwise the device instance
1560
1561   """
1562   children = []
1563   if disk.children:
1564     for chdisk in disk.children:
1565       children.append(_RecursiveFindBD(chdisk))
1566
1567   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1568
1569
1570 def _OpenRealBD(disk):
1571   """Opens the underlying block device of a disk.
1572
1573   @type disk: L{objects.Disk}
1574   @param disk: the disk object we want to open
1575
1576   """
1577   real_disk = _RecursiveFindBD(disk)
1578   if real_disk is None:
1579     _Fail("Block device '%s' is not set up", disk)
1580
1581   real_disk.Open()
1582
1583   return real_disk
1584
1585
1586 def BlockdevFind(disk):
1587   """Check if a device is activated.
1588
1589   If it is, return information about the real device.
1590
1591   @type disk: L{objects.Disk}
1592   @param disk: the disk to find
1593   @rtype: None or objects.BlockDevStatus
1594   @return: None if the disk cannot be found, otherwise a the current
1595            information
1596
1597   """
1598   try:
1599     rbd = _RecursiveFindBD(disk)
1600   except errors.BlockDeviceError, err:
1601     _Fail("Failed to find device: %s", err, exc=True)
1602
1603   if rbd is None:
1604     return None
1605
1606   return rbd.GetSyncStatus()
1607
1608
1609 def BlockdevGetsize(disks):
1610   """Computes the size of the given disks.
1611
1612   If a disk is not found, returns None instead.
1613
1614   @type disks: list of L{objects.Disk}
1615   @param disks: the list of disk to compute the size for
1616   @rtype: list
1617   @return: list with elements None if the disk cannot be found,
1618       otherwise the size
1619
1620   """
1621   result = []
1622   for cf in disks:
1623     try:
1624       rbd = _RecursiveFindBD(cf)
1625     except errors.BlockDeviceError:
1626       result.append(None)
1627       continue
1628     if rbd is None:
1629       result.append(None)
1630     else:
1631       result.append(rbd.GetActualSize())
1632   return result
1633
1634
1635 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1636   """Export a block device to a remote node.
1637
1638   @type disk: L{objects.Disk}
1639   @param disk: the description of the disk to export
1640   @type dest_node: str
1641   @param dest_node: the destination node to export to
1642   @type dest_path: str
1643   @param dest_path: the destination path on the target node
1644   @type cluster_name: str
1645   @param cluster_name: the cluster name, needed for SSH hostalias
1646   @rtype: None
1647
1648   """
1649   real_disk = _OpenRealBD(disk)
1650
1651   # the block size on the read dd is 1MiB to match our units
1652   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1653                                "dd if=%s bs=1048576 count=%s",
1654                                real_disk.dev_path, str(disk.size))
1655
1656   # we set here a smaller block size as, due to ssh buffering, more
1657   # than 64-128k will mostly ignored; we use nocreat to fail if the
1658   # device is not already there or we pass a wrong path; we use
1659   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1660   # to not buffer too much memory; this means that at best, we flush
1661   # every 64k, which will not be very fast
1662   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1663                                 " oflag=dsync", dest_path)
1664
1665   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1666                                                    constants.GANETI_RUNAS,
1667                                                    destcmd)
1668
1669   # all commands have been checked, so we're safe to combine them
1670   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1671
1672   result = utils.RunCmd(["bash", "-c", command])
1673
1674   if result.failed:
1675     _Fail("Disk copy command '%s' returned error: %s"
1676           " output: %s", command, result.fail_reason, result.output)
1677
1678
1679 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1680   """Write a file to the filesystem.
1681
1682   This allows the master to overwrite(!) a file. It will only perform
1683   the operation if the file belongs to a list of configuration files.
1684
1685   @type file_name: str
1686   @param file_name: the target file name
1687   @type data: str
1688   @param data: the new contents of the file
1689   @type mode: int
1690   @param mode: the mode to give the file (can be None)
1691   @type uid: int
1692   @param uid: the owner of the file (can be -1 for default)
1693   @type gid: int
1694   @param gid: the group of the file (can be -1 for default)
1695   @type atime: float
1696   @param atime: the atime to set on the file (can be None)
1697   @type mtime: float
1698   @param mtime: the mtime to set on the file (can be None)
1699   @rtype: None
1700
1701   """
1702   if not os.path.isabs(file_name):
1703     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1704
1705   if file_name not in _ALLOWED_UPLOAD_FILES:
1706     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1707           file_name)
1708
1709   raw_data = _Decompress(data)
1710
1711   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1712                   atime=atime, mtime=mtime)
1713
1714
1715 def WriteSsconfFiles(values):
1716   """Update all ssconf files.
1717
1718   Wrapper around the SimpleStore.WriteFiles.
1719
1720   """
1721   ssconf.SimpleStore().WriteFiles(values)
1722
1723
1724 def _ErrnoOrStr(err):
1725   """Format an EnvironmentError exception.
1726
1727   If the L{err} argument has an errno attribute, it will be looked up
1728   and converted into a textual C{E...} description. Otherwise the
1729   string representation of the error will be returned.
1730
1731   @type err: L{EnvironmentError}
1732   @param err: the exception to format
1733
1734   """
1735   if hasattr(err, 'errno'):
1736     detail = errno.errorcode[err.errno]
1737   else:
1738     detail = str(err)
1739   return detail
1740
1741
1742 def _OSOndiskAPIVersion(os_dir):
1743   """Compute and return the API version of a given OS.
1744
1745   This function will try to read the API version of the OS residing in
1746   the 'os_dir' directory.
1747
1748   @type os_dir: str
1749   @param os_dir: the directory in which we should look for the OS
1750   @rtype: tuple
1751   @return: tuple (status, data) with status denoting the validity and
1752       data holding either the vaid versions or an error message
1753
1754   """
1755   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1756
1757   try:
1758     st = os.stat(api_file)
1759   except EnvironmentError, err:
1760     return False, ("Required file '%s' not found under path %s: %s" %
1761                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1762
1763   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1764     return False, ("File '%s' in %s is not a regular file" %
1765                    (constants.OS_API_FILE, os_dir))
1766
1767   try:
1768     api_versions = utils.ReadFile(api_file).splitlines()
1769   except EnvironmentError, err:
1770     return False, ("Error while reading the API version file at %s: %s" %
1771                    (api_file, _ErrnoOrStr(err)))
1772
1773   try:
1774     api_versions = [int(version.strip()) for version in api_versions]
1775   except (TypeError, ValueError), err:
1776     return False, ("API version(s) can't be converted to integer: %s" %
1777                    str(err))
1778
1779   return True, api_versions
1780
1781
1782 def DiagnoseOS(top_dirs=None):
1783   """Compute the validity for all OSes.
1784
1785   @type top_dirs: list
1786   @param top_dirs: the list of directories in which to
1787       search (if not given defaults to
1788       L{constants.OS_SEARCH_PATH})
1789   @rtype: list of L{objects.OS}
1790   @return: a list of tuples (name, path, status, diagnose, variants,
1791       parameters, api_version) for all (potential) OSes under all
1792       search paths, where:
1793           - name is the (potential) OS name
1794           - path is the full path to the OS
1795           - status True/False is the validity of the OS
1796           - diagnose is the error message for an invalid OS, otherwise empty
1797           - variants is a list of supported OS variants, if any
1798           - parameters is a list of (name, help) parameters, if any
1799           - api_version is a list of support OS API versions
1800
1801   """
1802   if top_dirs is None:
1803     top_dirs = constants.OS_SEARCH_PATH
1804
1805   result = []
1806   for dir_name in top_dirs:
1807     if os.path.isdir(dir_name):
1808       try:
1809         f_names = utils.ListVisibleFiles(dir_name)
1810       except EnvironmentError, err:
1811         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1812         break
1813       for name in f_names:
1814         os_path = utils.PathJoin(dir_name, name)
1815         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1816         if status:
1817           diagnose = ""
1818           variants = os_inst.supported_variants
1819           parameters = os_inst.supported_parameters
1820           api_versions = os_inst.api_versions
1821         else:
1822           diagnose = os_inst
1823           variants = parameters = api_versions = []
1824         result.append((name, os_path, status, diagnose, variants,
1825                        parameters, api_versions))
1826
1827   return result
1828
1829
1830 def _TryOSFromDisk(name, base_dir=None):
1831   """Create an OS instance from disk.
1832
1833   This function will return an OS instance if the given name is a
1834   valid OS name.
1835
1836   @type base_dir: string
1837   @keyword base_dir: Base directory containing OS installations.
1838                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1839   @rtype: tuple
1840   @return: success and either the OS instance if we find a valid one,
1841       or error message
1842
1843   """
1844   if base_dir is None:
1845     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1846   else:
1847     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1848
1849   if os_dir is None:
1850     return False, "Directory for OS %s not found in search path" % name
1851
1852   status, api_versions = _OSOndiskAPIVersion(os_dir)
1853   if not status:
1854     # push the error up
1855     return status, api_versions
1856
1857   if not constants.OS_API_VERSIONS.intersection(api_versions):
1858     return False, ("API version mismatch for path '%s': found %s, want %s." %
1859                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1860
1861   # OS Files dictionary, we will populate it with the absolute path names
1862   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1863
1864   if max(api_versions) >= constants.OS_API_V15:
1865     os_files[constants.OS_VARIANTS_FILE] = ''
1866
1867   if max(api_versions) >= constants.OS_API_V20:
1868     os_files[constants.OS_PARAMETERS_FILE] = ''
1869   else:
1870     del os_files[constants.OS_SCRIPT_VERIFY]
1871
1872   for filename in os_files:
1873     os_files[filename] = utils.PathJoin(os_dir, filename)
1874
1875     try:
1876       st = os.stat(os_files[filename])
1877     except EnvironmentError, err:
1878       return False, ("File '%s' under path '%s' is missing (%s)" %
1879                      (filename, os_dir, _ErrnoOrStr(err)))
1880
1881     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1882       return False, ("File '%s' under path '%s' is not a regular file" %
1883                      (filename, os_dir))
1884
1885     if filename in constants.OS_SCRIPTS:
1886       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1887         return False, ("File '%s' under path '%s' is not executable" %
1888                        (filename, os_dir))
1889
1890   variants = []
1891   if constants.OS_VARIANTS_FILE in os_files:
1892     variants_file = os_files[constants.OS_VARIANTS_FILE]
1893     try:
1894       variants = utils.ReadFile(variants_file).splitlines()
1895     except EnvironmentError, err:
1896       return False, ("Error while reading the OS variants file at %s: %s" %
1897                      (variants_file, _ErrnoOrStr(err)))
1898     if not variants:
1899       return False, ("No supported os variant found")
1900
1901   parameters = []
1902   if constants.OS_PARAMETERS_FILE in os_files:
1903     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1904     try:
1905       parameters = utils.ReadFile(parameters_file).splitlines()
1906     except EnvironmentError, err:
1907       return False, ("Error while reading the OS parameters file at %s: %s" %
1908                      (parameters_file, _ErrnoOrStr(err)))
1909     parameters = [v.split(None, 1) for v in parameters]
1910
1911   os_obj = objects.OS(name=name, path=os_dir,
1912                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1913                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1914                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1915                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1916                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1917                                                  None),
1918                       supported_variants=variants,
1919                       supported_parameters=parameters,
1920                       api_versions=api_versions)
1921   return True, os_obj
1922
1923
1924 def OSFromDisk(name, base_dir=None):
1925   """Create an OS instance from disk.
1926
1927   This function will return an OS instance if the given name is a
1928   valid OS name. Otherwise, it will raise an appropriate
1929   L{RPCFail} exception, detailing why this is not a valid OS.
1930
1931   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1932   an exception but returns true/false status data.
1933
1934   @type base_dir: string
1935   @keyword base_dir: Base directory containing OS installations.
1936                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1937   @rtype: L{objects.OS}
1938   @return: the OS instance if we find a valid one
1939   @raise RPCFail: if we don't find a valid OS
1940
1941   """
1942   name_only = name.split("+", 1)[0]
1943   status, payload = _TryOSFromDisk(name_only, base_dir)
1944
1945   if not status:
1946     _Fail(payload)
1947
1948   return payload
1949
1950
1951 def OSCoreEnv(inst_os, os_params, debug=0):
1952   """Calculate the basic environment for an os script.
1953
1954   @type inst_os: L{objects.OS}
1955   @param inst_os: operating system for which the environment is being built
1956   @type os_params: dict
1957   @param os_params: the OS parameters
1958   @type debug: integer
1959   @param debug: debug level (0 or 1, for OS Api 10)
1960   @rtype: dict
1961   @return: dict of environment variables
1962   @raise errors.BlockDeviceError: if the block device
1963       cannot be found
1964
1965   """
1966   result = {}
1967   api_version = \
1968     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1969   result['OS_API_VERSION'] = '%d' % api_version
1970   result['OS_NAME'] = inst_os.name
1971   result['DEBUG_LEVEL'] = '%d' % debug
1972
1973   # OS variants
1974   if api_version >= constants.OS_API_V15:
1975     try:
1976       variant = inst_os.name.split('+', 1)[1]
1977     except IndexError:
1978       variant = inst_os.supported_variants[0]
1979     result['OS_VARIANT'] = variant
1980
1981   # OS params
1982   for pname, pvalue in os_params.items():
1983     result['OSP_%s' % pname.upper()] = pvalue
1984
1985   return result
1986
1987
1988 def OSEnvironment(instance, inst_os, debug=0):
1989   """Calculate the environment for an os script.
1990
1991   @type instance: L{objects.Instance}
1992   @param instance: target instance for the os script run
1993   @type inst_os: L{objects.OS}
1994   @param inst_os: operating system for which the environment is being built
1995   @type debug: integer
1996   @param debug: debug level (0 or 1, for OS Api 10)
1997   @rtype: dict
1998   @return: dict of environment variables
1999   @raise errors.BlockDeviceError: if the block device
2000       cannot be found
2001
2002   """
2003   result = OSCoreEnv(inst_os, instance.osparams, debug=debug)
2004
2005   result['INSTANCE_NAME'] = instance.name
2006   result['INSTANCE_OS'] = instance.os
2007   result['HYPERVISOR'] = instance.hypervisor
2008   result['DISK_COUNT'] = '%d' % len(instance.disks)
2009   result['NIC_COUNT'] = '%d' % len(instance.nics)
2010
2011   # Disks
2012   for idx, disk in enumerate(instance.disks):
2013     real_disk = _OpenRealBD(disk)
2014     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2015     result['DISK_%d_ACCESS' % idx] = disk.mode
2016     if constants.HV_DISK_TYPE in instance.hvparams:
2017       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2018         instance.hvparams[constants.HV_DISK_TYPE]
2019     if disk.dev_type in constants.LDS_BLOCK:
2020       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2021     elif disk.dev_type == constants.LD_FILE:
2022       result['DISK_%d_BACKEND_TYPE' % idx] = \
2023         'file:%s' % disk.physical_id[0]
2024
2025   # NICs
2026   for idx, nic in enumerate(instance.nics):
2027     result['NIC_%d_MAC' % idx] = nic.mac
2028     if nic.ip:
2029       result['NIC_%d_IP' % idx] = nic.ip
2030     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2031     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2032       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2033     if nic.nicparams[constants.NIC_LINK]:
2034       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2035     if constants.HV_NIC_TYPE in instance.hvparams:
2036       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2037         instance.hvparams[constants.HV_NIC_TYPE]
2038
2039   # HV/BE params
2040   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2041     for key, value in source.items():
2042       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2043
2044   return result
2045
2046
2047 def BlockdevGrow(disk, amount):
2048   """Grow a stack of block devices.
2049
2050   This function is called recursively, with the childrens being the
2051   first ones to resize.
2052
2053   @type disk: L{objects.Disk}
2054   @param disk: the disk to be grown
2055   @rtype: (status, result)
2056   @return: a tuple with the status of the operation
2057       (True/False), and the errors message if status
2058       is False
2059
2060   """
2061   r_dev = _RecursiveFindBD(disk)
2062   if r_dev is None:
2063     _Fail("Cannot find block device %s", disk)
2064
2065   try:
2066     r_dev.Grow(amount)
2067   except errors.BlockDeviceError, err:
2068     _Fail("Failed to grow block device: %s", err, exc=True)
2069
2070
2071 def BlockdevSnapshot(disk):
2072   """Create a snapshot copy of a block device.
2073
2074   This function is called recursively, and the snapshot is actually created
2075   just for the leaf lvm backend device.
2076
2077   @type disk: L{objects.Disk}
2078   @param disk: the disk to be snapshotted
2079   @rtype: string
2080   @return: snapshot disk path
2081
2082   """
2083   if disk.dev_type == constants.LD_DRBD8:
2084     if not disk.children:
2085       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2086             disk.unique_id)
2087     return BlockdevSnapshot(disk.children[0])
2088   elif disk.dev_type == constants.LD_LV:
2089     r_dev = _RecursiveFindBD(disk)
2090     if r_dev is not None:
2091       # FIXME: choose a saner value for the snapshot size
2092       # let's stay on the safe side and ask for the full size, for now
2093       return r_dev.Snapshot(disk.size)
2094     else:
2095       _Fail("Cannot find block device %s", disk)
2096   else:
2097     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2098           disk.unique_id, disk.dev_type)
2099
2100
2101 def FinalizeExport(instance, snap_disks):
2102   """Write out the export configuration information.
2103
2104   @type instance: L{objects.Instance}
2105   @param instance: the instance which we export, used for
2106       saving configuration
2107   @type snap_disks: list of L{objects.Disk}
2108   @param snap_disks: list of snapshot block devices, which
2109       will be used to get the actual name of the dump file
2110
2111   @rtype: None
2112
2113   """
2114   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2115   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2116
2117   config = objects.SerializableConfigParser()
2118
2119   config.add_section(constants.INISECT_EXP)
2120   config.set(constants.INISECT_EXP, 'version', '0')
2121   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2122   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2123   config.set(constants.INISECT_EXP, 'os', instance.os)
2124   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2125
2126   config.add_section(constants.INISECT_INS)
2127   config.set(constants.INISECT_INS, 'name', instance.name)
2128   config.set(constants.INISECT_INS, 'memory', '%d' %
2129              instance.beparams[constants.BE_MEMORY])
2130   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2131              instance.beparams[constants.BE_VCPUS])
2132   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2133   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2134
2135   nic_total = 0
2136   for nic_count, nic in enumerate(instance.nics):
2137     nic_total += 1
2138     config.set(constants.INISECT_INS, 'nic%d_mac' %
2139                nic_count, '%s' % nic.mac)
2140     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2141     for param in constants.NICS_PARAMETER_TYPES:
2142       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2143                  '%s' % nic.nicparams.get(param, None))
2144   # TODO: redundant: on load can read nics until it doesn't exist
2145   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2146
2147   disk_total = 0
2148   for disk_count, disk in enumerate(snap_disks):
2149     if disk:
2150       disk_total += 1
2151       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2152                  ('%s' % disk.iv_name))
2153       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2154                  ('%s' % disk.physical_id[1]))
2155       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2156                  ('%d' % disk.size))
2157
2158   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2159
2160   # New-style hypervisor/backend parameters
2161
2162   config.add_section(constants.INISECT_HYP)
2163   for name, value in instance.hvparams.items():
2164     if name not in constants.HVC_GLOBALS:
2165       config.set(constants.INISECT_HYP, name, str(value))
2166
2167   config.add_section(constants.INISECT_BEP)
2168   for name, value in instance.beparams.items():
2169     config.set(constants.INISECT_BEP, name, str(value))
2170
2171   config.add_section(constants.INISECT_OSP)
2172   for name, value in instance.osparams.items():
2173     config.set(constants.INISECT_OSP, name, str(value))
2174
2175   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2176                   data=config.Dumps())
2177   shutil.rmtree(finaldestdir, ignore_errors=True)
2178   shutil.move(destdir, finaldestdir)
2179
2180
2181 def ExportInfo(dest):
2182   """Get export configuration information.
2183
2184   @type dest: str
2185   @param dest: directory containing the export
2186
2187   @rtype: L{objects.SerializableConfigParser}
2188   @return: a serializable config file containing the
2189       export info
2190
2191   """
2192   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2193
2194   config = objects.SerializableConfigParser()
2195   config.read(cff)
2196
2197   if (not config.has_section(constants.INISECT_EXP) or
2198       not config.has_section(constants.INISECT_INS)):
2199     _Fail("Export info file doesn't have the required fields")
2200
2201   return config.Dumps()
2202
2203
2204 def ListExports():
2205   """Return a list of exports currently available on this machine.
2206
2207   @rtype: list
2208   @return: list of the exports
2209
2210   """
2211   if os.path.isdir(constants.EXPORT_DIR):
2212     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2213   else:
2214     _Fail("No exports directory")
2215
2216
2217 def RemoveExport(export):
2218   """Remove an existing export from the node.
2219
2220   @type export: str
2221   @param export: the name of the export to remove
2222   @rtype: None
2223
2224   """
2225   target = utils.PathJoin(constants.EXPORT_DIR, export)
2226
2227   try:
2228     shutil.rmtree(target)
2229   except EnvironmentError, err:
2230     _Fail("Error while removing the export: %s", err, exc=True)
2231
2232
2233 def BlockdevRename(devlist):
2234   """Rename a list of block devices.
2235
2236   @type devlist: list of tuples
2237   @param devlist: list of tuples of the form  (disk,
2238       new_logical_id, new_physical_id); disk is an
2239       L{objects.Disk} object describing the current disk,
2240       and new logical_id/physical_id is the name we
2241       rename it to
2242   @rtype: boolean
2243   @return: True if all renames succeeded, False otherwise
2244
2245   """
2246   msgs = []
2247   result = True
2248   for disk, unique_id in devlist:
2249     dev = _RecursiveFindBD(disk)
2250     if dev is None:
2251       msgs.append("Can't find device %s in rename" % str(disk))
2252       result = False
2253       continue
2254     try:
2255       old_rpath = dev.dev_path
2256       dev.Rename(unique_id)
2257       new_rpath = dev.dev_path
2258       if old_rpath != new_rpath:
2259         DevCacheManager.RemoveCache(old_rpath)
2260         # FIXME: we should add the new cache information here, like:
2261         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2262         # but we don't have the owner here - maybe parse from existing
2263         # cache? for now, we only lose lvm data when we rename, which
2264         # is less critical than DRBD or MD
2265     except errors.BlockDeviceError, err:
2266       msgs.append("Can't rename device '%s' to '%s': %s" %
2267                   (dev, unique_id, err))
2268       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2269       result = False
2270   if not result:
2271     _Fail("; ".join(msgs))
2272
2273
2274 def _TransformFileStorageDir(file_storage_dir):
2275   """Checks whether given file_storage_dir is valid.
2276
2277   Checks wheter the given file_storage_dir is within the cluster-wide
2278   default file_storage_dir stored in SimpleStore. Only paths under that
2279   directory are allowed.
2280
2281   @type file_storage_dir: str
2282   @param file_storage_dir: the path to check
2283
2284   @return: the normalized path if valid, None otherwise
2285
2286   """
2287   if not constants.ENABLE_FILE_STORAGE:
2288     _Fail("File storage disabled at configure time")
2289   cfg = _GetConfig()
2290   file_storage_dir = os.path.normpath(file_storage_dir)
2291   base_file_storage_dir = cfg.GetFileStorageDir()
2292   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2293       base_file_storage_dir):
2294     _Fail("File storage directory '%s' is not under base file"
2295           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2296   return file_storage_dir
2297
2298
2299 def CreateFileStorageDir(file_storage_dir):
2300   """Create file storage directory.
2301
2302   @type file_storage_dir: str
2303   @param file_storage_dir: directory to create
2304
2305   @rtype: tuple
2306   @return: tuple with first element a boolean indicating wheter dir
2307       creation was successful or not
2308
2309   """
2310   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2311   if os.path.exists(file_storage_dir):
2312     if not os.path.isdir(file_storage_dir):
2313       _Fail("Specified storage dir '%s' is not a directory",
2314             file_storage_dir)
2315   else:
2316     try:
2317       os.makedirs(file_storage_dir, 0750)
2318     except OSError, err:
2319       _Fail("Cannot create file storage directory '%s': %s",
2320             file_storage_dir, err, exc=True)
2321
2322
2323 def RemoveFileStorageDir(file_storage_dir):
2324   """Remove file storage directory.
2325
2326   Remove it only if it's empty. If not log an error and return.
2327
2328   @type file_storage_dir: str
2329   @param file_storage_dir: the directory we should cleanup
2330   @rtype: tuple (success,)
2331   @return: tuple of one element, C{success}, denoting
2332       whether the operation was successful
2333
2334   """
2335   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2336   if os.path.exists(file_storage_dir):
2337     if not os.path.isdir(file_storage_dir):
2338       _Fail("Specified Storage directory '%s' is not a directory",
2339             file_storage_dir)
2340     # deletes dir only if empty, otherwise we want to fail the rpc call
2341     try:
2342       os.rmdir(file_storage_dir)
2343     except OSError, err:
2344       _Fail("Cannot remove file storage directory '%s': %s",
2345             file_storage_dir, err)
2346
2347
2348 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2349   """Rename the file storage directory.
2350
2351   @type old_file_storage_dir: str
2352   @param old_file_storage_dir: the current path
2353   @type new_file_storage_dir: str
2354   @param new_file_storage_dir: the name we should rename to
2355   @rtype: tuple (success,)
2356   @return: tuple of one element, C{success}, denoting
2357       whether the operation was successful
2358
2359   """
2360   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2361   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2362   if not os.path.exists(new_file_storage_dir):
2363     if os.path.isdir(old_file_storage_dir):
2364       try:
2365         os.rename(old_file_storage_dir, new_file_storage_dir)
2366       except OSError, err:
2367         _Fail("Cannot rename '%s' to '%s': %s",
2368               old_file_storage_dir, new_file_storage_dir, err)
2369     else:
2370       _Fail("Specified storage dir '%s' is not a directory",
2371             old_file_storage_dir)
2372   else:
2373     if os.path.exists(old_file_storage_dir):
2374       _Fail("Cannot rename '%s' to '%s': both locations exist",
2375             old_file_storage_dir, new_file_storage_dir)
2376
2377
2378 def _EnsureJobQueueFile(file_name):
2379   """Checks whether the given filename is in the queue directory.
2380
2381   @type file_name: str
2382   @param file_name: the file name we should check
2383   @rtype: None
2384   @raises RPCFail: if the file is not valid
2385
2386   """
2387   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2388   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2389
2390   if not result:
2391     _Fail("Passed job queue file '%s' does not belong to"
2392           " the queue directory '%s'", file_name, queue_dir)
2393
2394
2395 def JobQueueUpdate(file_name, content):
2396   """Updates a file in the queue directory.
2397
2398   This is just a wrapper over L{utils.WriteFile}, with proper
2399   checking.
2400
2401   @type file_name: str
2402   @param file_name: the job file name
2403   @type content: str
2404   @param content: the new job contents
2405   @rtype: boolean
2406   @return: the success of the operation
2407
2408   """
2409   _EnsureJobQueueFile(file_name)
2410
2411   # Write and replace the file atomically
2412   utils.WriteFile(file_name, data=_Decompress(content))
2413
2414
2415 def JobQueueRename(old, new):
2416   """Renames a job queue file.
2417
2418   This is just a wrapper over os.rename with proper checking.
2419
2420   @type old: str
2421   @param old: the old (actual) file name
2422   @type new: str
2423   @param new: the desired file name
2424   @rtype: tuple
2425   @return: the success of the operation and payload
2426
2427   """
2428   _EnsureJobQueueFile(old)
2429   _EnsureJobQueueFile(new)
2430
2431   utils.RenameFile(old, new, mkdir=True)
2432
2433
2434 def BlockdevClose(instance_name, disks):
2435   """Closes the given block devices.
2436
2437   This means they will be switched to secondary mode (in case of
2438   DRBD).
2439
2440   @param instance_name: if the argument is not empty, the symlinks
2441       of this instance will be removed
2442   @type disks: list of L{objects.Disk}
2443   @param disks: the list of disks to be closed
2444   @rtype: tuple (success, message)
2445   @return: a tuple of success and message, where success
2446       indicates the succes of the operation, and message
2447       which will contain the error details in case we
2448       failed
2449
2450   """
2451   bdevs = []
2452   for cf in disks:
2453     rd = _RecursiveFindBD(cf)
2454     if rd is None:
2455       _Fail("Can't find device %s", cf)
2456     bdevs.append(rd)
2457
2458   msg = []
2459   for rd in bdevs:
2460     try:
2461       rd.Close()
2462     except errors.BlockDeviceError, err:
2463       msg.append(str(err))
2464   if msg:
2465     _Fail("Can't make devices secondary: %s", ",".join(msg))
2466   else:
2467     if instance_name:
2468       _RemoveBlockDevLinks(instance_name, disks)
2469
2470
2471 def ValidateHVParams(hvname, hvparams):
2472   """Validates the given hypervisor parameters.
2473
2474   @type hvname: string
2475   @param hvname: the hypervisor name
2476   @type hvparams: dict
2477   @param hvparams: the hypervisor parameters to be validated
2478   @rtype: None
2479
2480   """
2481   try:
2482     hv_type = hypervisor.GetHypervisor(hvname)
2483     hv_type.ValidateParameters(hvparams)
2484   except errors.HypervisorError, err:
2485     _Fail(str(err), log=False)
2486
2487
2488 def _CheckOSPList(os_obj, parameters):
2489   """Check whether a list of parameters is supported by the OS.
2490
2491   @type os_obj: L{objects.OS}
2492   @param os_obj: OS object to check
2493   @type parameters: list
2494   @param parameters: the list of parameters to check
2495
2496   """
2497   supported = [v[0] for v in os_obj.supported_parameters]
2498   delta = frozenset(parameters).difference(supported)
2499   if delta:
2500     _Fail("The following parameters are not supported"
2501           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2502
2503
2504 def ValidateOS(required, osname, checks, osparams):
2505   """Validate the given OS' parameters.
2506
2507   @type required: boolean
2508   @param required: whether absence of the OS should translate into
2509       failure or not
2510   @type osname: string
2511   @param osname: the OS to be validated
2512   @type checks: list
2513   @param checks: list of the checks to run (currently only 'parameters')
2514   @type osparams: dict
2515   @param osparams: dictionary with OS parameters
2516   @rtype: boolean
2517   @return: True if the validation passed, or False if the OS was not
2518       found and L{required} was false
2519
2520   """
2521   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2522     _Fail("Unknown checks required for OS %s: %s", osname,
2523           set(checks).difference(constants.OS_VALIDATE_CALLS))
2524
2525   name_only = osname.split("+", 1)[0]
2526   status, tbv = _TryOSFromDisk(name_only, None)
2527
2528   if not status:
2529     if required:
2530       _Fail(tbv)
2531     else:
2532       return False
2533
2534   if max(tbv.api_versions) < constants.OS_API_V20:
2535     return True
2536
2537   if constants.OS_VALIDATE_PARAMETERS in checks:
2538     _CheckOSPList(tbv, osparams.keys())
2539
2540   validate_env = OSCoreEnv(tbv, osparams)
2541   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2542                         cwd=tbv.path)
2543   if result.failed:
2544     logging.error("os validate command '%s' returned error: %s output: %s",
2545                   result.cmd, result.fail_reason, result.output)
2546     _Fail("OS validation script failed (%s), output: %s",
2547           result.fail_reason, result.output, log=False)
2548
2549   return True
2550
2551
2552 def DemoteFromMC():
2553   """Demotes the current node from master candidate role.
2554
2555   """
2556   # try to ensure we're not the master by mistake
2557   master, myself = ssconf.GetMasterAndMyself()
2558   if master == myself:
2559     _Fail("ssconf status shows I'm the master node, will not demote")
2560
2561   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2562   if not result.failed:
2563     _Fail("The master daemon is running, will not demote")
2564
2565   try:
2566     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2567       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2568   except EnvironmentError, err:
2569     if err.errno != errno.ENOENT:
2570       _Fail("Error while backing up cluster file: %s", err, exc=True)
2571
2572   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2573
2574
2575 def _GetX509Filenames(cryptodir, name):
2576   """Returns the full paths for the private key and certificate.
2577
2578   """
2579   return (utils.PathJoin(cryptodir, name),
2580           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2581           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2582
2583
2584 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2585   """Creates a new X509 certificate for SSL/TLS.
2586
2587   @type validity: int
2588   @param validity: Validity in seconds
2589   @rtype: tuple; (string, string)
2590   @return: Certificate name and public part
2591
2592   """
2593   (key_pem, cert_pem) = \
2594     utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2595                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2596
2597   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2598                               prefix="x509-%s-" % utils.TimestampForFilename())
2599   try:
2600     name = os.path.basename(cert_dir)
2601     assert len(name) > 5
2602
2603     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2604
2605     utils.WriteFile(key_file, mode=0400, data=key_pem)
2606     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2607
2608     # Never return private key as it shouldn't leave the node
2609     return (name, cert_pem)
2610   except Exception:
2611     shutil.rmtree(cert_dir, ignore_errors=True)
2612     raise
2613
2614
2615 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2616   """Removes a X509 certificate.
2617
2618   @type name: string
2619   @param name: Certificate name
2620
2621   """
2622   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2623
2624   utils.RemoveFile(key_file)
2625   utils.RemoveFile(cert_file)
2626
2627   try:
2628     os.rmdir(cert_dir)
2629   except EnvironmentError, err:
2630     _Fail("Cannot remove certificate directory '%s': %s",
2631           cert_dir, err)
2632
2633
2634 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2635   """Returns the command for the requested input/output.
2636
2637   @type instance: L{objects.Instance}
2638   @param instance: The instance object
2639   @param mode: Import/export mode
2640   @param ieio: Input/output type
2641   @param ieargs: Input/output arguments
2642
2643   """
2644   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2645
2646   env = None
2647   prefix = None
2648   suffix = None
2649   exp_size = None
2650
2651   if ieio == constants.IEIO_FILE:
2652     (filename, ) = ieargs
2653
2654     if not utils.IsNormAbsPath(filename):
2655       _Fail("Path '%s' is not normalized or absolute", filename)
2656
2657     directory = os.path.normpath(os.path.dirname(filename))
2658
2659     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2660         constants.EXPORT_DIR):
2661       _Fail("File '%s' is not under exports directory '%s'",
2662             filename, constants.EXPORT_DIR)
2663
2664     # Create directory
2665     utils.Makedirs(directory, mode=0750)
2666
2667     quoted_filename = utils.ShellQuote(filename)
2668
2669     if mode == constants.IEM_IMPORT:
2670       suffix = "> %s" % quoted_filename
2671     elif mode == constants.IEM_EXPORT:
2672       suffix = "< %s" % quoted_filename
2673
2674       # Retrieve file size
2675       try:
2676         st = os.stat(filename)
2677       except EnvironmentError, err:
2678         logging.error("Can't stat(2) %s: %s", filename, err)
2679       else:
2680         exp_size = utils.BytesToMebibyte(st.st_size)
2681
2682   elif ieio == constants.IEIO_RAW_DISK:
2683     (disk, ) = ieargs
2684
2685     real_disk = _OpenRealBD(disk)
2686
2687     if mode == constants.IEM_IMPORT:
2688       # we set here a smaller block size as, due to transport buffering, more
2689       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2690       # is not already there or we pass a wrong path; we use notrunc to no
2691       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2692       # much memory; this means that at best, we flush every 64k, which will
2693       # not be very fast
2694       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2695                                     " bs=%s oflag=dsync"),
2696                                     real_disk.dev_path,
2697                                     str(64 * 1024))
2698
2699     elif mode == constants.IEM_EXPORT:
2700       # the block size on the read dd is 1MiB to match our units
2701       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2702                                    real_disk.dev_path,
2703                                    str(1024 * 1024), # 1 MB
2704                                    str(disk.size))
2705       exp_size = disk.size
2706
2707   elif ieio == constants.IEIO_SCRIPT:
2708     (disk, disk_index, ) = ieargs
2709
2710     assert isinstance(disk_index, (int, long))
2711
2712     real_disk = _OpenRealBD(disk)
2713
2714     inst_os = OSFromDisk(instance.os)
2715     env = OSEnvironment(instance, inst_os)
2716
2717     if mode == constants.IEM_IMPORT:
2718       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2719       env["IMPORT_INDEX"] = str(disk_index)
2720       script = inst_os.import_script
2721
2722     elif mode == constants.IEM_EXPORT:
2723       env["EXPORT_DEVICE"] = real_disk.dev_path
2724       env["EXPORT_INDEX"] = str(disk_index)
2725       script = inst_os.export_script
2726
2727     # TODO: Pass special environment only to script
2728     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2729
2730     if mode == constants.IEM_IMPORT:
2731       suffix = "| %s" % script_cmd
2732
2733     elif mode == constants.IEM_EXPORT:
2734       prefix = "%s |" % script_cmd
2735
2736     # Let script predict size
2737     exp_size = constants.IE_CUSTOM_SIZE
2738
2739   else:
2740     _Fail("Invalid %s I/O mode %r", mode, ieio)
2741
2742   return (env, prefix, suffix, exp_size)
2743
2744
2745 def _CreateImportExportStatusDir(prefix):
2746   """Creates status directory for import/export.
2747
2748   """
2749   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2750                           prefix=("%s-%s-" %
2751                                   (prefix, utils.TimestampForFilename())))
2752
2753
2754 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2755   """Starts an import or export daemon.
2756
2757   @param mode: Import/output mode
2758   @type opts: L{objects.ImportExportOptions}
2759   @param opts: Daemon options
2760   @type host: string
2761   @param host: Remote host for export (None for import)
2762   @type port: int
2763   @param port: Remote port for export (None for import)
2764   @type instance: L{objects.Instance}
2765   @param instance: Instance object
2766   @param ieio: Input/output type
2767   @param ieioargs: Input/output arguments
2768
2769   """
2770   if mode == constants.IEM_IMPORT:
2771     prefix = "import"
2772
2773     if not (host is None and port is None):
2774       _Fail("Can not specify host or port on import")
2775
2776   elif mode == constants.IEM_EXPORT:
2777     prefix = "export"
2778
2779     if host is None or port is None:
2780       _Fail("Host and port must be specified for an export")
2781
2782   else:
2783     _Fail("Invalid mode %r", mode)
2784
2785   if (opts.key_name is None) ^ (opts.ca_pem is None):
2786     _Fail("Cluster certificate can only be used for both key and CA")
2787
2788   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2789     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2790
2791   if opts.key_name is None:
2792     # Use server.pem
2793     key_path = constants.NODED_CERT_FILE
2794     cert_path = constants.NODED_CERT_FILE
2795     assert opts.ca_pem is None
2796   else:
2797     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2798                                                  opts.key_name)
2799     assert opts.ca_pem is not None
2800
2801   for i in [key_path, cert_path]:
2802     if not os.path.exists(i):
2803       _Fail("File '%s' does not exist" % i)
2804
2805   status_dir = _CreateImportExportStatusDir(prefix)
2806   try:
2807     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2808     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2809     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2810
2811     if opts.ca_pem is None:
2812       # Use server.pem
2813       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2814     else:
2815       ca = opts.ca_pem
2816
2817     # Write CA file
2818     utils.WriteFile(ca_file, data=ca, mode=0400)
2819
2820     cmd = [
2821       constants.IMPORT_EXPORT_DAEMON,
2822       status_file, mode,
2823       "--key=%s" % key_path,
2824       "--cert=%s" % cert_path,
2825       "--ca=%s" % ca_file,
2826       ]
2827
2828     if host:
2829       cmd.append("--host=%s" % host)
2830
2831     if port:
2832       cmd.append("--port=%s" % port)
2833
2834     if opts.compress:
2835       cmd.append("--compress=%s" % opts.compress)
2836
2837     if opts.magic:
2838       cmd.append("--magic=%s" % opts.magic)
2839
2840     if exp_size is not None:
2841       cmd.append("--expected-size=%s" % exp_size)
2842
2843     if cmd_prefix:
2844       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2845
2846     if cmd_suffix:
2847       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2848
2849     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2850
2851     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2852     # support for receiving a file descriptor for output
2853     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2854                       output=logfile)
2855
2856     # The import/export name is simply the status directory name
2857     return os.path.basename(status_dir)
2858
2859   except Exception:
2860     shutil.rmtree(status_dir, ignore_errors=True)
2861     raise
2862
2863
2864 def GetImportExportStatus(names):
2865   """Returns import/export daemon status.
2866
2867   @type names: sequence
2868   @param names: List of names
2869   @rtype: List of dicts
2870   @return: Returns a list of the state of each named import/export or None if a
2871            status couldn't be read
2872
2873   """
2874   result = []
2875
2876   for name in names:
2877     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2878                                  _IES_STATUS_FILE)
2879
2880     try:
2881       data = utils.ReadFile(status_file)
2882     except EnvironmentError, err:
2883       if err.errno != errno.ENOENT:
2884         raise
2885       data = None
2886
2887     if not data:
2888       result.append(None)
2889       continue
2890
2891     result.append(serializer.LoadJson(data))
2892
2893   return result
2894
2895
2896 def AbortImportExport(name):
2897   """Sends SIGTERM to a running import/export daemon.
2898
2899   """
2900   logging.info("Abort import/export %s", name)
2901
2902   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2903   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2904
2905   if pid:
2906     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2907                  name, pid)
2908     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2909
2910
2911 def CleanupImportExport(name):
2912   """Cleanup after an import or export.
2913
2914   If the import/export daemon is still running it's killed. Afterwards the
2915   whole status directory is removed.
2916
2917   """
2918   logging.info("Finalizing import/export %s", name)
2919
2920   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2921
2922   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2923
2924   if pid:
2925     logging.info("Import/export %s is still running with PID %s",
2926                  name, pid)
2927     utils.KillProcess(pid, waitpid=False)
2928
2929   shutil.rmtree(status_dir, ignore_errors=True)
2930
2931
2932 def _FindDisks(nodes_ip, disks):
2933   """Sets the physical ID on disks and returns the block devices.
2934
2935   """
2936   # set the correct physical ID
2937   my_name = utils.HostInfo().name
2938   for cf in disks:
2939     cf.SetPhysicalID(my_name, nodes_ip)
2940
2941   bdevs = []
2942
2943   for cf in disks:
2944     rd = _RecursiveFindBD(cf)
2945     if rd is None:
2946       _Fail("Can't find device %s", cf)
2947     bdevs.append(rd)
2948   return bdevs
2949
2950
2951 def DrbdDisconnectNet(nodes_ip, disks):
2952   """Disconnects the network on a list of drbd devices.
2953
2954   """
2955   bdevs = _FindDisks(nodes_ip, disks)
2956
2957   # disconnect disks
2958   for rd in bdevs:
2959     try:
2960       rd.DisconnectNet()
2961     except errors.BlockDeviceError, err:
2962       _Fail("Can't change network configuration to standalone mode: %s",
2963             err, exc=True)
2964
2965
2966 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2967   """Attaches the network on a list of drbd devices.
2968
2969   """
2970   bdevs = _FindDisks(nodes_ip, disks)
2971
2972   if multimaster:
2973     for idx, rd in enumerate(bdevs):
2974       try:
2975         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2976       except EnvironmentError, err:
2977         _Fail("Can't create symlink: %s", err)
2978   # reconnect disks, switch to new master configuration and if
2979   # needed primary mode
2980   for rd in bdevs:
2981     try:
2982       rd.AttachNet(multimaster)
2983     except errors.BlockDeviceError, err:
2984       _Fail("Can't change network configuration: %s", err)
2985
2986   # wait until the disks are connected; we need to retry the re-attach
2987   # if the device becomes standalone, as this might happen if the one
2988   # node disconnects and reconnects in a different mode before the
2989   # other node reconnects; in this case, one or both of the nodes will
2990   # decide it has wrong configuration and switch to standalone
2991
2992   def _Attach():
2993     all_connected = True
2994
2995     for rd in bdevs:
2996       stats = rd.GetProcStatus()
2997
2998       all_connected = (all_connected and
2999                        (stats.is_connected or stats.is_in_resync))
3000
3001       if stats.is_standalone:
3002         # peer had different config info and this node became
3003         # standalone, even though this should not happen with the
3004         # new staged way of changing disk configs
3005         try:
3006           rd.AttachNet(multimaster)
3007         except errors.BlockDeviceError, err:
3008           _Fail("Can't change network configuration: %s", err)
3009
3010     if not all_connected:
3011       raise utils.RetryAgain()
3012
3013   try:
3014     # Start with a delay of 100 miliseconds and go up to 5 seconds
3015     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3016   except utils.RetryTimeout:
3017     _Fail("Timeout in disk reconnecting")
3018
3019   if multimaster:
3020     # change to primary mode
3021     for rd in bdevs:
3022       try:
3023         rd.Open()
3024       except errors.BlockDeviceError, err:
3025         _Fail("Can't change to primary mode: %s", err)
3026
3027
3028 def DrbdWaitSync(nodes_ip, disks):
3029   """Wait until DRBDs have synchronized.
3030
3031   """
3032   def _helper(rd):
3033     stats = rd.GetProcStatus()
3034     if not (stats.is_connected or stats.is_in_resync):
3035       raise utils.RetryAgain()
3036     return stats
3037
3038   bdevs = _FindDisks(nodes_ip, disks)
3039
3040   min_resync = 100
3041   alldone = True
3042   for rd in bdevs:
3043     try:
3044       # poll each second for 15 seconds
3045       stats = utils.Retry(_helper, 1, 15, args=[rd])
3046     except utils.RetryTimeout:
3047       stats = rd.GetProcStatus()
3048       # last check
3049       if not (stats.is_connected or stats.is_in_resync):
3050         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3051     alldone = alldone and (not stats.is_in_resync)
3052     if stats.sync_percent is not None:
3053       min_resync = min(min_resync, stats.sync_percent)
3054
3055   return (alldone, min_resync)
3056
3057
3058 def GetDrbdUsermodeHelper():
3059   """Returns DRBD usermode helper currently configured.
3060
3061   """
3062   try:
3063     return bdev.BaseDRBD.GetUsermodeHelper()
3064   except errors.BlockDeviceError, err:
3065     _Fail(str(err))
3066
3067
3068 def PowercycleNode(hypervisor_type):
3069   """Hard-powercycle the node.
3070
3071   Because we need to return first, and schedule the powercycle in the
3072   background, we won't be able to report failures nicely.
3073
3074   """
3075   hyper = hypervisor.GetHypervisor(hypervisor_type)
3076   try:
3077     pid = os.fork()
3078   except OSError:
3079     # if we can't fork, we'll pretend that we're in the child process
3080     pid = 0
3081   if pid > 0:
3082     return "Reboot scheduled in 5 seconds"
3083   # ensure the child is running on ram
3084   try:
3085     utils.Mlockall()
3086   except Exception: # pylint: disable-msg=W0703
3087     pass
3088   time.sleep(5)
3089   hyper.PowercycleNode()
3090
3091
3092 class HooksRunner(object):
3093   """Hook runner.
3094
3095   This class is instantiated on the node side (ganeti-noded) and not
3096   on the master side.
3097
3098   """
3099   def __init__(self, hooks_base_dir=None):
3100     """Constructor for hooks runner.
3101
3102     @type hooks_base_dir: str or None
3103     @param hooks_base_dir: if not None, this overrides the
3104         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3105
3106     """
3107     if hooks_base_dir is None:
3108       hooks_base_dir = constants.HOOKS_BASE_DIR
3109     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3110     # constant
3111     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3112
3113   def RunHooks(self, hpath, phase, env):
3114     """Run the scripts in the hooks directory.
3115
3116     @type hpath: str
3117     @param hpath: the path to the hooks directory which
3118         holds the scripts
3119     @type phase: str
3120     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3121         L{constants.HOOKS_PHASE_POST}
3122     @type env: dict
3123     @param env: dictionary with the environment for the hook
3124     @rtype: list
3125     @return: list of 3-element tuples:
3126       - script path
3127       - script result, either L{constants.HKR_SUCCESS} or
3128         L{constants.HKR_FAIL}
3129       - output of the script
3130
3131     @raise errors.ProgrammerError: for invalid input
3132         parameters
3133
3134     """
3135     if phase == constants.HOOKS_PHASE_PRE:
3136       suffix = "pre"
3137     elif phase == constants.HOOKS_PHASE_POST:
3138       suffix = "post"
3139     else:
3140       _Fail("Unknown hooks phase '%s'", phase)
3141
3142
3143     subdir = "%s-%s.d" % (hpath, suffix)
3144     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3145
3146     results = []
3147
3148     if not os.path.isdir(dir_name):
3149       # for non-existing/non-dirs, we simply exit instead of logging a
3150       # warning at every operation
3151       return results
3152
3153     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3154
3155     for (relname, relstatus, runresult)  in runparts_results:
3156       if relstatus == constants.RUNPARTS_SKIP:
3157         rrval = constants.HKR_SKIP
3158         output = ""
3159       elif relstatus == constants.RUNPARTS_ERR:
3160         rrval = constants.HKR_FAIL
3161         output = "Hook script execution error: %s" % runresult
3162       elif relstatus == constants.RUNPARTS_RUN:
3163         if runresult.failed:
3164           rrval = constants.HKR_FAIL
3165         else:
3166           rrval = constants.HKR_SUCCESS
3167         output = utils.SafeEncode(runresult.output.strip())
3168       results.append(("%s/%s" % (subdir, relname), rrval, output))
3169
3170     return results
3171
3172
3173 class IAllocatorRunner(object):
3174   """IAllocator runner.
3175
3176   This class is instantiated on the node side (ganeti-noded) and not on
3177   the master side.
3178
3179   """
3180   @staticmethod
3181   def Run(name, idata):
3182     """Run an iallocator script.
3183
3184     @type name: str
3185     @param name: the iallocator script name
3186     @type idata: str
3187     @param idata: the allocator input data
3188
3189     @rtype: tuple
3190     @return: two element tuple of:
3191        - status
3192        - either error message or stdout of allocator (for success)
3193
3194     """
3195     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3196                                   os.path.isfile)
3197     if alloc_script is None:
3198       _Fail("iallocator module '%s' not found in the search path", name)
3199
3200     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3201     try:
3202       os.write(fd, idata)
3203       os.close(fd)
3204       result = utils.RunCmd([alloc_script, fin_name])
3205       if result.failed:
3206         _Fail("iallocator module '%s' failed: %s, output '%s'",
3207               name, result.fail_reason, result.output)
3208     finally:
3209       os.unlink(fin_name)
3210
3211     return result.stdout
3212
3213
3214 class DevCacheManager(object):
3215   """Simple class for managing a cache of block device information.
3216
3217   """
3218   _DEV_PREFIX = "/dev/"
3219   _ROOT_DIR = constants.BDEV_CACHE_DIR
3220
3221   @classmethod
3222   def _ConvertPath(cls, dev_path):
3223     """Converts a /dev/name path to the cache file name.
3224
3225     This replaces slashes with underscores and strips the /dev
3226     prefix. It then returns the full path to the cache file.
3227
3228     @type dev_path: str
3229     @param dev_path: the C{/dev/} path name
3230     @rtype: str
3231     @return: the converted path name
3232
3233     """
3234     if dev_path.startswith(cls._DEV_PREFIX):
3235       dev_path = dev_path[len(cls._DEV_PREFIX):]
3236     dev_path = dev_path.replace("/", "_")
3237     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3238     return fpath
3239
3240   @classmethod
3241   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3242     """Updates the cache information for a given device.
3243
3244     @type dev_path: str
3245     @param dev_path: the pathname of the device
3246     @type owner: str
3247     @param owner: the owner (instance name) of the device
3248     @type on_primary: bool
3249     @param on_primary: whether this is the primary
3250         node nor not
3251     @type iv_name: str
3252     @param iv_name: the instance-visible name of the
3253         device, as in objects.Disk.iv_name
3254
3255     @rtype: None
3256
3257     """
3258     if dev_path is None:
3259       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3260       return
3261     fpath = cls._ConvertPath(dev_path)
3262     if on_primary:
3263       state = "primary"
3264     else:
3265       state = "secondary"
3266     if iv_name is None:
3267       iv_name = "not_visible"
3268     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3269     try:
3270       utils.WriteFile(fpath, data=fdata)
3271     except EnvironmentError, err:
3272       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3273
3274   @classmethod
3275   def RemoveCache(cls, dev_path):
3276     """Remove data for a dev_path.
3277
3278     This is just a wrapper over L{utils.RemoveFile} with a converted
3279     path name and logging.
3280
3281     @type dev_path: str
3282     @param dev_path: the pathname of the device
3283
3284     @rtype: None
3285
3286     """
3287     if dev_path is None:
3288       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3289       return
3290     fpath = cls._ConvertPath(dev_path)
3291     try:
3292       utils.RemoveFile(fpath)
3293     except EnvironmentError, err:
3294       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)