Simplify and extend the instance OS env
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62 from ganeti import runtime
63
64
65 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
66 _ALLOWED_CLEAN_DIRS = frozenset([
67   constants.DATA_DIR,
68   constants.JOB_QUEUE_ARCHIVE_DIR,
69   constants.QUEUE_DIR,
70   constants.CRYPTO_KEYS_DIR,
71   ])
72 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
73 _X509_KEY_FILE = "key"
74 _X509_CERT_FILE = "cert"
75 _IES_STATUS_FILE = "status"
76 _IES_PID_FILE = "pid"
77 _IES_CA_FILE = "ca"
78
79
80 class RPCFail(Exception):
81   """Class denoting RPC failure.
82
83   Its argument is the error message.
84
85   """
86
87
88 def _Fail(msg, *args, **kwargs):
89   """Log an error and the raise an RPCFail exception.
90
91   This exception is then handled specially in the ganeti daemon and
92   turned into a 'failed' return type. As such, this function is a
93   useful shortcut for logging the error and returning it to the master
94   daemon.
95
96   @type msg: string
97   @param msg: the text of the exception
98   @raise RPCFail
99
100   """
101   if args:
102     msg = msg % args
103   if "log" not in kwargs or kwargs["log"]: # if we should log this error
104     if "exc" in kwargs and kwargs["exc"]:
105       logging.exception(msg)
106     else:
107       logging.error(msg)
108   raise RPCFail(msg)
109
110
111 def _GetConfig():
112   """Simple wrapper to return a SimpleStore.
113
114   @rtype: L{ssconf.SimpleStore}
115   @return: a SimpleStore instance
116
117   """
118   return ssconf.SimpleStore()
119
120
121 def _GetSshRunner(cluster_name):
122   """Simple wrapper to return an SshRunner.
123
124   @type cluster_name: str
125   @param cluster_name: the cluster name, which is needed
126       by the SshRunner constructor
127   @rtype: L{ssh.SshRunner}
128   @return: an SshRunner instance
129
130   """
131   return ssh.SshRunner(cluster_name)
132
133
134 def _Decompress(data):
135   """Unpacks data compressed by the RPC client.
136
137   @type data: list or tuple
138   @param data: Data sent by RPC client
139   @rtype: str
140   @return: Decompressed data
141
142   """
143   assert isinstance(data, (list, tuple))
144   assert len(data) == 2
145   (encoding, content) = data
146   if encoding == constants.RPC_ENCODING_NONE:
147     return content
148   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
149     return zlib.decompress(base64.b64decode(content))
150   else:
151     raise AssertionError("Unknown data encoding")
152
153
154 def _CleanDirectory(path, exclude=None):
155   """Removes all regular files in a directory.
156
157   @type path: str
158   @param path: the directory to clean
159   @type exclude: list
160   @param exclude: list of files to be excluded, defaults
161       to the empty list
162
163   """
164   if path not in _ALLOWED_CLEAN_DIRS:
165     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
166           path)
167
168   if not os.path.isdir(path):
169     return
170   if exclude is None:
171     exclude = []
172   else:
173     # Normalize excluded paths
174     exclude = [os.path.normpath(i) for i in exclude]
175
176   for rel_name in utils.ListVisibleFiles(path):
177     full_name = utils.PathJoin(path, rel_name)
178     if full_name in exclude:
179       continue
180     if os.path.isfile(full_name) and not os.path.islink(full_name):
181       utils.RemoveFile(full_name)
182
183
184 def _BuildUploadFileList():
185   """Build the list of allowed upload files.
186
187   This is abstracted so that it's built only once at module import time.
188
189   """
190   allowed_files = set([
191     constants.CLUSTER_CONF_FILE,
192     constants.ETC_HOSTS,
193     constants.SSH_KNOWN_HOSTS_FILE,
194     constants.VNC_PASSWORD_FILE,
195     constants.RAPI_CERT_FILE,
196     constants.RAPI_USERS_FILE,
197     constants.CONFD_HMAC_KEY,
198     constants.CLUSTER_DOMAIN_SECRET_FILE,
199     ])
200
201   for hv_name in constants.HYPER_TYPES:
202     hv_class = hypervisor.GetHypervisorClass(hv_name)
203     allowed_files.update(hv_class.GetAncillaryFiles())
204
205   return frozenset(allowed_files)
206
207
208 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
209
210
211 def JobQueuePurge():
212   """Removes job queue files and archived jobs.
213
214   @rtype: tuple
215   @return: True, None
216
217   """
218   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
219   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
220
221
222 def GetMasterInfo():
223   """Returns master information.
224
225   This is an utility function to compute master information, either
226   for consumption here or from the node daemon.
227
228   @rtype: tuple
229   @return: master_netdev, master_ip, master_name, primary_ip_family
230   @raise RPCFail: in case of errors
231
232   """
233   try:
234     cfg = _GetConfig()
235     master_netdev = cfg.GetMasterNetdev()
236     master_ip = cfg.GetMasterIP()
237     master_node = cfg.GetMasterNode()
238     primary_ip_family = cfg.GetPrimaryIPFamily()
239   except errors.ConfigurationError, err:
240     _Fail("Cluster configuration incomplete: %s", err, exc=True)
241   return (master_netdev, master_ip, master_node, primary_ip_family)
242
243
244 def StartMaster(start_daemons, no_voting):
245   """Activate local node as master node.
246
247   The function will either try activate the IP address of the master
248   (unless someone else has it) or also start the master daemons, based
249   on the start_daemons parameter.
250
251   @type start_daemons: boolean
252   @param start_daemons: whether to start the master daemons
253       (ganeti-masterd and ganeti-rapi), or (if false) activate the
254       master ip
255   @type no_voting: boolean
256   @param no_voting: whether to start ganeti-masterd without a node vote
257       (if start_daemons is True), but still non-interactively
258   @rtype: None
259
260   """
261   # GetMasterInfo will raise an exception if not able to return data
262   master_netdev, master_ip, _, family = GetMasterInfo()
263
264   err_msgs = []
265   # either start the master and rapi daemons
266   if start_daemons:
267     if no_voting:
268       masterd_args = "--no-voting --yes-do-it"
269     else:
270       masterd_args = ""
271
272     env = {
273       "EXTRA_MASTERD_ARGS": masterd_args,
274       }
275
276     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
277     if result.failed:
278       msg = "Can't start Ganeti master: %s" % result.output
279       logging.error(msg)
280       err_msgs.append(msg)
281   # or activate the IP
282   else:
283     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
284       if netutils.IPAddress.Own(master_ip):
285         # we already have the ip:
286         logging.debug("Master IP already configured, doing nothing")
287       else:
288         msg = "Someone else has the master ip, not activating"
289         logging.error(msg)
290         err_msgs.append(msg)
291     else:
292       ipcls = netutils.IP4Address
293       if family == netutils.IP6Address.family:
294         ipcls = netutils.IP6Address
295
296       result = utils.RunCmd(["ip", "address", "add",
297                              "%s/%d" % (master_ip, ipcls.iplen),
298                              "dev", master_netdev, "label",
299                              "%s:0" % master_netdev])
300       if result.failed:
301         msg = "Can't activate master IP: %s" % result.output
302         logging.error(msg)
303         err_msgs.append(msg)
304
305       # we ignore the exit code of the following cmds
306       if ipcls == netutils.IP4Address:
307         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
308                       master_ip, master_ip])
309       elif ipcls == netutils.IP6Address:
310         try:
311           utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
312         except errors.OpExecError:
313           # TODO: Better error reporting
314           logging.warning("Can't execute ndisc6, please install if missing")
315
316   if err_msgs:
317     _Fail("; ".join(err_msgs))
318
319
320 def StopMaster(stop_daemons):
321   """Deactivate this node as master.
322
323   The function will always try to deactivate the IP address of the
324   master. It will also stop the master daemons depending on the
325   stop_daemons parameter.
326
327   @type stop_daemons: boolean
328   @param stop_daemons: whether to also stop the master daemons
329       (ganeti-masterd and ganeti-rapi)
330   @rtype: None
331
332   """
333   # TODO: log and report back to the caller the error failures; we
334   # need to decide in which case we fail the RPC for this
335
336   # GetMasterInfo will raise an exception if not able to return data
337   master_netdev, master_ip, _, family = GetMasterInfo()
338
339   ipcls = netutils.IP4Address
340   if family == netutils.IP6Address.family:
341     ipcls = netutils.IP6Address
342
343   result = utils.RunCmd(["ip", "address", "del",
344                          "%s/%d" % (master_ip, ipcls.iplen),
345                          "dev", master_netdev])
346   if result.failed:
347     logging.error("Can't remove the master IP, error: %s", result.output)
348     # but otherwise ignore the failure
349
350   if stop_daemons:
351     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
352     if result.failed:
353       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
354                     " and error %s",
355                     result.cmd, result.exit_code, result.output)
356
357
358 def EtcHostsModify(mode, host, ip):
359   """Modify a host entry in /etc/hosts.
360
361   @param mode: The mode to operate. Either add or remove entry
362   @param host: The host to operate on
363   @param ip: The ip associated with the entry
364
365   """
366   if mode == constants.ETC_HOSTS_ADD:
367     if not ip:
368       RPCFail("Mode 'add' needs 'ip' parameter, but parameter not"
369               " present")
370     utils.AddHostToEtcHosts(host, ip)
371   elif mode == constants.ETC_HOSTS_REMOVE:
372     if ip:
373       RPCFail("Mode 'remove' does not allow 'ip' parameter, but"
374               " parameter is present")
375     utils.RemoveHostFromEtcHosts(host)
376   else:
377     RPCFail("Mode not supported")
378
379
380 def LeaveCluster(modify_ssh_setup):
381   """Cleans up and remove the current node.
382
383   This function cleans up and prepares the current node to be removed
384   from the cluster.
385
386   If processing is successful, then it raises an
387   L{errors.QuitGanetiException} which is used as a special case to
388   shutdown the node daemon.
389
390   @param modify_ssh_setup: boolean
391
392   """
393   _CleanDirectory(constants.DATA_DIR)
394   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
395   JobQueuePurge()
396
397   if modify_ssh_setup:
398     try:
399       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
400
401       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
402
403       utils.RemoveFile(priv_key)
404       utils.RemoveFile(pub_key)
405     except errors.OpExecError:
406       logging.exception("Error while processing ssh files")
407
408   try:
409     utils.RemoveFile(constants.CONFD_HMAC_KEY)
410     utils.RemoveFile(constants.RAPI_CERT_FILE)
411     utils.RemoveFile(constants.NODED_CERT_FILE)
412   except: # pylint: disable-msg=W0702
413     logging.exception("Error while removing cluster secrets")
414
415   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
416   if result.failed:
417     logging.error("Command %s failed with exitcode %s and error %s",
418                   result.cmd, result.exit_code, result.output)
419
420   # Raise a custom exception (handled in ganeti-noded)
421   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
422
423
424 def GetNodeInfo(vgname, hypervisor_type):
425   """Gives back a hash with different information about the node.
426
427   @type vgname: C{string}
428   @param vgname: the name of the volume group to ask for disk space information
429   @type hypervisor_type: C{str}
430   @param hypervisor_type: the name of the hypervisor to ask for
431       memory information
432   @rtype: C{dict}
433   @return: dictionary with the following keys:
434       - vg_size is the size of the configured volume group in MiB
435       - vg_free is the free size of the volume group in MiB
436       - memory_dom0 is the memory allocated for domain0 in MiB
437       - memory_free is the currently available (free) ram in MiB
438       - memory_total is the total number of ram in MiB
439
440   """
441   outputarray = {}
442
443   vginfo = bdev.LogicalVolume.GetVGInfo([vgname])
444   vg_free = vg_size = None
445   if vginfo:
446     vg_free = int(round(vginfo[0][0], 0))
447     vg_size = int(round(vginfo[0][1], 0))
448
449   outputarray['vg_size'] = vg_size
450   outputarray['vg_free'] = vg_free
451
452   hyper = hypervisor.GetHypervisor(hypervisor_type)
453   hyp_info = hyper.GetNodeInfo()
454   if hyp_info is not None:
455     outputarray.update(hyp_info)
456
457   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
458
459   return outputarray
460
461
462 def VerifyNode(what, cluster_name):
463   """Verify the status of the local node.
464
465   Based on the input L{what} parameter, various checks are done on the
466   local node.
467
468   If the I{filelist} key is present, this list of
469   files is checksummed and the file/checksum pairs are returned.
470
471   If the I{nodelist} key is present, we check that we have
472   connectivity via ssh with the target nodes (and check the hostname
473   report).
474
475   If the I{node-net-test} key is present, we check that we have
476   connectivity to the given nodes via both primary IP and, if
477   applicable, secondary IPs.
478
479   @type what: C{dict}
480   @param what: a dictionary of things to check:
481       - filelist: list of files for which to compute checksums
482       - nodelist: list of nodes we should check ssh communication with
483       - node-net-test: list of nodes we should check node daemon port
484         connectivity with
485       - hypervisor: list with hypervisors to run the verify for
486   @rtype: dict
487   @return: a dictionary with the same keys as the input dict, and
488       values representing the result of the checks
489
490   """
491   result = {}
492   my_name = netutils.Hostname.GetSysName()
493   port = netutils.GetDaemonPort(constants.NODED)
494
495   if constants.NV_HYPERVISOR in what:
496     result[constants.NV_HYPERVISOR] = tmp = {}
497     for hv_name in what[constants.NV_HYPERVISOR]:
498       try:
499         val = hypervisor.GetHypervisor(hv_name).Verify()
500       except errors.HypervisorError, err:
501         val = "Error while checking hypervisor: %s" % str(err)
502       tmp[hv_name] = val
503
504   if constants.NV_FILELIST in what:
505     result[constants.NV_FILELIST] = utils.FingerprintFiles(
506       what[constants.NV_FILELIST])
507
508   if constants.NV_NODELIST in what:
509     result[constants.NV_NODELIST] = tmp = {}
510     random.shuffle(what[constants.NV_NODELIST])
511     for node in what[constants.NV_NODELIST]:
512       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
513       if not success:
514         tmp[node] = message
515
516   if constants.NV_NODENETTEST in what:
517     result[constants.NV_NODENETTEST] = tmp = {}
518     my_pip = my_sip = None
519     for name, pip, sip in what[constants.NV_NODENETTEST]:
520       if name == my_name:
521         my_pip = pip
522         my_sip = sip
523         break
524     if not my_pip:
525       tmp[my_name] = ("Can't find my own primary/secondary IP"
526                       " in the node list")
527     else:
528       for name, pip, sip in what[constants.NV_NODENETTEST]:
529         fail = []
530         if not netutils.TcpPing(pip, port, source=my_pip):
531           fail.append("primary")
532         if sip != pip:
533           if not netutils.TcpPing(sip, port, source=my_sip):
534             fail.append("secondary")
535         if fail:
536           tmp[name] = ("failure using the %s interface(s)" %
537                        " and ".join(fail))
538
539   if constants.NV_MASTERIP in what:
540     # FIXME: add checks on incoming data structures (here and in the
541     # rest of the function)
542     master_name, master_ip = what[constants.NV_MASTERIP]
543     if master_name == my_name:
544       source = constants.IP4_ADDRESS_LOCALHOST
545     else:
546       source = None
547     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
548                                                   source=source)
549
550   if constants.NV_LVLIST in what:
551     try:
552       val = GetVolumeList(what[constants.NV_LVLIST])
553     except RPCFail, err:
554       val = str(err)
555     result[constants.NV_LVLIST] = val
556
557   if constants.NV_INSTANCELIST in what:
558     # GetInstanceList can fail
559     try:
560       val = GetInstanceList(what[constants.NV_INSTANCELIST])
561     except RPCFail, err:
562       val = str(err)
563     result[constants.NV_INSTANCELIST] = val
564
565   if constants.NV_VGLIST in what:
566     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
567
568   if constants.NV_PVLIST in what:
569     result[constants.NV_PVLIST] = \
570       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
571                                    filter_allocatable=False)
572
573   if constants.NV_VERSION in what:
574     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
575                                     constants.RELEASE_VERSION)
576
577   if constants.NV_HVINFO in what:
578     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
579     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
580
581   if constants.NV_DRBDLIST in what:
582     try:
583       used_minors = bdev.DRBD8.GetUsedDevs().keys()
584     except errors.BlockDeviceError, err:
585       logging.warning("Can't get used minors list", exc_info=True)
586       used_minors = str(err)
587     result[constants.NV_DRBDLIST] = used_minors
588
589   if constants.NV_DRBDHELPER in what:
590     status = True
591     try:
592       payload = bdev.BaseDRBD.GetUsermodeHelper()
593     except errors.BlockDeviceError, err:
594       logging.error("Can't get DRBD usermode helper: %s", str(err))
595       status = False
596       payload = str(err)
597     result[constants.NV_DRBDHELPER] = (status, payload)
598
599   if constants.NV_NODESETUP in what:
600     result[constants.NV_NODESETUP] = tmpr = []
601     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
602       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
603                   " under /sys, missing required directories /sys/block"
604                   " and /sys/class/net")
605     if (not os.path.isdir("/proc/sys") or
606         not os.path.isfile("/proc/sysrq-trigger")):
607       tmpr.append("The procfs filesystem doesn't seem to be mounted"
608                   " under /proc, missing required directory /proc/sys and"
609                   " the file /proc/sysrq-trigger")
610
611   if constants.NV_TIME in what:
612     result[constants.NV_TIME] = utils.SplitTime(time.time())
613
614   if constants.NV_OSLIST in what:
615     result[constants.NV_OSLIST] = DiagnoseOS()
616
617   return result
618
619
620 def GetVolumeList(vg_name):
621   """Compute list of logical volumes and their size.
622
623   @type vg_name: str
624   @param vg_name: the volume group whose LVs we should list
625   @rtype: dict
626   @return:
627       dictionary of all partions (key) with value being a tuple of
628       their size (in MiB), inactive and online status::
629
630         {'test1': ('20.06', True, True)}
631
632       in case of errors, a string is returned with the error
633       details.
634
635   """
636   lvs = {}
637   sep = '|'
638   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
639                          "--separator=%s" % sep,
640                          "-olv_name,lv_size,lv_attr", vg_name])
641   if result.failed:
642     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
643
644   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
645   for line in result.stdout.splitlines():
646     line = line.strip()
647     match = valid_line_re.match(line)
648     if not match:
649       logging.error("Invalid line returned from lvs output: '%s'", line)
650       continue
651     name, size, attr = match.groups()
652     inactive = attr[4] == '-'
653     online = attr[5] == 'o'
654     virtual = attr[0] == 'v'
655     if virtual:
656       # we don't want to report such volumes as existing, since they
657       # don't really hold data
658       continue
659     lvs[name] = (size, inactive, online)
660
661   return lvs
662
663
664 def ListVolumeGroups():
665   """List the volume groups and their size.
666
667   @rtype: dict
668   @return: dictionary with keys volume name and values the
669       size of the volume
670
671   """
672   return utils.ListVolumeGroups()
673
674
675 def NodeVolumes():
676   """List all volumes on this node.
677
678   @rtype: list
679   @return:
680     A list of dictionaries, each having four keys:
681       - name: the logical volume name,
682       - size: the size of the logical volume
683       - dev: the physical device on which the LV lives
684       - vg: the volume group to which it belongs
685
686     In case of errors, we return an empty list and log the
687     error.
688
689     Note that since a logical volume can live on multiple physical
690     volumes, the resulting list might include a logical volume
691     multiple times.
692
693   """
694   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
695                          "--separator=|",
696                          "--options=lv_name,lv_size,devices,vg_name"])
697   if result.failed:
698     _Fail("Failed to list logical volumes, lvs output: %s",
699           result.output)
700
701   def parse_dev(dev):
702     return dev.split('(')[0]
703
704   def handle_dev(dev):
705     return [parse_dev(x) for x in dev.split(",")]
706
707   def map_line(line):
708     line = [v.strip() for v in line]
709     return [{'name': line[0], 'size': line[1],
710              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
711
712   all_devs = []
713   for line in result.stdout.splitlines():
714     if line.count('|') >= 3:
715       all_devs.extend(map_line(line.split('|')))
716     else:
717       logging.warning("Strange line in the output from lvs: '%s'", line)
718   return all_devs
719
720
721 def BridgesExist(bridges_list):
722   """Check if a list of bridges exist on the current node.
723
724   @rtype: boolean
725   @return: C{True} if all of them exist, C{False} otherwise
726
727   """
728   missing = []
729   for bridge in bridges_list:
730     if not utils.BridgeExists(bridge):
731       missing.append(bridge)
732
733   if missing:
734     _Fail("Missing bridges %s", utils.CommaJoin(missing))
735
736
737 def GetInstanceList(hypervisor_list):
738   """Provides a list of instances.
739
740   @type hypervisor_list: list
741   @param hypervisor_list: the list of hypervisors to query information
742
743   @rtype: list
744   @return: a list of all running instances on the current node
745     - instance1.example.com
746     - instance2.example.com
747
748   """
749   results = []
750   for hname in hypervisor_list:
751     try:
752       names = hypervisor.GetHypervisor(hname).ListInstances()
753       results.extend(names)
754     except errors.HypervisorError, err:
755       _Fail("Error enumerating instances (hypervisor %s): %s",
756             hname, err, exc=True)
757
758   return results
759
760
761 def GetInstanceInfo(instance, hname):
762   """Gives back the information about an instance as a dictionary.
763
764   @type instance: string
765   @param instance: the instance name
766   @type hname: string
767   @param hname: the hypervisor type of the instance
768
769   @rtype: dict
770   @return: dictionary with the following keys:
771       - memory: memory size of instance (int)
772       - state: xen state of instance (string)
773       - time: cpu time of instance (float)
774
775   """
776   output = {}
777
778   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
779   if iinfo is not None:
780     output['memory'] = iinfo[2]
781     output['state'] = iinfo[4]
782     output['time'] = iinfo[5]
783
784   return output
785
786
787 def GetInstanceMigratable(instance):
788   """Gives whether an instance can be migrated.
789
790   @type instance: L{objects.Instance}
791   @param instance: object representing the instance to be checked.
792
793   @rtype: tuple
794   @return: tuple of (result, description) where:
795       - result: whether the instance can be migrated or not
796       - description: a description of the issue, if relevant
797
798   """
799   hyper = hypervisor.GetHypervisor(instance.hypervisor)
800   iname = instance.name
801   if iname not in hyper.ListInstances():
802     _Fail("Instance %s is not running", iname)
803
804   for idx in range(len(instance.disks)):
805     link_name = _GetBlockDevSymlinkPath(iname, idx)
806     if not os.path.islink(link_name):
807       logging.warning("Instance %s is missing symlink %s for disk %d",
808                       iname, link_name, idx)
809
810
811 def GetAllInstancesInfo(hypervisor_list):
812   """Gather data about all instances.
813
814   This is the equivalent of L{GetInstanceInfo}, except that it
815   computes data for all instances at once, thus being faster if one
816   needs data about more than one instance.
817
818   @type hypervisor_list: list
819   @param hypervisor_list: list of hypervisors to query for instance data
820
821   @rtype: dict
822   @return: dictionary of instance: data, with data having the following keys:
823       - memory: memory size of instance (int)
824       - state: xen state of instance (string)
825       - time: cpu time of instance (float)
826       - vcpus: the number of vcpus
827
828   """
829   output = {}
830
831   for hname in hypervisor_list:
832     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
833     if iinfo:
834       for name, _, memory, vcpus, state, times in iinfo:
835         value = {
836           'memory': memory,
837           'vcpus': vcpus,
838           'state': state,
839           'time': times,
840           }
841         if name in output:
842           # we only check static parameters, like memory and vcpus,
843           # and not state and time which can change between the
844           # invocations of the different hypervisors
845           for key in 'memory', 'vcpus':
846             if value[key] != output[name][key]:
847               _Fail("Instance %s is running twice"
848                     " with different parameters", name)
849         output[name] = value
850
851   return output
852
853
854 def _InstanceLogName(kind, os_name, instance):
855   """Compute the OS log filename for a given instance and operation.
856
857   The instance name and os name are passed in as strings since not all
858   operations have these as part of an instance object.
859
860   @type kind: string
861   @param kind: the operation type (e.g. add, import, etc.)
862   @type os_name: string
863   @param os_name: the os name
864   @type instance: string
865   @param instance: the name of the instance being imported/added/etc.
866
867   """
868   # TODO: Use tempfile.mkstemp to create unique filename
869   base = ("%s-%s-%s-%s.log" %
870           (kind, os_name, instance, utils.TimestampForFilename()))
871   return utils.PathJoin(constants.LOG_OS_DIR, base)
872
873
874 def InstanceOsAdd(instance, reinstall, debug):
875   """Add an OS to an instance.
876
877   @type instance: L{objects.Instance}
878   @param instance: Instance whose OS is to be installed
879   @type reinstall: boolean
880   @param reinstall: whether this is an instance reinstall
881   @type debug: integer
882   @param debug: debug level, passed to the OS scripts
883   @rtype: None
884
885   """
886   inst_os = OSFromDisk(instance.os)
887
888   create_env = OSEnvironment(instance, inst_os, debug)
889   if reinstall:
890     create_env['INSTANCE_REINSTALL'] = "1"
891
892   logfile = _InstanceLogName("add", instance.os, instance.name)
893
894   result = utils.RunCmd([inst_os.create_script], env=create_env,
895                         cwd=inst_os.path, output=logfile,)
896   if result.failed:
897     logging.error("os create command '%s' returned error: %s, logfile: %s,"
898                   " output: %s", result.cmd, result.fail_reason, logfile,
899                   result.output)
900     lines = [utils.SafeEncode(val)
901              for val in utils.TailFile(logfile, lines=20)]
902     _Fail("OS create script failed (%s), last lines in the"
903           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
904
905
906 def RunRenameInstance(instance, old_name, debug):
907   """Run the OS rename script for an instance.
908
909   @type instance: L{objects.Instance}
910   @param instance: Instance whose OS is to be installed
911   @type old_name: string
912   @param old_name: previous instance name
913   @type debug: integer
914   @param debug: debug level, passed to the OS scripts
915   @rtype: boolean
916   @return: the success of the operation
917
918   """
919   inst_os = OSFromDisk(instance.os)
920
921   rename_env = OSEnvironment(instance, inst_os, debug)
922   rename_env['OLD_INSTANCE_NAME'] = old_name
923
924   logfile = _InstanceLogName("rename", instance.os,
925                              "%s-%s" % (old_name, instance.name))
926
927   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
928                         cwd=inst_os.path, output=logfile)
929
930   if result.failed:
931     logging.error("os create command '%s' returned error: %s output: %s",
932                   result.cmd, result.fail_reason, result.output)
933     lines = [utils.SafeEncode(val)
934              for val in utils.TailFile(logfile, lines=20)]
935     _Fail("OS rename script failed (%s), last lines in the"
936           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
937
938
939 def _GetBlockDevSymlinkPath(instance_name, idx):
940   return utils.PathJoin(constants.DISK_LINKS_DIR,
941                         "%s:%d" % (instance_name, idx))
942
943
944 def _SymlinkBlockDev(instance_name, device_path, idx):
945   """Set up symlinks to a instance's block device.
946
947   This is an auxiliary function run when an instance is start (on the primary
948   node) or when an instance is migrated (on the target node).
949
950
951   @param instance_name: the name of the target instance
952   @param device_path: path of the physical block device, on the node
953   @param idx: the disk index
954   @return: absolute path to the disk's symlink
955
956   """
957   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
958   try:
959     os.symlink(device_path, link_name)
960   except OSError, err:
961     if err.errno == errno.EEXIST:
962       if (not os.path.islink(link_name) or
963           os.readlink(link_name) != device_path):
964         os.remove(link_name)
965         os.symlink(device_path, link_name)
966     else:
967       raise
968
969   return link_name
970
971
972 def _RemoveBlockDevLinks(instance_name, disks):
973   """Remove the block device symlinks belonging to the given instance.
974
975   """
976   for idx, _ in enumerate(disks):
977     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
978     if os.path.islink(link_name):
979       try:
980         os.remove(link_name)
981       except OSError:
982         logging.exception("Can't remove symlink '%s'", link_name)
983
984
985 def _GatherAndLinkBlockDevs(instance):
986   """Set up an instance's block device(s).
987
988   This is run on the primary node at instance startup. The block
989   devices must be already assembled.
990
991   @type instance: L{objects.Instance}
992   @param instance: the instance whose disks we shoul assemble
993   @rtype: list
994   @return: list of (disk_object, device_path)
995
996   """
997   block_devices = []
998   for idx, disk in enumerate(instance.disks):
999     device = _RecursiveFindBD(disk)
1000     if device is None:
1001       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1002                                     str(disk))
1003     device.Open()
1004     try:
1005       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1006     except OSError, e:
1007       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1008                                     e.strerror)
1009
1010     block_devices.append((disk, link_name))
1011
1012   return block_devices
1013
1014
1015 def StartInstance(instance):
1016   """Start an instance.
1017
1018   @type instance: L{objects.Instance}
1019   @param instance: the instance object
1020   @rtype: None
1021
1022   """
1023   running_instances = GetInstanceList([instance.hypervisor])
1024
1025   if instance.name in running_instances:
1026     logging.info("Instance %s already running, not starting", instance.name)
1027     return
1028
1029   try:
1030     block_devices = _GatherAndLinkBlockDevs(instance)
1031     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1032     hyper.StartInstance(instance, block_devices)
1033   except errors.BlockDeviceError, err:
1034     _Fail("Block device error: %s", err, exc=True)
1035   except errors.HypervisorError, err:
1036     _RemoveBlockDevLinks(instance.name, instance.disks)
1037     _Fail("Hypervisor error: %s", err, exc=True)
1038
1039
1040 def InstanceShutdown(instance, timeout):
1041   """Shut an instance down.
1042
1043   @note: this functions uses polling with a hardcoded timeout.
1044
1045   @type instance: L{objects.Instance}
1046   @param instance: the instance object
1047   @type timeout: integer
1048   @param timeout: maximum timeout for soft shutdown
1049   @rtype: None
1050
1051   """
1052   hv_name = instance.hypervisor
1053   hyper = hypervisor.GetHypervisor(hv_name)
1054   iname = instance.name
1055
1056   if instance.name not in hyper.ListInstances():
1057     logging.info("Instance %s not running, doing nothing", iname)
1058     return
1059
1060   class _TryShutdown:
1061     def __init__(self):
1062       self.tried_once = False
1063
1064     def __call__(self):
1065       if iname not in hyper.ListInstances():
1066         return
1067
1068       try:
1069         hyper.StopInstance(instance, retry=self.tried_once)
1070       except errors.HypervisorError, err:
1071         if iname not in hyper.ListInstances():
1072           # if the instance is no longer existing, consider this a
1073           # success and go to cleanup
1074           return
1075
1076         _Fail("Failed to stop instance %s: %s", iname, err)
1077
1078       self.tried_once = True
1079
1080       raise utils.RetryAgain()
1081
1082   try:
1083     utils.Retry(_TryShutdown(), 5, timeout)
1084   except utils.RetryTimeout:
1085     # the shutdown did not succeed
1086     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1087
1088     try:
1089       hyper.StopInstance(instance, force=True)
1090     except errors.HypervisorError, err:
1091       if iname in hyper.ListInstances():
1092         # only raise an error if the instance still exists, otherwise
1093         # the error could simply be "instance ... unknown"!
1094         _Fail("Failed to force stop instance %s: %s", iname, err)
1095
1096     time.sleep(1)
1097
1098     if iname in hyper.ListInstances():
1099       _Fail("Could not shutdown instance %s even by destroy", iname)
1100
1101   try:
1102     hyper.CleanupInstance(instance.name)
1103   except errors.HypervisorError, err:
1104     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1105
1106   _RemoveBlockDevLinks(iname, instance.disks)
1107
1108
1109 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1110   """Reboot an instance.
1111
1112   @type instance: L{objects.Instance}
1113   @param instance: the instance object to reboot
1114   @type reboot_type: str
1115   @param reboot_type: the type of reboot, one the following
1116     constants:
1117       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1118         instance OS, do not recreate the VM
1119       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1120         restart the VM (at the hypervisor level)
1121       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1122         not accepted here, since that mode is handled differently, in
1123         cmdlib, and translates into full stop and start of the
1124         instance (instead of a call_instance_reboot RPC)
1125   @type shutdown_timeout: integer
1126   @param shutdown_timeout: maximum timeout for soft shutdown
1127   @rtype: None
1128
1129   """
1130   running_instances = GetInstanceList([instance.hypervisor])
1131
1132   if instance.name not in running_instances:
1133     _Fail("Cannot reboot instance %s that is not running", instance.name)
1134
1135   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1136   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1137     try:
1138       hyper.RebootInstance(instance)
1139     except errors.HypervisorError, err:
1140       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1141   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1142     try:
1143       InstanceShutdown(instance, shutdown_timeout)
1144       return StartInstance(instance)
1145     except errors.HypervisorError, err:
1146       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1147   else:
1148     _Fail("Invalid reboot_type received: %s", reboot_type)
1149
1150
1151 def MigrationInfo(instance):
1152   """Gather information about an instance to be migrated.
1153
1154   @type instance: L{objects.Instance}
1155   @param instance: the instance definition
1156
1157   """
1158   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1159   try:
1160     info = hyper.MigrationInfo(instance)
1161   except errors.HypervisorError, err:
1162     _Fail("Failed to fetch migration information: %s", err, exc=True)
1163   return info
1164
1165
1166 def AcceptInstance(instance, info, target):
1167   """Prepare the node to accept an instance.
1168
1169   @type instance: L{objects.Instance}
1170   @param instance: the instance definition
1171   @type info: string/data (opaque)
1172   @param info: migration information, from the source node
1173   @type target: string
1174   @param target: target host (usually ip), on this node
1175
1176   """
1177   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1178   try:
1179     hyper.AcceptInstance(instance, info, target)
1180   except errors.HypervisorError, err:
1181     _Fail("Failed to accept instance: %s", err, exc=True)
1182
1183
1184 def FinalizeMigration(instance, info, success):
1185   """Finalize any preparation to accept an instance.
1186
1187   @type instance: L{objects.Instance}
1188   @param instance: the instance definition
1189   @type info: string/data (opaque)
1190   @param info: migration information, from the source node
1191   @type success: boolean
1192   @param success: whether the migration was a success or a failure
1193
1194   """
1195   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1196   try:
1197     hyper.FinalizeMigration(instance, info, success)
1198   except errors.HypervisorError, err:
1199     _Fail("Failed to finalize migration: %s", err, exc=True)
1200
1201
1202 def MigrateInstance(instance, target, live):
1203   """Migrates an instance to another node.
1204
1205   @type instance: L{objects.Instance}
1206   @param instance: the instance definition
1207   @type target: string
1208   @param target: the target node name
1209   @type live: boolean
1210   @param live: whether the migration should be done live or not (the
1211       interpretation of this parameter is left to the hypervisor)
1212   @rtype: tuple
1213   @return: a tuple of (success, msg) where:
1214       - succes is a boolean denoting the success/failure of the operation
1215       - msg is a string with details in case of failure
1216
1217   """
1218   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1219
1220   try:
1221     hyper.MigrateInstance(instance, target, live)
1222   except errors.HypervisorError, err:
1223     _Fail("Failed to migrate instance: %s", err, exc=True)
1224
1225
1226 def BlockdevCreate(disk, size, owner, on_primary, info):
1227   """Creates a block device for an instance.
1228
1229   @type disk: L{objects.Disk}
1230   @param disk: the object describing the disk we should create
1231   @type size: int
1232   @param size: the size of the physical underlying device, in MiB
1233   @type owner: str
1234   @param owner: the name of the instance for which disk is created,
1235       used for device cache data
1236   @type on_primary: boolean
1237   @param on_primary:  indicates if it is the primary node or not
1238   @type info: string
1239   @param info: string that will be sent to the physical device
1240       creation, used for example to set (LVM) tags on LVs
1241
1242   @return: the new unique_id of the device (this can sometime be
1243       computed only after creation), or None. On secondary nodes,
1244       it's not required to return anything.
1245
1246   """
1247   # TODO: remove the obsolete 'size' argument
1248   # pylint: disable-msg=W0613
1249   clist = []
1250   if disk.children:
1251     for child in disk.children:
1252       try:
1253         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1254       except errors.BlockDeviceError, err:
1255         _Fail("Can't assemble device %s: %s", child, err)
1256       if on_primary or disk.AssembleOnSecondary():
1257         # we need the children open in case the device itself has to
1258         # be assembled
1259         try:
1260           # pylint: disable-msg=E1103
1261           crdev.Open()
1262         except errors.BlockDeviceError, err:
1263           _Fail("Can't make child '%s' read-write: %s", child, err)
1264       clist.append(crdev)
1265
1266   try:
1267     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1268   except errors.BlockDeviceError, err:
1269     _Fail("Can't create block device: %s", err)
1270
1271   if on_primary or disk.AssembleOnSecondary():
1272     try:
1273       device.Assemble()
1274     except errors.BlockDeviceError, err:
1275       _Fail("Can't assemble device after creation, unusual event: %s", err)
1276     device.SetSyncSpeed(constants.SYNC_SPEED)
1277     if on_primary or disk.OpenOnSecondary():
1278       try:
1279         device.Open(force=True)
1280       except errors.BlockDeviceError, err:
1281         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1282     DevCacheManager.UpdateCache(device.dev_path, owner,
1283                                 on_primary, disk.iv_name)
1284
1285   device.SetInfo(info)
1286
1287   return device.unique_id
1288
1289
1290 def _WipeDevice(path):
1291   """This function actually wipes the device.
1292
1293   @param path: The path to the device to wipe
1294
1295   """
1296   result = utils.RunCmd("%s%s" % (constants.WIPE_CMD, utils.ShellQuote(path)))
1297
1298   if result.failed:
1299     _Fail("Wipe command '%s' exited with error: %s; output: %s", result.cmd,
1300           result.fail_reason, result.output)
1301
1302
1303 def BlockdevWipe(disk):
1304   """Wipes a block device.
1305
1306   @type disk: L{objects.Disk}
1307   @param disk: the disk object we want to wipe
1308
1309   """
1310   try:
1311     rdev = _RecursiveFindBD(disk)
1312   except errors.BlockDeviceError, err:
1313     _Fail("Cannot execute wipe for device %s: device not found", err)
1314
1315   _WipeDevice(rdev.dev_path)
1316
1317
1318 def BlockdevRemove(disk):
1319   """Remove a block device.
1320
1321   @note: This is intended to be called recursively.
1322
1323   @type disk: L{objects.Disk}
1324   @param disk: the disk object we should remove
1325   @rtype: boolean
1326   @return: the success of the operation
1327
1328   """
1329   msgs = []
1330   try:
1331     rdev = _RecursiveFindBD(disk)
1332   except errors.BlockDeviceError, err:
1333     # probably can't attach
1334     logging.info("Can't attach to device %s in remove", disk)
1335     rdev = None
1336   if rdev is not None:
1337     r_path = rdev.dev_path
1338     try:
1339       rdev.Remove()
1340     except errors.BlockDeviceError, err:
1341       msgs.append(str(err))
1342     if not msgs:
1343       DevCacheManager.RemoveCache(r_path)
1344
1345   if disk.children:
1346     for child in disk.children:
1347       try:
1348         BlockdevRemove(child)
1349       except RPCFail, err:
1350         msgs.append(str(err))
1351
1352   if msgs:
1353     _Fail("; ".join(msgs))
1354
1355
1356 def _RecursiveAssembleBD(disk, owner, as_primary):
1357   """Activate a block device for an instance.
1358
1359   This is run on the primary and secondary nodes for an instance.
1360
1361   @note: this function is called recursively.
1362
1363   @type disk: L{objects.Disk}
1364   @param disk: the disk we try to assemble
1365   @type owner: str
1366   @param owner: the name of the instance which owns the disk
1367   @type as_primary: boolean
1368   @param as_primary: if we should make the block device
1369       read/write
1370
1371   @return: the assembled device or None (in case no device
1372       was assembled)
1373   @raise errors.BlockDeviceError: in case there is an error
1374       during the activation of the children or the device
1375       itself
1376
1377   """
1378   children = []
1379   if disk.children:
1380     mcn = disk.ChildrenNeeded()
1381     if mcn == -1:
1382       mcn = 0 # max number of Nones allowed
1383     else:
1384       mcn = len(disk.children) - mcn # max number of Nones
1385     for chld_disk in disk.children:
1386       try:
1387         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1388       except errors.BlockDeviceError, err:
1389         if children.count(None) >= mcn:
1390           raise
1391         cdev = None
1392         logging.error("Error in child activation (but continuing): %s",
1393                       str(err))
1394       children.append(cdev)
1395
1396   if as_primary or disk.AssembleOnSecondary():
1397     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1398     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1399     result = r_dev
1400     if as_primary or disk.OpenOnSecondary():
1401       r_dev.Open()
1402     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1403                                 as_primary, disk.iv_name)
1404
1405   else:
1406     result = True
1407   return result
1408
1409
1410 def BlockdevAssemble(disk, owner, as_primary):
1411   """Activate a block device for an instance.
1412
1413   This is a wrapper over _RecursiveAssembleBD.
1414
1415   @rtype: str or boolean
1416   @return: a C{/dev/...} path for primary nodes, and
1417       C{True} for secondary nodes
1418
1419   """
1420   try:
1421     result = _RecursiveAssembleBD(disk, owner, as_primary)
1422     if isinstance(result, bdev.BlockDev):
1423       # pylint: disable-msg=E1103
1424       result = result.dev_path
1425   except errors.BlockDeviceError, err:
1426     _Fail("Error while assembling disk: %s", err, exc=True)
1427
1428   return result
1429
1430
1431 def BlockdevShutdown(disk):
1432   """Shut down a block device.
1433
1434   First, if the device is assembled (Attach() is successful), then
1435   the device is shutdown. Then the children of the device are
1436   shutdown.
1437
1438   This function is called recursively. Note that we don't cache the
1439   children or such, as oppossed to assemble, shutdown of different
1440   devices doesn't require that the upper device was active.
1441
1442   @type disk: L{objects.Disk}
1443   @param disk: the description of the disk we should
1444       shutdown
1445   @rtype: None
1446
1447   """
1448   msgs = []
1449   r_dev = _RecursiveFindBD(disk)
1450   if r_dev is not None:
1451     r_path = r_dev.dev_path
1452     try:
1453       r_dev.Shutdown()
1454       DevCacheManager.RemoveCache(r_path)
1455     except errors.BlockDeviceError, err:
1456       msgs.append(str(err))
1457
1458   if disk.children:
1459     for child in disk.children:
1460       try:
1461         BlockdevShutdown(child)
1462       except RPCFail, err:
1463         msgs.append(str(err))
1464
1465   if msgs:
1466     _Fail("; ".join(msgs))
1467
1468
1469 def BlockdevAddchildren(parent_cdev, new_cdevs):
1470   """Extend a mirrored block device.
1471
1472   @type parent_cdev: L{objects.Disk}
1473   @param parent_cdev: the disk to which we should add children
1474   @type new_cdevs: list of L{objects.Disk}
1475   @param new_cdevs: the list of children which we should add
1476   @rtype: None
1477
1478   """
1479   parent_bdev = _RecursiveFindBD(parent_cdev)
1480   if parent_bdev is None:
1481     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1482   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1483   if new_bdevs.count(None) > 0:
1484     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1485   parent_bdev.AddChildren(new_bdevs)
1486
1487
1488 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1489   """Shrink a mirrored block device.
1490
1491   @type parent_cdev: L{objects.Disk}
1492   @param parent_cdev: the disk from which we should remove children
1493   @type new_cdevs: list of L{objects.Disk}
1494   @param new_cdevs: the list of children which we should remove
1495   @rtype: None
1496
1497   """
1498   parent_bdev = _RecursiveFindBD(parent_cdev)
1499   if parent_bdev is None:
1500     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1501   devs = []
1502   for disk in new_cdevs:
1503     rpath = disk.StaticDevPath()
1504     if rpath is None:
1505       bd = _RecursiveFindBD(disk)
1506       if bd is None:
1507         _Fail("Can't find device %s while removing children", disk)
1508       else:
1509         devs.append(bd.dev_path)
1510     else:
1511       if not utils.IsNormAbsPath(rpath):
1512         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1513       devs.append(rpath)
1514   parent_bdev.RemoveChildren(devs)
1515
1516
1517 def BlockdevGetmirrorstatus(disks):
1518   """Get the mirroring status of a list of devices.
1519
1520   @type disks: list of L{objects.Disk}
1521   @param disks: the list of disks which we should query
1522   @rtype: disk
1523   @return:
1524       a list of (mirror_done, estimated_time) tuples, which
1525       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1526   @raise errors.BlockDeviceError: if any of the disks cannot be
1527       found
1528
1529   """
1530   stats = []
1531   for dsk in disks:
1532     rbd = _RecursiveFindBD(dsk)
1533     if rbd is None:
1534       _Fail("Can't find device %s", dsk)
1535
1536     stats.append(rbd.CombinedSyncStatus())
1537
1538   return stats
1539
1540
1541 def _RecursiveFindBD(disk):
1542   """Check if a device is activated.
1543
1544   If so, return information about the real device.
1545
1546   @type disk: L{objects.Disk}
1547   @param disk: the disk object we need to find
1548
1549   @return: None if the device can't be found,
1550       otherwise the device instance
1551
1552   """
1553   children = []
1554   if disk.children:
1555     for chdisk in disk.children:
1556       children.append(_RecursiveFindBD(chdisk))
1557
1558   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1559
1560
1561 def _OpenRealBD(disk):
1562   """Opens the underlying block device of a disk.
1563
1564   @type disk: L{objects.Disk}
1565   @param disk: the disk object we want to open
1566
1567   """
1568   real_disk = _RecursiveFindBD(disk)
1569   if real_disk is None:
1570     _Fail("Block device '%s' is not set up", disk)
1571
1572   real_disk.Open()
1573
1574   return real_disk
1575
1576
1577 def BlockdevFind(disk):
1578   """Check if a device is activated.
1579
1580   If it is, return information about the real device.
1581
1582   @type disk: L{objects.Disk}
1583   @param disk: the disk to find
1584   @rtype: None or objects.BlockDevStatus
1585   @return: None if the disk cannot be found, otherwise a the current
1586            information
1587
1588   """
1589   try:
1590     rbd = _RecursiveFindBD(disk)
1591   except errors.BlockDeviceError, err:
1592     _Fail("Failed to find device: %s", err, exc=True)
1593
1594   if rbd is None:
1595     return None
1596
1597   return rbd.GetSyncStatus()
1598
1599
1600 def BlockdevGetsize(disks):
1601   """Computes the size of the given disks.
1602
1603   If a disk is not found, returns None instead.
1604
1605   @type disks: list of L{objects.Disk}
1606   @param disks: the list of disk to compute the size for
1607   @rtype: list
1608   @return: list with elements None if the disk cannot be found,
1609       otherwise the size
1610
1611   """
1612   result = []
1613   for cf in disks:
1614     try:
1615       rbd = _RecursiveFindBD(cf)
1616     except errors.BlockDeviceError:
1617       result.append(None)
1618       continue
1619     if rbd is None:
1620       result.append(None)
1621     else:
1622       result.append(rbd.GetActualSize())
1623   return result
1624
1625
1626 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1627   """Export a block device to a remote node.
1628
1629   @type disk: L{objects.Disk}
1630   @param disk: the description of the disk to export
1631   @type dest_node: str
1632   @param dest_node: the destination node to export to
1633   @type dest_path: str
1634   @param dest_path: the destination path on the target node
1635   @type cluster_name: str
1636   @param cluster_name: the cluster name, needed for SSH hostalias
1637   @rtype: None
1638
1639   """
1640   real_disk = _OpenRealBD(disk)
1641
1642   # the block size on the read dd is 1MiB to match our units
1643   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1644                                "dd if=%s bs=1048576 count=%s",
1645                                real_disk.dev_path, str(disk.size))
1646
1647   # we set here a smaller block size as, due to ssh buffering, more
1648   # than 64-128k will mostly ignored; we use nocreat to fail if the
1649   # device is not already there or we pass a wrong path; we use
1650   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1651   # to not buffer too much memory; this means that at best, we flush
1652   # every 64k, which will not be very fast
1653   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1654                                 " oflag=dsync", dest_path)
1655
1656   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1657                                                    constants.GANETI_RUNAS,
1658                                                    destcmd)
1659
1660   # all commands have been checked, so we're safe to combine them
1661   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1662
1663   result = utils.RunCmd(["bash", "-c", command])
1664
1665   if result.failed:
1666     _Fail("Disk copy command '%s' returned error: %s"
1667           " output: %s", command, result.fail_reason, result.output)
1668
1669
1670 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1671   """Write a file to the filesystem.
1672
1673   This allows the master to overwrite(!) a file. It will only perform
1674   the operation if the file belongs to a list of configuration files.
1675
1676   @type file_name: str
1677   @param file_name: the target file name
1678   @type data: str
1679   @param data: the new contents of the file
1680   @type mode: int
1681   @param mode: the mode to give the file (can be None)
1682   @type uid: int
1683   @param uid: the owner of the file (can be -1 for default)
1684   @type gid: int
1685   @param gid: the group of the file (can be -1 for default)
1686   @type atime: float
1687   @param atime: the atime to set on the file (can be None)
1688   @type mtime: float
1689   @param mtime: the mtime to set on the file (can be None)
1690   @rtype: None
1691
1692   """
1693   if not os.path.isabs(file_name):
1694     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1695
1696   if file_name not in _ALLOWED_UPLOAD_FILES:
1697     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1698           file_name)
1699
1700   raw_data = _Decompress(data)
1701
1702   utils.SafeWriteFile(file_name, None,
1703                       data=raw_data, mode=mode, uid=uid, gid=gid,
1704                       atime=atime, mtime=mtime)
1705
1706
1707 def WriteSsconfFiles(values):
1708   """Update all ssconf files.
1709
1710   Wrapper around the SimpleStore.WriteFiles.
1711
1712   """
1713   ssconf.SimpleStore().WriteFiles(values)
1714
1715
1716 def _ErrnoOrStr(err):
1717   """Format an EnvironmentError exception.
1718
1719   If the L{err} argument has an errno attribute, it will be looked up
1720   and converted into a textual C{E...} description. Otherwise the
1721   string representation of the error will be returned.
1722
1723   @type err: L{EnvironmentError}
1724   @param err: the exception to format
1725
1726   """
1727   if hasattr(err, 'errno'):
1728     detail = errno.errorcode[err.errno]
1729   else:
1730     detail = str(err)
1731   return detail
1732
1733
1734 def _OSOndiskAPIVersion(os_dir):
1735   """Compute and return the API version of a given OS.
1736
1737   This function will try to read the API version of the OS residing in
1738   the 'os_dir' directory.
1739
1740   @type os_dir: str
1741   @param os_dir: the directory in which we should look for the OS
1742   @rtype: tuple
1743   @return: tuple (status, data) with status denoting the validity and
1744       data holding either the vaid versions or an error message
1745
1746   """
1747   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1748
1749   try:
1750     st = os.stat(api_file)
1751   except EnvironmentError, err:
1752     return False, ("Required file '%s' not found under path %s: %s" %
1753                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1754
1755   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1756     return False, ("File '%s' in %s is not a regular file" %
1757                    (constants.OS_API_FILE, os_dir))
1758
1759   try:
1760     api_versions = utils.ReadFile(api_file).splitlines()
1761   except EnvironmentError, err:
1762     return False, ("Error while reading the API version file at %s: %s" %
1763                    (api_file, _ErrnoOrStr(err)))
1764
1765   try:
1766     api_versions = [int(version.strip()) for version in api_versions]
1767   except (TypeError, ValueError), err:
1768     return False, ("API version(s) can't be converted to integer: %s" %
1769                    str(err))
1770
1771   return True, api_versions
1772
1773
1774 def DiagnoseOS(top_dirs=None):
1775   """Compute the validity for all OSes.
1776
1777   @type top_dirs: list
1778   @param top_dirs: the list of directories in which to
1779       search (if not given defaults to
1780       L{constants.OS_SEARCH_PATH})
1781   @rtype: list of L{objects.OS}
1782   @return: a list of tuples (name, path, status, diagnose, variants,
1783       parameters, api_version) for all (potential) OSes under all
1784       search paths, where:
1785           - name is the (potential) OS name
1786           - path is the full path to the OS
1787           - status True/False is the validity of the OS
1788           - diagnose is the error message for an invalid OS, otherwise empty
1789           - variants is a list of supported OS variants, if any
1790           - parameters is a list of (name, help) parameters, if any
1791           - api_version is a list of support OS API versions
1792
1793   """
1794   if top_dirs is None:
1795     top_dirs = constants.OS_SEARCH_PATH
1796
1797   result = []
1798   for dir_name in top_dirs:
1799     if os.path.isdir(dir_name):
1800       try:
1801         f_names = utils.ListVisibleFiles(dir_name)
1802       except EnvironmentError, err:
1803         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1804         break
1805       for name in f_names:
1806         os_path = utils.PathJoin(dir_name, name)
1807         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1808         if status:
1809           diagnose = ""
1810           variants = os_inst.supported_variants
1811           parameters = os_inst.supported_parameters
1812           api_versions = os_inst.api_versions
1813         else:
1814           diagnose = os_inst
1815           variants = parameters = api_versions = []
1816         result.append((name, os_path, status, diagnose, variants,
1817                        parameters, api_versions))
1818
1819   return result
1820
1821
1822 def _TryOSFromDisk(name, base_dir=None):
1823   """Create an OS instance from disk.
1824
1825   This function will return an OS instance if the given name is a
1826   valid OS name.
1827
1828   @type base_dir: string
1829   @keyword base_dir: Base directory containing OS installations.
1830                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1831   @rtype: tuple
1832   @return: success and either the OS instance if we find a valid one,
1833       or error message
1834
1835   """
1836   if base_dir is None:
1837     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1838   else:
1839     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1840
1841   if os_dir is None:
1842     return False, "Directory for OS %s not found in search path" % name
1843
1844   status, api_versions = _OSOndiskAPIVersion(os_dir)
1845   if not status:
1846     # push the error up
1847     return status, api_versions
1848
1849   if not constants.OS_API_VERSIONS.intersection(api_versions):
1850     return False, ("API version mismatch for path '%s': found %s, want %s." %
1851                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1852
1853   # OS Files dictionary, we will populate it with the absolute path names
1854   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1855
1856   if max(api_versions) >= constants.OS_API_V15:
1857     os_files[constants.OS_VARIANTS_FILE] = ''
1858
1859   if max(api_versions) >= constants.OS_API_V20:
1860     os_files[constants.OS_PARAMETERS_FILE] = ''
1861   else:
1862     del os_files[constants.OS_SCRIPT_VERIFY]
1863
1864   for filename in os_files:
1865     os_files[filename] = utils.PathJoin(os_dir, filename)
1866
1867     try:
1868       st = os.stat(os_files[filename])
1869     except EnvironmentError, err:
1870       return False, ("File '%s' under path '%s' is missing (%s)" %
1871                      (filename, os_dir, _ErrnoOrStr(err)))
1872
1873     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1874       return False, ("File '%s' under path '%s' is not a regular file" %
1875                      (filename, os_dir))
1876
1877     if filename in constants.OS_SCRIPTS:
1878       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1879         return False, ("File '%s' under path '%s' is not executable" %
1880                        (filename, os_dir))
1881
1882   variants = []
1883   if constants.OS_VARIANTS_FILE in os_files:
1884     variants_file = os_files[constants.OS_VARIANTS_FILE]
1885     try:
1886       variants = utils.ReadFile(variants_file).splitlines()
1887     except EnvironmentError, err:
1888       return False, ("Error while reading the OS variants file at %s: %s" %
1889                      (variants_file, _ErrnoOrStr(err)))
1890     if not variants:
1891       return False, ("No supported os variant found")
1892
1893   parameters = []
1894   if constants.OS_PARAMETERS_FILE in os_files:
1895     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1896     try:
1897       parameters = utils.ReadFile(parameters_file).splitlines()
1898     except EnvironmentError, err:
1899       return False, ("Error while reading the OS parameters file at %s: %s" %
1900                      (parameters_file, _ErrnoOrStr(err)))
1901     parameters = [v.split(None, 1) for v in parameters]
1902
1903   os_obj = objects.OS(name=name, path=os_dir,
1904                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1905                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1906                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1907                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1908                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1909                                                  None),
1910                       supported_variants=variants,
1911                       supported_parameters=parameters,
1912                       api_versions=api_versions)
1913   return True, os_obj
1914
1915
1916 def OSFromDisk(name, base_dir=None):
1917   """Create an OS instance from disk.
1918
1919   This function will return an OS instance if the given name is a
1920   valid OS name. Otherwise, it will raise an appropriate
1921   L{RPCFail} exception, detailing why this is not a valid OS.
1922
1923   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1924   an exception but returns true/false status data.
1925
1926   @type base_dir: string
1927   @keyword base_dir: Base directory containing OS installations.
1928                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1929   @rtype: L{objects.OS}
1930   @return: the OS instance if we find a valid one
1931   @raise RPCFail: if we don't find a valid OS
1932
1933   """
1934   name_only = objects.OS.GetName(name)
1935   status, payload = _TryOSFromDisk(name_only, base_dir)
1936
1937   if not status:
1938     _Fail(payload)
1939
1940   return payload
1941
1942
1943 def OSCoreEnv(os_name, inst_os, os_params, debug=0):
1944   """Calculate the basic environment for an os script.
1945
1946   @type os_name: str
1947   @param os_name: full operating system name (including variant)
1948   @type inst_os: L{objects.OS}
1949   @param inst_os: operating system for which the environment is being built
1950   @type os_params: dict
1951   @param os_params: the OS parameters
1952   @type debug: integer
1953   @param debug: debug level (0 or 1, for OS Api 10)
1954   @rtype: dict
1955   @return: dict of environment variables
1956   @raise errors.BlockDeviceError: if the block device
1957       cannot be found
1958
1959   """
1960   result = {}
1961   api_version = \
1962     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1963   result['OS_API_VERSION'] = '%d' % api_version
1964   result['OS_NAME'] = inst_os.name
1965   result['DEBUG_LEVEL'] = '%d' % debug
1966
1967   # OS variants
1968   if api_version >= constants.OS_API_V15:
1969     variant = objects.OS.GetVariant(os_name)
1970     if not variant:
1971       variant = inst_os.supported_variants[0]
1972     result['OS_VARIANT'] = variant
1973
1974   # OS params
1975   for pname, pvalue in os_params.items():
1976     result['OSP_%s' % pname.upper()] = pvalue
1977
1978   return result
1979
1980
1981 def OSEnvironment(instance, inst_os, debug=0):
1982   """Calculate the environment for an os script.
1983
1984   @type instance: L{objects.Instance}
1985   @param instance: target instance for the os script run
1986   @type inst_os: L{objects.OS}
1987   @param inst_os: operating system for which the environment is being built
1988   @type debug: integer
1989   @param debug: debug level (0 or 1, for OS Api 10)
1990   @rtype: dict
1991   @return: dict of environment variables
1992   @raise errors.BlockDeviceError: if the block device
1993       cannot be found
1994
1995   """
1996   result = OSCoreEnv(instance.os, inst_os, instance.osparams, debug=debug)
1997
1998   for attr in ["name", "os", "uuid", "ctime", "mtime"]:
1999     result["INSTANCE_%s" % attr.upper()] = str(getattr(instance, attr))
2000
2001   result['HYPERVISOR'] = instance.hypervisor
2002   result['DISK_COUNT'] = '%d' % len(instance.disks)
2003   result['NIC_COUNT'] = '%d' % len(instance.nics)
2004
2005   # Disks
2006   for idx, disk in enumerate(instance.disks):
2007     real_disk = _OpenRealBD(disk)
2008     result['DISK_%d_PATH' % idx] = real_disk.dev_path
2009     result['DISK_%d_ACCESS' % idx] = disk.mode
2010     if constants.HV_DISK_TYPE in instance.hvparams:
2011       result['DISK_%d_FRONTEND_TYPE' % idx] = \
2012         instance.hvparams[constants.HV_DISK_TYPE]
2013     if disk.dev_type in constants.LDS_BLOCK:
2014       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
2015     elif disk.dev_type == constants.LD_FILE:
2016       result['DISK_%d_BACKEND_TYPE' % idx] = \
2017         'file:%s' % disk.physical_id[0]
2018
2019   # NICs
2020   for idx, nic in enumerate(instance.nics):
2021     result['NIC_%d_MAC' % idx] = nic.mac
2022     if nic.ip:
2023       result['NIC_%d_IP' % idx] = nic.ip
2024     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2025     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2026       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2027     if nic.nicparams[constants.NIC_LINK]:
2028       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2029     if constants.HV_NIC_TYPE in instance.hvparams:
2030       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2031         instance.hvparams[constants.HV_NIC_TYPE]
2032
2033   # HV/BE params
2034   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2035     for key, value in source.items():
2036       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2037
2038   return result
2039
2040
2041 def BlockdevGrow(disk, amount):
2042   """Grow a stack of block devices.
2043
2044   This function is called recursively, with the childrens being the
2045   first ones to resize.
2046
2047   @type disk: L{objects.Disk}
2048   @param disk: the disk to be grown
2049   @rtype: (status, result)
2050   @return: a tuple with the status of the operation
2051       (True/False), and the errors message if status
2052       is False
2053
2054   """
2055   r_dev = _RecursiveFindBD(disk)
2056   if r_dev is None:
2057     _Fail("Cannot find block device %s", disk)
2058
2059   try:
2060     r_dev.Grow(amount)
2061   except errors.BlockDeviceError, err:
2062     _Fail("Failed to grow block device: %s", err, exc=True)
2063
2064
2065 def BlockdevSnapshot(disk):
2066   """Create a snapshot copy of a block device.
2067
2068   This function is called recursively, and the snapshot is actually created
2069   just for the leaf lvm backend device.
2070
2071   @type disk: L{objects.Disk}
2072   @param disk: the disk to be snapshotted
2073   @rtype: string
2074   @return: snapshot disk path
2075
2076   """
2077   if disk.dev_type == constants.LD_DRBD8:
2078     if not disk.children:
2079       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2080             disk.unique_id)
2081     return BlockdevSnapshot(disk.children[0])
2082   elif disk.dev_type == constants.LD_LV:
2083     r_dev = _RecursiveFindBD(disk)
2084     if r_dev is not None:
2085       # FIXME: choose a saner value for the snapshot size
2086       # let's stay on the safe side and ask for the full size, for now
2087       return r_dev.Snapshot(disk.size)
2088     else:
2089       _Fail("Cannot find block device %s", disk)
2090   else:
2091     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2092           disk.unique_id, disk.dev_type)
2093
2094
2095 def FinalizeExport(instance, snap_disks):
2096   """Write out the export configuration information.
2097
2098   @type instance: L{objects.Instance}
2099   @param instance: the instance which we export, used for
2100       saving configuration
2101   @type snap_disks: list of L{objects.Disk}
2102   @param snap_disks: list of snapshot block devices, which
2103       will be used to get the actual name of the dump file
2104
2105   @rtype: None
2106
2107   """
2108   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2109   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2110
2111   config = objects.SerializableConfigParser()
2112
2113   config.add_section(constants.INISECT_EXP)
2114   config.set(constants.INISECT_EXP, 'version', '0')
2115   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2116   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2117   config.set(constants.INISECT_EXP, 'os', instance.os)
2118   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2119
2120   config.add_section(constants.INISECT_INS)
2121   config.set(constants.INISECT_INS, 'name', instance.name)
2122   config.set(constants.INISECT_INS, 'memory', '%d' %
2123              instance.beparams[constants.BE_MEMORY])
2124   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2125              instance.beparams[constants.BE_VCPUS])
2126   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2127   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2128
2129   nic_total = 0
2130   for nic_count, nic in enumerate(instance.nics):
2131     nic_total += 1
2132     config.set(constants.INISECT_INS, 'nic%d_mac' %
2133                nic_count, '%s' % nic.mac)
2134     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2135     for param in constants.NICS_PARAMETER_TYPES:
2136       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2137                  '%s' % nic.nicparams.get(param, None))
2138   # TODO: redundant: on load can read nics until it doesn't exist
2139   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2140
2141   disk_total = 0
2142   for disk_count, disk in enumerate(snap_disks):
2143     if disk:
2144       disk_total += 1
2145       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2146                  ('%s' % disk.iv_name))
2147       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2148                  ('%s' % disk.physical_id[1]))
2149       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2150                  ('%d' % disk.size))
2151
2152   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2153
2154   # New-style hypervisor/backend parameters
2155
2156   config.add_section(constants.INISECT_HYP)
2157   for name, value in instance.hvparams.items():
2158     if name not in constants.HVC_GLOBALS:
2159       config.set(constants.INISECT_HYP, name, str(value))
2160
2161   config.add_section(constants.INISECT_BEP)
2162   for name, value in instance.beparams.items():
2163     config.set(constants.INISECT_BEP, name, str(value))
2164
2165   config.add_section(constants.INISECT_OSP)
2166   for name, value in instance.osparams.items():
2167     config.set(constants.INISECT_OSP, name, str(value))
2168
2169   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2170                   data=config.Dumps())
2171   shutil.rmtree(finaldestdir, ignore_errors=True)
2172   shutil.move(destdir, finaldestdir)
2173
2174
2175 def ExportInfo(dest):
2176   """Get export configuration information.
2177
2178   @type dest: str
2179   @param dest: directory containing the export
2180
2181   @rtype: L{objects.SerializableConfigParser}
2182   @return: a serializable config file containing the
2183       export info
2184
2185   """
2186   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2187
2188   config = objects.SerializableConfigParser()
2189   config.read(cff)
2190
2191   if (not config.has_section(constants.INISECT_EXP) or
2192       not config.has_section(constants.INISECT_INS)):
2193     _Fail("Export info file doesn't have the required fields")
2194
2195   return config.Dumps()
2196
2197
2198 def ListExports():
2199   """Return a list of exports currently available on this machine.
2200
2201   @rtype: list
2202   @return: list of the exports
2203
2204   """
2205   if os.path.isdir(constants.EXPORT_DIR):
2206     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2207   else:
2208     _Fail("No exports directory")
2209
2210
2211 def RemoveExport(export):
2212   """Remove an existing export from the node.
2213
2214   @type export: str
2215   @param export: the name of the export to remove
2216   @rtype: None
2217
2218   """
2219   target = utils.PathJoin(constants.EXPORT_DIR, export)
2220
2221   try:
2222     shutil.rmtree(target)
2223   except EnvironmentError, err:
2224     _Fail("Error while removing the export: %s", err, exc=True)
2225
2226
2227 def BlockdevRename(devlist):
2228   """Rename a list of block devices.
2229
2230   @type devlist: list of tuples
2231   @param devlist: list of tuples of the form  (disk,
2232       new_logical_id, new_physical_id); disk is an
2233       L{objects.Disk} object describing the current disk,
2234       and new logical_id/physical_id is the name we
2235       rename it to
2236   @rtype: boolean
2237   @return: True if all renames succeeded, False otherwise
2238
2239   """
2240   msgs = []
2241   result = True
2242   for disk, unique_id in devlist:
2243     dev = _RecursiveFindBD(disk)
2244     if dev is None:
2245       msgs.append("Can't find device %s in rename" % str(disk))
2246       result = False
2247       continue
2248     try:
2249       old_rpath = dev.dev_path
2250       dev.Rename(unique_id)
2251       new_rpath = dev.dev_path
2252       if old_rpath != new_rpath:
2253         DevCacheManager.RemoveCache(old_rpath)
2254         # FIXME: we should add the new cache information here, like:
2255         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2256         # but we don't have the owner here - maybe parse from existing
2257         # cache? for now, we only lose lvm data when we rename, which
2258         # is less critical than DRBD or MD
2259     except errors.BlockDeviceError, err:
2260       msgs.append("Can't rename device '%s' to '%s': %s" %
2261                   (dev, unique_id, err))
2262       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2263       result = False
2264   if not result:
2265     _Fail("; ".join(msgs))
2266
2267
2268 def _TransformFileStorageDir(file_storage_dir):
2269   """Checks whether given file_storage_dir is valid.
2270
2271   Checks wheter the given file_storage_dir is within the cluster-wide
2272   default file_storage_dir stored in SimpleStore. Only paths under that
2273   directory are allowed.
2274
2275   @type file_storage_dir: str
2276   @param file_storage_dir: the path to check
2277
2278   @return: the normalized path if valid, None otherwise
2279
2280   """
2281   if not constants.ENABLE_FILE_STORAGE:
2282     _Fail("File storage disabled at configure time")
2283   cfg = _GetConfig()
2284   file_storage_dir = os.path.normpath(file_storage_dir)
2285   base_file_storage_dir = cfg.GetFileStorageDir()
2286   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2287       base_file_storage_dir):
2288     _Fail("File storage directory '%s' is not under base file"
2289           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2290   return file_storage_dir
2291
2292
2293 def CreateFileStorageDir(file_storage_dir):
2294   """Create file storage directory.
2295
2296   @type file_storage_dir: str
2297   @param file_storage_dir: directory to create
2298
2299   @rtype: tuple
2300   @return: tuple with first element a boolean indicating wheter dir
2301       creation was successful or not
2302
2303   """
2304   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2305   if os.path.exists(file_storage_dir):
2306     if not os.path.isdir(file_storage_dir):
2307       _Fail("Specified storage dir '%s' is not a directory",
2308             file_storage_dir)
2309   else:
2310     try:
2311       os.makedirs(file_storage_dir, 0750)
2312     except OSError, err:
2313       _Fail("Cannot create file storage directory '%s': %s",
2314             file_storage_dir, err, exc=True)
2315
2316
2317 def RemoveFileStorageDir(file_storage_dir):
2318   """Remove file storage directory.
2319
2320   Remove it only if it's empty. If not log an error and return.
2321
2322   @type file_storage_dir: str
2323   @param file_storage_dir: the directory we should cleanup
2324   @rtype: tuple (success,)
2325   @return: tuple of one element, C{success}, denoting
2326       whether the operation was successful
2327
2328   """
2329   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2330   if os.path.exists(file_storage_dir):
2331     if not os.path.isdir(file_storage_dir):
2332       _Fail("Specified Storage directory '%s' is not a directory",
2333             file_storage_dir)
2334     # deletes dir only if empty, otherwise we want to fail the rpc call
2335     try:
2336       os.rmdir(file_storage_dir)
2337     except OSError, err:
2338       _Fail("Cannot remove file storage directory '%s': %s",
2339             file_storage_dir, err)
2340
2341
2342 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2343   """Rename the file storage directory.
2344
2345   @type old_file_storage_dir: str
2346   @param old_file_storage_dir: the current path
2347   @type new_file_storage_dir: str
2348   @param new_file_storage_dir: the name we should rename to
2349   @rtype: tuple (success,)
2350   @return: tuple of one element, C{success}, denoting
2351       whether the operation was successful
2352
2353   """
2354   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2355   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2356   if not os.path.exists(new_file_storage_dir):
2357     if os.path.isdir(old_file_storage_dir):
2358       try:
2359         os.rename(old_file_storage_dir, new_file_storage_dir)
2360       except OSError, err:
2361         _Fail("Cannot rename '%s' to '%s': %s",
2362               old_file_storage_dir, new_file_storage_dir, err)
2363     else:
2364       _Fail("Specified storage dir '%s' is not a directory",
2365             old_file_storage_dir)
2366   else:
2367     if os.path.exists(old_file_storage_dir):
2368       _Fail("Cannot rename '%s' to '%s': both locations exist",
2369             old_file_storage_dir, new_file_storage_dir)
2370
2371
2372 def _EnsureJobQueueFile(file_name):
2373   """Checks whether the given filename is in the queue directory.
2374
2375   @type file_name: str
2376   @param file_name: the file name we should check
2377   @rtype: None
2378   @raises RPCFail: if the file is not valid
2379
2380   """
2381   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2382   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2383
2384   if not result:
2385     _Fail("Passed job queue file '%s' does not belong to"
2386           " the queue directory '%s'", file_name, queue_dir)
2387
2388
2389 def JobQueueUpdate(file_name, content):
2390   """Updates a file in the queue directory.
2391
2392   This is just a wrapper over L{utils.WriteFile}, with proper
2393   checking.
2394
2395   @type file_name: str
2396   @param file_name: the job file name
2397   @type content: str
2398   @param content: the new job contents
2399   @rtype: boolean
2400   @return: the success of the operation
2401
2402   """
2403   _EnsureJobQueueFile(file_name)
2404   getents = runtime.GetEnts()
2405
2406   # Write and replace the file atomically
2407   utils.WriteFile(file_name, data=_Decompress(content), uid=getents.masterd_uid,
2408                   gid=getents.masterd_gid)
2409
2410
2411 def JobQueueRename(old, new):
2412   """Renames a job queue file.
2413
2414   This is just a wrapper over os.rename with proper checking.
2415
2416   @type old: str
2417   @param old: the old (actual) file name
2418   @type new: str
2419   @param new: the desired file name
2420   @rtype: tuple
2421   @return: the success of the operation and payload
2422
2423   """
2424   _EnsureJobQueueFile(old)
2425   _EnsureJobQueueFile(new)
2426
2427   utils.RenameFile(old, new, mkdir=True)
2428
2429
2430 def BlockdevClose(instance_name, disks):
2431   """Closes the given block devices.
2432
2433   This means they will be switched to secondary mode (in case of
2434   DRBD).
2435
2436   @param instance_name: if the argument is not empty, the symlinks
2437       of this instance will be removed
2438   @type disks: list of L{objects.Disk}
2439   @param disks: the list of disks to be closed
2440   @rtype: tuple (success, message)
2441   @return: a tuple of success and message, where success
2442       indicates the succes of the operation, and message
2443       which will contain the error details in case we
2444       failed
2445
2446   """
2447   bdevs = []
2448   for cf in disks:
2449     rd = _RecursiveFindBD(cf)
2450     if rd is None:
2451       _Fail("Can't find device %s", cf)
2452     bdevs.append(rd)
2453
2454   msg = []
2455   for rd in bdevs:
2456     try:
2457       rd.Close()
2458     except errors.BlockDeviceError, err:
2459       msg.append(str(err))
2460   if msg:
2461     _Fail("Can't make devices secondary: %s", ",".join(msg))
2462   else:
2463     if instance_name:
2464       _RemoveBlockDevLinks(instance_name, disks)
2465
2466
2467 def ValidateHVParams(hvname, hvparams):
2468   """Validates the given hypervisor parameters.
2469
2470   @type hvname: string
2471   @param hvname: the hypervisor name
2472   @type hvparams: dict
2473   @param hvparams: the hypervisor parameters to be validated
2474   @rtype: None
2475
2476   """
2477   try:
2478     hv_type = hypervisor.GetHypervisor(hvname)
2479     hv_type.ValidateParameters(hvparams)
2480   except errors.HypervisorError, err:
2481     _Fail(str(err), log=False)
2482
2483
2484 def _CheckOSPList(os_obj, parameters):
2485   """Check whether a list of parameters is supported by the OS.
2486
2487   @type os_obj: L{objects.OS}
2488   @param os_obj: OS object to check
2489   @type parameters: list
2490   @param parameters: the list of parameters to check
2491
2492   """
2493   supported = [v[0] for v in os_obj.supported_parameters]
2494   delta = frozenset(parameters).difference(supported)
2495   if delta:
2496     _Fail("The following parameters are not supported"
2497           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2498
2499
2500 def ValidateOS(required, osname, checks, osparams):
2501   """Validate the given OS' parameters.
2502
2503   @type required: boolean
2504   @param required: whether absence of the OS should translate into
2505       failure or not
2506   @type osname: string
2507   @param osname: the OS to be validated
2508   @type checks: list
2509   @param checks: list of the checks to run (currently only 'parameters')
2510   @type osparams: dict
2511   @param osparams: dictionary with OS parameters
2512   @rtype: boolean
2513   @return: True if the validation passed, or False if the OS was not
2514       found and L{required} was false
2515
2516   """
2517   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2518     _Fail("Unknown checks required for OS %s: %s", osname,
2519           set(checks).difference(constants.OS_VALIDATE_CALLS))
2520
2521   name_only = objects.OS.GetName(osname)
2522   status, tbv = _TryOSFromDisk(name_only, None)
2523
2524   if not status:
2525     if required:
2526       _Fail(tbv)
2527     else:
2528       return False
2529
2530   if max(tbv.api_versions) < constants.OS_API_V20:
2531     return True
2532
2533   if constants.OS_VALIDATE_PARAMETERS in checks:
2534     _CheckOSPList(tbv, osparams.keys())
2535
2536   validate_env = OSCoreEnv(osname, tbv, osparams)
2537   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2538                         cwd=tbv.path)
2539   if result.failed:
2540     logging.error("os validate command '%s' returned error: %s output: %s",
2541                   result.cmd, result.fail_reason, result.output)
2542     _Fail("OS validation script failed (%s), output: %s",
2543           result.fail_reason, result.output, log=False)
2544
2545   return True
2546
2547
2548 def DemoteFromMC():
2549   """Demotes the current node from master candidate role.
2550
2551   """
2552   # try to ensure we're not the master by mistake
2553   master, myself = ssconf.GetMasterAndMyself()
2554   if master == myself:
2555     _Fail("ssconf status shows I'm the master node, will not demote")
2556
2557   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2558   if not result.failed:
2559     _Fail("The master daemon is running, will not demote")
2560
2561   try:
2562     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2563       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2564   except EnvironmentError, err:
2565     if err.errno != errno.ENOENT:
2566       _Fail("Error while backing up cluster file: %s", err, exc=True)
2567
2568   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2569
2570
2571 def _GetX509Filenames(cryptodir, name):
2572   """Returns the full paths for the private key and certificate.
2573
2574   """
2575   return (utils.PathJoin(cryptodir, name),
2576           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2577           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2578
2579
2580 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2581   """Creates a new X509 certificate for SSL/TLS.
2582
2583   @type validity: int
2584   @param validity: Validity in seconds
2585   @rtype: tuple; (string, string)
2586   @return: Certificate name and public part
2587
2588   """
2589   (key_pem, cert_pem) = \
2590     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2591                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2592
2593   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2594                               prefix="x509-%s-" % utils.TimestampForFilename())
2595   try:
2596     name = os.path.basename(cert_dir)
2597     assert len(name) > 5
2598
2599     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2600
2601     utils.WriteFile(key_file, mode=0400, data=key_pem)
2602     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2603
2604     # Never return private key as it shouldn't leave the node
2605     return (name, cert_pem)
2606   except Exception:
2607     shutil.rmtree(cert_dir, ignore_errors=True)
2608     raise
2609
2610
2611 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2612   """Removes a X509 certificate.
2613
2614   @type name: string
2615   @param name: Certificate name
2616
2617   """
2618   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2619
2620   utils.RemoveFile(key_file)
2621   utils.RemoveFile(cert_file)
2622
2623   try:
2624     os.rmdir(cert_dir)
2625   except EnvironmentError, err:
2626     _Fail("Cannot remove certificate directory '%s': %s",
2627           cert_dir, err)
2628
2629
2630 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2631   """Returns the command for the requested input/output.
2632
2633   @type instance: L{objects.Instance}
2634   @param instance: The instance object
2635   @param mode: Import/export mode
2636   @param ieio: Input/output type
2637   @param ieargs: Input/output arguments
2638
2639   """
2640   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2641
2642   env = None
2643   prefix = None
2644   suffix = None
2645   exp_size = None
2646
2647   if ieio == constants.IEIO_FILE:
2648     (filename, ) = ieargs
2649
2650     if not utils.IsNormAbsPath(filename):
2651       _Fail("Path '%s' is not normalized or absolute", filename)
2652
2653     directory = os.path.normpath(os.path.dirname(filename))
2654
2655     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2656         constants.EXPORT_DIR):
2657       _Fail("File '%s' is not under exports directory '%s'",
2658             filename, constants.EXPORT_DIR)
2659
2660     # Create directory
2661     utils.Makedirs(directory, mode=0750)
2662
2663     quoted_filename = utils.ShellQuote(filename)
2664
2665     if mode == constants.IEM_IMPORT:
2666       suffix = "> %s" % quoted_filename
2667     elif mode == constants.IEM_EXPORT:
2668       suffix = "< %s" % quoted_filename
2669
2670       # Retrieve file size
2671       try:
2672         st = os.stat(filename)
2673       except EnvironmentError, err:
2674         logging.error("Can't stat(2) %s: %s", filename, err)
2675       else:
2676         exp_size = utils.BytesToMebibyte(st.st_size)
2677
2678   elif ieio == constants.IEIO_RAW_DISK:
2679     (disk, ) = ieargs
2680
2681     real_disk = _OpenRealBD(disk)
2682
2683     if mode == constants.IEM_IMPORT:
2684       # we set here a smaller block size as, due to transport buffering, more
2685       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2686       # is not already there or we pass a wrong path; we use notrunc to no
2687       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2688       # much memory; this means that at best, we flush every 64k, which will
2689       # not be very fast
2690       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2691                                     " bs=%s oflag=dsync"),
2692                                     real_disk.dev_path,
2693                                     str(64 * 1024))
2694
2695     elif mode == constants.IEM_EXPORT:
2696       # the block size on the read dd is 1MiB to match our units
2697       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2698                                    real_disk.dev_path,
2699                                    str(1024 * 1024), # 1 MB
2700                                    str(disk.size))
2701       exp_size = disk.size
2702
2703   elif ieio == constants.IEIO_SCRIPT:
2704     (disk, disk_index, ) = ieargs
2705
2706     assert isinstance(disk_index, (int, long))
2707
2708     real_disk = _OpenRealBD(disk)
2709
2710     inst_os = OSFromDisk(instance.os)
2711     env = OSEnvironment(instance, inst_os)
2712
2713     if mode == constants.IEM_IMPORT:
2714       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2715       env["IMPORT_INDEX"] = str(disk_index)
2716       script = inst_os.import_script
2717
2718     elif mode == constants.IEM_EXPORT:
2719       env["EXPORT_DEVICE"] = real_disk.dev_path
2720       env["EXPORT_INDEX"] = str(disk_index)
2721       script = inst_os.export_script
2722
2723     # TODO: Pass special environment only to script
2724     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2725
2726     if mode == constants.IEM_IMPORT:
2727       suffix = "| %s" % script_cmd
2728
2729     elif mode == constants.IEM_EXPORT:
2730       prefix = "%s |" % script_cmd
2731
2732     # Let script predict size
2733     exp_size = constants.IE_CUSTOM_SIZE
2734
2735   else:
2736     _Fail("Invalid %s I/O mode %r", mode, ieio)
2737
2738   return (env, prefix, suffix, exp_size)
2739
2740
2741 def _CreateImportExportStatusDir(prefix):
2742   """Creates status directory for import/export.
2743
2744   """
2745   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2746                           prefix=("%s-%s-" %
2747                                   (prefix, utils.TimestampForFilename())))
2748
2749
2750 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2751   """Starts an import or export daemon.
2752
2753   @param mode: Import/output mode
2754   @type opts: L{objects.ImportExportOptions}
2755   @param opts: Daemon options
2756   @type host: string
2757   @param host: Remote host for export (None for import)
2758   @type port: int
2759   @param port: Remote port for export (None for import)
2760   @type instance: L{objects.Instance}
2761   @param instance: Instance object
2762   @param ieio: Input/output type
2763   @param ieioargs: Input/output arguments
2764
2765   """
2766   if mode == constants.IEM_IMPORT:
2767     prefix = "import"
2768
2769     if not (host is None and port is None):
2770       _Fail("Can not specify host or port on import")
2771
2772   elif mode == constants.IEM_EXPORT:
2773     prefix = "export"
2774
2775     if host is None or port is None:
2776       _Fail("Host and port must be specified for an export")
2777
2778   else:
2779     _Fail("Invalid mode %r", mode)
2780
2781   if (opts.key_name is None) ^ (opts.ca_pem is None):
2782     _Fail("Cluster certificate can only be used for both key and CA")
2783
2784   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2785     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2786
2787   if opts.key_name is None:
2788     # Use server.pem
2789     key_path = constants.NODED_CERT_FILE
2790     cert_path = constants.NODED_CERT_FILE
2791     assert opts.ca_pem is None
2792   else:
2793     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2794                                                  opts.key_name)
2795     assert opts.ca_pem is not None
2796
2797   for i in [key_path, cert_path]:
2798     if not os.path.exists(i):
2799       _Fail("File '%s' does not exist" % i)
2800
2801   status_dir = _CreateImportExportStatusDir(prefix)
2802   try:
2803     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2804     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2805     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2806
2807     if opts.ca_pem is None:
2808       # Use server.pem
2809       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2810     else:
2811       ca = opts.ca_pem
2812
2813     # Write CA file
2814     utils.WriteFile(ca_file, data=ca, mode=0400)
2815
2816     cmd = [
2817       constants.IMPORT_EXPORT_DAEMON,
2818       status_file, mode,
2819       "--key=%s" % key_path,
2820       "--cert=%s" % cert_path,
2821       "--ca=%s" % ca_file,
2822       ]
2823
2824     if host:
2825       cmd.append("--host=%s" % host)
2826
2827     if port:
2828       cmd.append("--port=%s" % port)
2829
2830     if opts.compress:
2831       cmd.append("--compress=%s" % opts.compress)
2832
2833     if opts.magic:
2834       cmd.append("--magic=%s" % opts.magic)
2835
2836     if exp_size is not None:
2837       cmd.append("--expected-size=%s" % exp_size)
2838
2839     if cmd_prefix:
2840       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2841
2842     if cmd_suffix:
2843       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2844
2845     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2846
2847     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2848     # support for receiving a file descriptor for output
2849     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2850                       output=logfile)
2851
2852     # The import/export name is simply the status directory name
2853     return os.path.basename(status_dir)
2854
2855   except Exception:
2856     shutil.rmtree(status_dir, ignore_errors=True)
2857     raise
2858
2859
2860 def GetImportExportStatus(names):
2861   """Returns import/export daemon status.
2862
2863   @type names: sequence
2864   @param names: List of names
2865   @rtype: List of dicts
2866   @return: Returns a list of the state of each named import/export or None if a
2867            status couldn't be read
2868
2869   """
2870   result = []
2871
2872   for name in names:
2873     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2874                                  _IES_STATUS_FILE)
2875
2876     try:
2877       data = utils.ReadFile(status_file)
2878     except EnvironmentError, err:
2879       if err.errno != errno.ENOENT:
2880         raise
2881       data = None
2882
2883     if not data:
2884       result.append(None)
2885       continue
2886
2887     result.append(serializer.LoadJson(data))
2888
2889   return result
2890
2891
2892 def AbortImportExport(name):
2893   """Sends SIGTERM to a running import/export daemon.
2894
2895   """
2896   logging.info("Abort import/export %s", name)
2897
2898   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2899   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2900
2901   if pid:
2902     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2903                  name, pid)
2904     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2905
2906
2907 def CleanupImportExport(name):
2908   """Cleanup after an import or export.
2909
2910   If the import/export daemon is still running it's killed. Afterwards the
2911   whole status directory is removed.
2912
2913   """
2914   logging.info("Finalizing import/export %s", name)
2915
2916   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2917
2918   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2919
2920   if pid:
2921     logging.info("Import/export %s is still running with PID %s",
2922                  name, pid)
2923     utils.KillProcess(pid, waitpid=False)
2924
2925   shutil.rmtree(status_dir, ignore_errors=True)
2926
2927
2928 def _FindDisks(nodes_ip, disks):
2929   """Sets the physical ID on disks and returns the block devices.
2930
2931   """
2932   # set the correct physical ID
2933   my_name = netutils.Hostname.GetSysName()
2934   for cf in disks:
2935     cf.SetPhysicalID(my_name, nodes_ip)
2936
2937   bdevs = []
2938
2939   for cf in disks:
2940     rd = _RecursiveFindBD(cf)
2941     if rd is None:
2942       _Fail("Can't find device %s", cf)
2943     bdevs.append(rd)
2944   return bdevs
2945
2946
2947 def DrbdDisconnectNet(nodes_ip, disks):
2948   """Disconnects the network on a list of drbd devices.
2949
2950   """
2951   bdevs = _FindDisks(nodes_ip, disks)
2952
2953   # disconnect disks
2954   for rd in bdevs:
2955     try:
2956       rd.DisconnectNet()
2957     except errors.BlockDeviceError, err:
2958       _Fail("Can't change network configuration to standalone mode: %s",
2959             err, exc=True)
2960
2961
2962 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2963   """Attaches the network on a list of drbd devices.
2964
2965   """
2966   bdevs = _FindDisks(nodes_ip, disks)
2967
2968   if multimaster:
2969     for idx, rd in enumerate(bdevs):
2970       try:
2971         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2972       except EnvironmentError, err:
2973         _Fail("Can't create symlink: %s", err)
2974   # reconnect disks, switch to new master configuration and if
2975   # needed primary mode
2976   for rd in bdevs:
2977     try:
2978       rd.AttachNet(multimaster)
2979     except errors.BlockDeviceError, err:
2980       _Fail("Can't change network configuration: %s", err)
2981
2982   # wait until the disks are connected; we need to retry the re-attach
2983   # if the device becomes standalone, as this might happen if the one
2984   # node disconnects and reconnects in a different mode before the
2985   # other node reconnects; in this case, one or both of the nodes will
2986   # decide it has wrong configuration and switch to standalone
2987
2988   def _Attach():
2989     all_connected = True
2990
2991     for rd in bdevs:
2992       stats = rd.GetProcStatus()
2993
2994       all_connected = (all_connected and
2995                        (stats.is_connected or stats.is_in_resync))
2996
2997       if stats.is_standalone:
2998         # peer had different config info and this node became
2999         # standalone, even though this should not happen with the
3000         # new staged way of changing disk configs
3001         try:
3002           rd.AttachNet(multimaster)
3003         except errors.BlockDeviceError, err:
3004           _Fail("Can't change network configuration: %s", err)
3005
3006     if not all_connected:
3007       raise utils.RetryAgain()
3008
3009   try:
3010     # Start with a delay of 100 miliseconds and go up to 5 seconds
3011     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
3012   except utils.RetryTimeout:
3013     _Fail("Timeout in disk reconnecting")
3014
3015   if multimaster:
3016     # change to primary mode
3017     for rd in bdevs:
3018       try:
3019         rd.Open()
3020       except errors.BlockDeviceError, err:
3021         _Fail("Can't change to primary mode: %s", err)
3022
3023
3024 def DrbdWaitSync(nodes_ip, disks):
3025   """Wait until DRBDs have synchronized.
3026
3027   """
3028   def _helper(rd):
3029     stats = rd.GetProcStatus()
3030     if not (stats.is_connected or stats.is_in_resync):
3031       raise utils.RetryAgain()
3032     return stats
3033
3034   bdevs = _FindDisks(nodes_ip, disks)
3035
3036   min_resync = 100
3037   alldone = True
3038   for rd in bdevs:
3039     try:
3040       # poll each second for 15 seconds
3041       stats = utils.Retry(_helper, 1, 15, args=[rd])
3042     except utils.RetryTimeout:
3043       stats = rd.GetProcStatus()
3044       # last check
3045       if not (stats.is_connected or stats.is_in_resync):
3046         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3047     alldone = alldone and (not stats.is_in_resync)
3048     if stats.sync_percent is not None:
3049       min_resync = min(min_resync, stats.sync_percent)
3050
3051   return (alldone, min_resync)
3052
3053
3054 def GetDrbdUsermodeHelper():
3055   """Returns DRBD usermode helper currently configured.
3056
3057   """
3058   try:
3059     return bdev.BaseDRBD.GetUsermodeHelper()
3060   except errors.BlockDeviceError, err:
3061     _Fail(str(err))
3062
3063
3064 def PowercycleNode(hypervisor_type):
3065   """Hard-powercycle the node.
3066
3067   Because we need to return first, and schedule the powercycle in the
3068   background, we won't be able to report failures nicely.
3069
3070   """
3071   hyper = hypervisor.GetHypervisor(hypervisor_type)
3072   try:
3073     pid = os.fork()
3074   except OSError:
3075     # if we can't fork, we'll pretend that we're in the child process
3076     pid = 0
3077   if pid > 0:
3078     return "Reboot scheduled in 5 seconds"
3079   # ensure the child is running on ram
3080   try:
3081     utils.Mlockall()
3082   except Exception: # pylint: disable-msg=W0703
3083     pass
3084   time.sleep(5)
3085   hyper.PowercycleNode()
3086
3087
3088 class HooksRunner(object):
3089   """Hook runner.
3090
3091   This class is instantiated on the node side (ganeti-noded) and not
3092   on the master side.
3093
3094   """
3095   def __init__(self, hooks_base_dir=None):
3096     """Constructor for hooks runner.
3097
3098     @type hooks_base_dir: str or None
3099     @param hooks_base_dir: if not None, this overrides the
3100         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3101
3102     """
3103     if hooks_base_dir is None:
3104       hooks_base_dir = constants.HOOKS_BASE_DIR
3105     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3106     # constant
3107     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3108
3109   def RunHooks(self, hpath, phase, env):
3110     """Run the scripts in the hooks directory.
3111
3112     @type hpath: str
3113     @param hpath: the path to the hooks directory which
3114         holds the scripts
3115     @type phase: str
3116     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3117         L{constants.HOOKS_PHASE_POST}
3118     @type env: dict
3119     @param env: dictionary with the environment for the hook
3120     @rtype: list
3121     @return: list of 3-element tuples:
3122       - script path
3123       - script result, either L{constants.HKR_SUCCESS} or
3124         L{constants.HKR_FAIL}
3125       - output of the script
3126
3127     @raise errors.ProgrammerError: for invalid input
3128         parameters
3129
3130     """
3131     if phase == constants.HOOKS_PHASE_PRE:
3132       suffix = "pre"
3133     elif phase == constants.HOOKS_PHASE_POST:
3134       suffix = "post"
3135     else:
3136       _Fail("Unknown hooks phase '%s'", phase)
3137
3138
3139     subdir = "%s-%s.d" % (hpath, suffix)
3140     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3141
3142     results = []
3143
3144     if not os.path.isdir(dir_name):
3145       # for non-existing/non-dirs, we simply exit instead of logging a
3146       # warning at every operation
3147       return results
3148
3149     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3150
3151     for (relname, relstatus, runresult)  in runparts_results:
3152       if relstatus == constants.RUNPARTS_SKIP:
3153         rrval = constants.HKR_SKIP
3154         output = ""
3155       elif relstatus == constants.RUNPARTS_ERR:
3156         rrval = constants.HKR_FAIL
3157         output = "Hook script execution error: %s" % runresult
3158       elif relstatus == constants.RUNPARTS_RUN:
3159         if runresult.failed:
3160           rrval = constants.HKR_FAIL
3161         else:
3162           rrval = constants.HKR_SUCCESS
3163         output = utils.SafeEncode(runresult.output.strip())
3164       results.append(("%s/%s" % (subdir, relname), rrval, output))
3165
3166     return results
3167
3168
3169 class IAllocatorRunner(object):
3170   """IAllocator runner.
3171
3172   This class is instantiated on the node side (ganeti-noded) and not on
3173   the master side.
3174
3175   """
3176   @staticmethod
3177   def Run(name, idata):
3178     """Run an iallocator script.
3179
3180     @type name: str
3181     @param name: the iallocator script name
3182     @type idata: str
3183     @param idata: the allocator input data
3184
3185     @rtype: tuple
3186     @return: two element tuple of:
3187        - status
3188        - either error message or stdout of allocator (for success)
3189
3190     """
3191     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3192                                   os.path.isfile)
3193     if alloc_script is None:
3194       _Fail("iallocator module '%s' not found in the search path", name)
3195
3196     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3197     try:
3198       os.write(fd, idata)
3199       os.close(fd)
3200       result = utils.RunCmd([alloc_script, fin_name])
3201       if result.failed:
3202         _Fail("iallocator module '%s' failed: %s, output '%s'",
3203               name, result.fail_reason, result.output)
3204     finally:
3205       os.unlink(fin_name)
3206
3207     return result.stdout
3208
3209
3210 class DevCacheManager(object):
3211   """Simple class for managing a cache of block device information.
3212
3213   """
3214   _DEV_PREFIX = "/dev/"
3215   _ROOT_DIR = constants.BDEV_CACHE_DIR
3216
3217   @classmethod
3218   def _ConvertPath(cls, dev_path):
3219     """Converts a /dev/name path to the cache file name.
3220
3221     This replaces slashes with underscores and strips the /dev
3222     prefix. It then returns the full path to the cache file.
3223
3224     @type dev_path: str
3225     @param dev_path: the C{/dev/} path name
3226     @rtype: str
3227     @return: the converted path name
3228
3229     """
3230     if dev_path.startswith(cls._DEV_PREFIX):
3231       dev_path = dev_path[len(cls._DEV_PREFIX):]
3232     dev_path = dev_path.replace("/", "_")
3233     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3234     return fpath
3235
3236   @classmethod
3237   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3238     """Updates the cache information for a given device.
3239
3240     @type dev_path: str
3241     @param dev_path: the pathname of the device
3242     @type owner: str
3243     @param owner: the owner (instance name) of the device
3244     @type on_primary: bool
3245     @param on_primary: whether this is the primary
3246         node nor not
3247     @type iv_name: str
3248     @param iv_name: the instance-visible name of the
3249         device, as in objects.Disk.iv_name
3250
3251     @rtype: None
3252
3253     """
3254     if dev_path is None:
3255       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3256       return
3257     fpath = cls._ConvertPath(dev_path)
3258     if on_primary:
3259       state = "primary"
3260     else:
3261       state = "secondary"
3262     if iv_name is None:
3263       iv_name = "not_visible"
3264     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3265     try:
3266       utils.WriteFile(fpath, data=fdata)
3267     except EnvironmentError, err:
3268       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3269
3270   @classmethod
3271   def RemoveCache(cls, dev_path):
3272     """Remove data for a dev_path.
3273
3274     This is just a wrapper over L{utils.RemoveFile} with a converted
3275     path name and logging.
3276
3277     @type dev_path: str
3278     @param dev_path: the pathname of the device
3279
3280     @rtype: None
3281
3282     """
3283     if dev_path is None:
3284       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3285       return
3286     fpath = cls._ConvertPath(dev_path)
3287     try:
3288       utils.RemoveFile(fpath)
3289     except EnvironmentError, err:
3290       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)