Removing all ssh setup code from the core
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2010 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50 import signal
51
52 from ganeti import errors
53 from ganeti import utils
54 from ganeti import ssh
55 from ganeti import hypervisor
56 from ganeti import constants
57 from ganeti import bdev
58 from ganeti import objects
59 from ganeti import ssconf
60 from ganeti import serializer
61 from ganeti import netutils
62
63
64 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
65 _ALLOWED_CLEAN_DIRS = frozenset([
66   constants.DATA_DIR,
67   constants.JOB_QUEUE_ARCHIVE_DIR,
68   constants.QUEUE_DIR,
69   constants.CRYPTO_KEYS_DIR,
70   ])
71 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
72 _X509_KEY_FILE = "key"
73 _X509_CERT_FILE = "cert"
74 _IES_STATUS_FILE = "status"
75 _IES_PID_FILE = "pid"
76 _IES_CA_FILE = "ca"
77
78
79 class RPCFail(Exception):
80   """Class denoting RPC failure.
81
82   Its argument is the error message.
83
84   """
85
86
87 def _Fail(msg, *args, **kwargs):
88   """Log an error and the raise an RPCFail exception.
89
90   This exception is then handled specially in the ganeti daemon and
91   turned into a 'failed' return type. As such, this function is a
92   useful shortcut for logging the error and returning it to the master
93   daemon.
94
95   @type msg: string
96   @param msg: the text of the exception
97   @raise RPCFail
98
99   """
100   if args:
101     msg = msg % args
102   if "log" not in kwargs or kwargs["log"]: # if we should log this error
103     if "exc" in kwargs and kwargs["exc"]:
104       logging.exception(msg)
105     else:
106       logging.error(msg)
107   raise RPCFail(msg)
108
109
110 def _GetConfig():
111   """Simple wrapper to return a SimpleStore.
112
113   @rtype: L{ssconf.SimpleStore}
114   @return: a SimpleStore instance
115
116   """
117   return ssconf.SimpleStore()
118
119
120 def _GetSshRunner(cluster_name):
121   """Simple wrapper to return an SshRunner.
122
123   @type cluster_name: str
124   @param cluster_name: the cluster name, which is needed
125       by the SshRunner constructor
126   @rtype: L{ssh.SshRunner}
127   @return: an SshRunner instance
128
129   """
130   return ssh.SshRunner(cluster_name)
131
132
133 def _Decompress(data):
134   """Unpacks data compressed by the RPC client.
135
136   @type data: list or tuple
137   @param data: Data sent by RPC client
138   @rtype: str
139   @return: Decompressed data
140
141   """
142   assert isinstance(data, (list, tuple))
143   assert len(data) == 2
144   (encoding, content) = data
145   if encoding == constants.RPC_ENCODING_NONE:
146     return content
147   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
148     return zlib.decompress(base64.b64decode(content))
149   else:
150     raise AssertionError("Unknown data encoding")
151
152
153 def _CleanDirectory(path, exclude=None):
154   """Removes all regular files in a directory.
155
156   @type path: str
157   @param path: the directory to clean
158   @type exclude: list
159   @param exclude: list of files to be excluded, defaults
160       to the empty list
161
162   """
163   if path not in _ALLOWED_CLEAN_DIRS:
164     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
165           path)
166
167   if not os.path.isdir(path):
168     return
169   if exclude is None:
170     exclude = []
171   else:
172     # Normalize excluded paths
173     exclude = [os.path.normpath(i) for i in exclude]
174
175   for rel_name in utils.ListVisibleFiles(path):
176     full_name = utils.PathJoin(path, rel_name)
177     if full_name in exclude:
178       continue
179     if os.path.isfile(full_name) and not os.path.islink(full_name):
180       utils.RemoveFile(full_name)
181
182
183 def _BuildUploadFileList():
184   """Build the list of allowed upload files.
185
186   This is abstracted so that it's built only once at module import time.
187
188   """
189   allowed_files = set([
190     constants.CLUSTER_CONF_FILE,
191     constants.ETC_HOSTS,
192     constants.SSH_KNOWN_HOSTS_FILE,
193     constants.VNC_PASSWORD_FILE,
194     constants.RAPI_CERT_FILE,
195     constants.RAPI_USERS_FILE,
196     constants.CONFD_HMAC_KEY,
197     constants.CLUSTER_DOMAIN_SECRET_FILE,
198     ])
199
200   for hv_name in constants.HYPER_TYPES:
201     hv_class = hypervisor.GetHypervisorClass(hv_name)
202     allowed_files.update(hv_class.GetAncillaryFiles())
203
204   return frozenset(allowed_files)
205
206
207 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
208
209
210 def JobQueuePurge():
211   """Removes job queue files and archived jobs.
212
213   @rtype: tuple
214   @return: True, None
215
216   """
217   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
218   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
219
220
221 def GetMasterInfo():
222   """Returns master information.
223
224   This is an utility function to compute master information, either
225   for consumption here or from the node daemon.
226
227   @rtype: tuple
228   @return: master_netdev, master_ip, master_name
229   @raise RPCFail: in case of errors
230
231   """
232   try:
233     cfg = _GetConfig()
234     master_netdev = cfg.GetMasterNetdev()
235     master_ip = cfg.GetMasterIP()
236     master_node = cfg.GetMasterNode()
237   except errors.ConfigurationError, err:
238     _Fail("Cluster configuration incomplete: %s", err, exc=True)
239   return (master_netdev, master_ip, master_node)
240
241
242 def StartMaster(start_daemons, no_voting):
243   """Activate local node as master node.
244
245   The function will either try activate the IP address of the master
246   (unless someone else has it) or also start the master daemons, based
247   on the start_daemons parameter.
248
249   @type start_daemons: boolean
250   @param start_daemons: whether to start the master daemons
251       (ganeti-masterd and ganeti-rapi), or (if false) activate the
252       master ip
253   @type no_voting: boolean
254   @param no_voting: whether to start ganeti-masterd without a node vote
255       (if start_daemons is True), but still non-interactively
256   @rtype: None
257
258   """
259   # GetMasterInfo will raise an exception if not able to return data
260   master_netdev, master_ip, _ = GetMasterInfo()
261
262   err_msgs = []
263   # either start the master and rapi daemons
264   if start_daemons:
265     if no_voting:
266       masterd_args = "--no-voting --yes-do-it"
267     else:
268       masterd_args = ""
269
270     env = {
271       "EXTRA_MASTERD_ARGS": masterd_args,
272       }
273
274     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
275     if result.failed:
276       msg = "Can't start Ganeti master: %s" % result.output
277       logging.error(msg)
278       err_msgs.append(msg)
279   # or activate the IP
280   else:
281     if netutils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
282       if netutils.IPAddress.Own(master_ip):
283         # we already have the ip:
284         logging.debug("Master IP already configured, doing nothing")
285       else:
286         msg = "Someone else has the master ip, not activating"
287         logging.error(msg)
288         err_msgs.append(msg)
289     else:
290       netmask = 32
291       if netutils.IP6Address.IsValid(master_ip):
292         netmask = 128
293
294       result = utils.RunCmd(["ip", "address", "add",
295                              "%s/%d" % (master_ip, netmask),
296                              "dev", master_netdev, "label",
297                              "%s:0" % master_netdev])
298       if result.failed:
299         msg = "Can't activate master IP: %s" % result.output
300         logging.error(msg)
301         err_msgs.append(msg)
302
303       # we ignore the exit code of the following cmds
304       if netutils.IP4Address.IsValid(master_ip):
305         utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev, "-s",
306                       master_ip, master_ip])
307       elif netutils.IP6Address.IsValid(master_ip):
308         utils.RunCmd(["ndisc6", "-q", "-r 3", master_ip, master_netdev])
309
310   if err_msgs:
311     _Fail("; ".join(err_msgs))
312
313
314 def StopMaster(stop_daemons):
315   """Deactivate this node as master.
316
317   The function will always try to deactivate the IP address of the
318   master. It will also stop the master daemons depending on the
319   stop_daemons parameter.
320
321   @type stop_daemons: boolean
322   @param stop_daemons: whether to also stop the master daemons
323       (ganeti-masterd and ganeti-rapi)
324   @rtype: None
325
326   """
327   # TODO: log and report back to the caller the error failures; we
328   # need to decide in which case we fail the RPC for this
329
330   # GetMasterInfo will raise an exception if not able to return data
331   master_netdev, master_ip, _ = GetMasterInfo()
332
333   netmask = 32
334   if netutils.IP6Address.IsValid(master_ip):
335     netmask = 128
336
337   result = utils.RunCmd(["ip", "address", "del",
338                          "%s/%d" % (master_ip, netmask),
339                          "dev", master_netdev])
340   if result.failed:
341     logging.error("Can't remove the master IP, error: %s", result.output)
342     # but otherwise ignore the failure
343
344   if stop_daemons:
345     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
346     if result.failed:
347       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
348                     " and error %s",
349                     result.cmd, result.exit_code, result.output)
350
351
352 def LeaveCluster(modify_ssh_setup):
353   """Cleans up and remove the current node.
354
355   This function cleans up and prepares the current node to be removed
356   from the cluster.
357
358   If processing is successful, then it raises an
359   L{errors.QuitGanetiException} which is used as a special case to
360   shutdown the node daemon.
361
362   @param modify_ssh_setup: boolean
363
364   """
365   _CleanDirectory(constants.DATA_DIR)
366   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
367   JobQueuePurge()
368
369   if modify_ssh_setup:
370     try:
371       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
372
373       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
374
375       utils.RemoveFile(priv_key)
376       utils.RemoveFile(pub_key)
377     except errors.OpExecError:
378       logging.exception("Error while processing ssh files")
379
380   try:
381     utils.RemoveFile(constants.CONFD_HMAC_KEY)
382     utils.RemoveFile(constants.RAPI_CERT_FILE)
383     utils.RemoveFile(constants.NODED_CERT_FILE)
384   except: # pylint: disable-msg=W0702
385     logging.exception("Error while removing cluster secrets")
386
387   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
388   if result.failed:
389     logging.error("Command %s failed with exitcode %s and error %s",
390                   result.cmd, result.exit_code, result.output)
391
392   # Raise a custom exception (handled in ganeti-noded)
393   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
394
395
396 def GetNodeInfo(vgname, hypervisor_type):
397   """Gives back a hash with different information about the node.
398
399   @type vgname: C{string}
400   @param vgname: the name of the volume group to ask for disk space information
401   @type hypervisor_type: C{str}
402   @param hypervisor_type: the name of the hypervisor to ask for
403       memory information
404   @rtype: C{dict}
405   @return: dictionary with the following keys:
406       - vg_size is the size of the configured volume group in MiB
407       - vg_free is the free size of the volume group in MiB
408       - memory_dom0 is the memory allocated for domain0 in MiB
409       - memory_free is the currently available (free) ram in MiB
410       - memory_total is the total number of ram in MiB
411
412   """
413   outputarray = {}
414   vginfo = _GetVGInfo(vgname)
415   outputarray['vg_size'] = vginfo['vg_size']
416   outputarray['vg_free'] = vginfo['vg_free']
417
418   hyper = hypervisor.GetHypervisor(hypervisor_type)
419   hyp_info = hyper.GetNodeInfo()
420   if hyp_info is not None:
421     outputarray.update(hyp_info)
422
423   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
424
425   return outputarray
426
427
428 def VerifyNode(what, cluster_name):
429   """Verify the status of the local node.
430
431   Based on the input L{what} parameter, various checks are done on the
432   local node.
433
434   If the I{filelist} key is present, this list of
435   files is checksummed and the file/checksum pairs are returned.
436
437   If the I{nodelist} key is present, we check that we have
438   connectivity via ssh with the target nodes (and check the hostname
439   report).
440
441   If the I{node-net-test} key is present, we check that we have
442   connectivity to the given nodes via both primary IP and, if
443   applicable, secondary IPs.
444
445   @type what: C{dict}
446   @param what: a dictionary of things to check:
447       - filelist: list of files for which to compute checksums
448       - nodelist: list of nodes we should check ssh communication with
449       - node-net-test: list of nodes we should check node daemon port
450         connectivity with
451       - hypervisor: list with hypervisors to run the verify for
452   @rtype: dict
453   @return: a dictionary with the same keys as the input dict, and
454       values representing the result of the checks
455
456   """
457   result = {}
458   my_name = netutils.Hostname.GetSysName()
459   port = netutils.GetDaemonPort(constants.NODED)
460
461   if constants.NV_HYPERVISOR in what:
462     result[constants.NV_HYPERVISOR] = tmp = {}
463     for hv_name in what[constants.NV_HYPERVISOR]:
464       try:
465         val = hypervisor.GetHypervisor(hv_name).Verify()
466       except errors.HypervisorError, err:
467         val = "Error while checking hypervisor: %s" % str(err)
468       tmp[hv_name] = val
469
470   if constants.NV_FILELIST in what:
471     result[constants.NV_FILELIST] = utils.FingerprintFiles(
472       what[constants.NV_FILELIST])
473
474   if constants.NV_NODELIST in what:
475     result[constants.NV_NODELIST] = tmp = {}
476     random.shuffle(what[constants.NV_NODELIST])
477     for node in what[constants.NV_NODELIST]:
478       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
479       if not success:
480         tmp[node] = message
481
482   if constants.NV_NODENETTEST in what:
483     result[constants.NV_NODENETTEST] = tmp = {}
484     my_pip = my_sip = None
485     for name, pip, sip in what[constants.NV_NODENETTEST]:
486       if name == my_name:
487         my_pip = pip
488         my_sip = sip
489         break
490     if not my_pip:
491       tmp[my_name] = ("Can't find my own primary/secondary IP"
492                       " in the node list")
493     else:
494       for name, pip, sip in what[constants.NV_NODENETTEST]:
495         fail = []
496         if not netutils.TcpPing(pip, port, source=my_pip):
497           fail.append("primary")
498         if sip != pip:
499           if not netutils.TcpPing(sip, port, source=my_sip):
500             fail.append("secondary")
501         if fail:
502           tmp[name] = ("failure using the %s interface(s)" %
503                        " and ".join(fail))
504
505   if constants.NV_MASTERIP in what:
506     # FIXME: add checks on incoming data structures (here and in the
507     # rest of the function)
508     master_name, master_ip = what[constants.NV_MASTERIP]
509     if master_name == my_name:
510       source = constants.IP4_ADDRESS_LOCALHOST
511     else:
512       source = None
513     result[constants.NV_MASTERIP] = netutils.TcpPing(master_ip, port,
514                                                   source=source)
515
516   if constants.NV_LVLIST in what:
517     try:
518       val = GetVolumeList(what[constants.NV_LVLIST])
519     except RPCFail, err:
520       val = str(err)
521     result[constants.NV_LVLIST] = val
522
523   if constants.NV_INSTANCELIST in what:
524     # GetInstanceList can fail
525     try:
526       val = GetInstanceList(what[constants.NV_INSTANCELIST])
527     except RPCFail, err:
528       val = str(err)
529     result[constants.NV_INSTANCELIST] = val
530
531   if constants.NV_VGLIST in what:
532     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
533
534   if constants.NV_PVLIST in what:
535     result[constants.NV_PVLIST] = \
536       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
537                                    filter_allocatable=False)
538
539   if constants.NV_VERSION in what:
540     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
541                                     constants.RELEASE_VERSION)
542
543   if constants.NV_HVINFO in what:
544     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
545     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
546
547   if constants.NV_DRBDLIST in what:
548     try:
549       used_minors = bdev.DRBD8.GetUsedDevs().keys()
550     except errors.BlockDeviceError, err:
551       logging.warning("Can't get used minors list", exc_info=True)
552       used_minors = str(err)
553     result[constants.NV_DRBDLIST] = used_minors
554
555   if constants.NV_DRBDHELPER in what:
556     status = True
557     try:
558       payload = bdev.BaseDRBD.GetUsermodeHelper()
559     except errors.BlockDeviceError, err:
560       logging.error("Can't get DRBD usermode helper: %s", str(err))
561       status = False
562       payload = str(err)
563     result[constants.NV_DRBDHELPER] = (status, payload)
564
565   if constants.NV_NODESETUP in what:
566     result[constants.NV_NODESETUP] = tmpr = []
567     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
568       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
569                   " under /sys, missing required directories /sys/block"
570                   " and /sys/class/net")
571     if (not os.path.isdir("/proc/sys") or
572         not os.path.isfile("/proc/sysrq-trigger")):
573       tmpr.append("The procfs filesystem doesn't seem to be mounted"
574                   " under /proc, missing required directory /proc/sys and"
575                   " the file /proc/sysrq-trigger")
576
577   if constants.NV_TIME in what:
578     result[constants.NV_TIME] = utils.SplitTime(time.time())
579
580   if constants.NV_OSLIST in what:
581     result[constants.NV_OSLIST] = DiagnoseOS()
582
583   return result
584
585
586 def GetVolumeList(vg_name):
587   """Compute list of logical volumes and their size.
588
589   @type vg_name: str
590   @param vg_name: the volume group whose LVs we should list
591   @rtype: dict
592   @return:
593       dictionary of all partions (key) with value being a tuple of
594       their size (in MiB), inactive and online status::
595
596         {'test1': ('20.06', True, True)}
597
598       in case of errors, a string is returned with the error
599       details.
600
601   """
602   lvs = {}
603   sep = '|'
604   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
605                          "--separator=%s" % sep,
606                          "-olv_name,lv_size,lv_attr", vg_name])
607   if result.failed:
608     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
609
610   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
611   for line in result.stdout.splitlines():
612     line = line.strip()
613     match = valid_line_re.match(line)
614     if not match:
615       logging.error("Invalid line returned from lvs output: '%s'", line)
616       continue
617     name, size, attr = match.groups()
618     inactive = attr[4] == '-'
619     online = attr[5] == 'o'
620     virtual = attr[0] == 'v'
621     if virtual:
622       # we don't want to report such volumes as existing, since they
623       # don't really hold data
624       continue
625     lvs[name] = (size, inactive, online)
626
627   return lvs
628
629
630 def ListVolumeGroups():
631   """List the volume groups and their size.
632
633   @rtype: dict
634   @return: dictionary with keys volume name and values the
635       size of the volume
636
637   """
638   return utils.ListVolumeGroups()
639
640
641 def NodeVolumes():
642   """List all volumes on this node.
643
644   @rtype: list
645   @return:
646     A list of dictionaries, each having four keys:
647       - name: the logical volume name,
648       - size: the size of the logical volume
649       - dev: the physical device on which the LV lives
650       - vg: the volume group to which it belongs
651
652     In case of errors, we return an empty list and log the
653     error.
654
655     Note that since a logical volume can live on multiple physical
656     volumes, the resulting list might include a logical volume
657     multiple times.
658
659   """
660   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
661                          "--separator=|",
662                          "--options=lv_name,lv_size,devices,vg_name"])
663   if result.failed:
664     _Fail("Failed to list logical volumes, lvs output: %s",
665           result.output)
666
667   def parse_dev(dev):
668     return dev.split('(')[0]
669
670   def handle_dev(dev):
671     return [parse_dev(x) for x in dev.split(",")]
672
673   def map_line(line):
674     line = [v.strip() for v in line]
675     return [{'name': line[0], 'size': line[1],
676              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
677
678   all_devs = []
679   for line in result.stdout.splitlines():
680     if line.count('|') >= 3:
681       all_devs.extend(map_line(line.split('|')))
682     else:
683       logging.warning("Strange line in the output from lvs: '%s'", line)
684   return all_devs
685
686
687 def BridgesExist(bridges_list):
688   """Check if a list of bridges exist on the current node.
689
690   @rtype: boolean
691   @return: C{True} if all of them exist, C{False} otherwise
692
693   """
694   missing = []
695   for bridge in bridges_list:
696     if not utils.BridgeExists(bridge):
697       missing.append(bridge)
698
699   if missing:
700     _Fail("Missing bridges %s", utils.CommaJoin(missing))
701
702
703 def GetInstanceList(hypervisor_list):
704   """Provides a list of instances.
705
706   @type hypervisor_list: list
707   @param hypervisor_list: the list of hypervisors to query information
708
709   @rtype: list
710   @return: a list of all running instances on the current node
711     - instance1.example.com
712     - instance2.example.com
713
714   """
715   results = []
716   for hname in hypervisor_list:
717     try:
718       names = hypervisor.GetHypervisor(hname).ListInstances()
719       results.extend(names)
720     except errors.HypervisorError, err:
721       _Fail("Error enumerating instances (hypervisor %s): %s",
722             hname, err, exc=True)
723
724   return results
725
726
727 def GetInstanceInfo(instance, hname):
728   """Gives back the information about an instance as a dictionary.
729
730   @type instance: string
731   @param instance: the instance name
732   @type hname: string
733   @param hname: the hypervisor type of the instance
734
735   @rtype: dict
736   @return: dictionary with the following keys:
737       - memory: memory size of instance (int)
738       - state: xen state of instance (string)
739       - time: cpu time of instance (float)
740
741   """
742   output = {}
743
744   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
745   if iinfo is not None:
746     output['memory'] = iinfo[2]
747     output['state'] = iinfo[4]
748     output['time'] = iinfo[5]
749
750   return output
751
752
753 def GetInstanceMigratable(instance):
754   """Gives whether an instance can be migrated.
755
756   @type instance: L{objects.Instance}
757   @param instance: object representing the instance to be checked.
758
759   @rtype: tuple
760   @return: tuple of (result, description) where:
761       - result: whether the instance can be migrated or not
762       - description: a description of the issue, if relevant
763
764   """
765   hyper = hypervisor.GetHypervisor(instance.hypervisor)
766   iname = instance.name
767   if iname not in hyper.ListInstances():
768     _Fail("Instance %s is not running", iname)
769
770   for idx in range(len(instance.disks)):
771     link_name = _GetBlockDevSymlinkPath(iname, idx)
772     if not os.path.islink(link_name):
773       logging.warning("Instance %s is missing symlink %s for disk %d",
774                       iname, link_name, idx)
775
776
777 def GetAllInstancesInfo(hypervisor_list):
778   """Gather data about all instances.
779
780   This is the equivalent of L{GetInstanceInfo}, except that it
781   computes data for all instances at once, thus being faster if one
782   needs data about more than one instance.
783
784   @type hypervisor_list: list
785   @param hypervisor_list: list of hypervisors to query for instance data
786
787   @rtype: dict
788   @return: dictionary of instance: data, with data having the following keys:
789       - memory: memory size of instance (int)
790       - state: xen state of instance (string)
791       - time: cpu time of instance (float)
792       - vcpus: the number of vcpus
793
794   """
795   output = {}
796
797   for hname in hypervisor_list:
798     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
799     if iinfo:
800       for name, _, memory, vcpus, state, times in iinfo:
801         value = {
802           'memory': memory,
803           'vcpus': vcpus,
804           'state': state,
805           'time': times,
806           }
807         if name in output:
808           # we only check static parameters, like memory and vcpus,
809           # and not state and time which can change between the
810           # invocations of the different hypervisors
811           for key in 'memory', 'vcpus':
812             if value[key] != output[name][key]:
813               _Fail("Instance %s is running twice"
814                     " with different parameters", name)
815         output[name] = value
816
817   return output
818
819
820 def _InstanceLogName(kind, os_name, instance):
821   """Compute the OS log filename for a given instance and operation.
822
823   The instance name and os name are passed in as strings since not all
824   operations have these as part of an instance object.
825
826   @type kind: string
827   @param kind: the operation type (e.g. add, import, etc.)
828   @type os_name: string
829   @param os_name: the os name
830   @type instance: string
831   @param instance: the name of the instance being imported/added/etc.
832
833   """
834   # TODO: Use tempfile.mkstemp to create unique filename
835   base = ("%s-%s-%s-%s.log" %
836           (kind, os_name, instance, utils.TimestampForFilename()))
837   return utils.PathJoin(constants.LOG_OS_DIR, base)
838
839
840 def InstanceOsAdd(instance, reinstall, debug):
841   """Add an OS to an instance.
842
843   @type instance: L{objects.Instance}
844   @param instance: Instance whose OS is to be installed
845   @type reinstall: boolean
846   @param reinstall: whether this is an instance reinstall
847   @type debug: integer
848   @param debug: debug level, passed to the OS scripts
849   @rtype: None
850
851   """
852   inst_os = OSFromDisk(instance.os)
853
854   create_env = OSEnvironment(instance, inst_os, debug)
855   if reinstall:
856     create_env['INSTANCE_REINSTALL'] = "1"
857
858   logfile = _InstanceLogName("add", instance.os, instance.name)
859
860   result = utils.RunCmd([inst_os.create_script], env=create_env,
861                         cwd=inst_os.path, output=logfile,)
862   if result.failed:
863     logging.error("os create command '%s' returned error: %s, logfile: %s,"
864                   " output: %s", result.cmd, result.fail_reason, logfile,
865                   result.output)
866     lines = [utils.SafeEncode(val)
867              for val in utils.TailFile(logfile, lines=20)]
868     _Fail("OS create script failed (%s), last lines in the"
869           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
870
871
872 def RunRenameInstance(instance, old_name, debug):
873   """Run the OS rename script for an instance.
874
875   @type instance: L{objects.Instance}
876   @param instance: Instance whose OS is to be installed
877   @type old_name: string
878   @param old_name: previous instance name
879   @type debug: integer
880   @param debug: debug level, passed to the OS scripts
881   @rtype: boolean
882   @return: the success of the operation
883
884   """
885   inst_os = OSFromDisk(instance.os)
886
887   rename_env = OSEnvironment(instance, inst_os, debug)
888   rename_env['OLD_INSTANCE_NAME'] = old_name
889
890   logfile = _InstanceLogName("rename", instance.os,
891                              "%s-%s" % (old_name, instance.name))
892
893   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
894                         cwd=inst_os.path, output=logfile)
895
896   if result.failed:
897     logging.error("os create command '%s' returned error: %s output: %s",
898                   result.cmd, result.fail_reason, result.output)
899     lines = [utils.SafeEncode(val)
900              for val in utils.TailFile(logfile, lines=20)]
901     _Fail("OS rename script failed (%s), last lines in the"
902           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
903
904
905 def _GetVGInfo(vg_name):
906   """Get information about the volume group.
907
908   @type vg_name: str
909   @param vg_name: the volume group which we query
910   @rtype: dict
911   @return:
912     A dictionary with the following keys:
913       - C{vg_size} is the total size of the volume group in MiB
914       - C{vg_free} is the free size of the volume group in MiB
915       - C{pv_count} are the number of physical disks in that VG
916
917     If an error occurs during gathering of data, we return the same dict
918     with keys all set to None.
919
920   """
921   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
922
923   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
924                          "--nosuffix", "--units=m", "--separator=:", vg_name])
925
926   if retval.failed:
927     logging.error("volume group %s not present", vg_name)
928     return retdic
929   valarr = retval.stdout.strip().rstrip(':').split(':')
930   if len(valarr) == 3:
931     try:
932       retdic = {
933         "vg_size": int(round(float(valarr[0]), 0)),
934         "vg_free": int(round(float(valarr[1]), 0)),
935         "pv_count": int(valarr[2]),
936         }
937     except (TypeError, ValueError), err:
938       logging.exception("Fail to parse vgs output: %s", err)
939   else:
940     logging.error("vgs output has the wrong number of fields (expected"
941                   " three): %s", str(valarr))
942   return retdic
943
944
945 def _GetBlockDevSymlinkPath(instance_name, idx):
946   return utils.PathJoin(constants.DISK_LINKS_DIR,
947                         "%s:%d" % (instance_name, idx))
948
949
950 def _SymlinkBlockDev(instance_name, device_path, idx):
951   """Set up symlinks to a instance's block device.
952
953   This is an auxiliary function run when an instance is start (on the primary
954   node) or when an instance is migrated (on the target node).
955
956
957   @param instance_name: the name of the target instance
958   @param device_path: path of the physical block device, on the node
959   @param idx: the disk index
960   @return: absolute path to the disk's symlink
961
962   """
963   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
964   try:
965     os.symlink(device_path, link_name)
966   except OSError, err:
967     if err.errno == errno.EEXIST:
968       if (not os.path.islink(link_name) or
969           os.readlink(link_name) != device_path):
970         os.remove(link_name)
971         os.symlink(device_path, link_name)
972     else:
973       raise
974
975   return link_name
976
977
978 def _RemoveBlockDevLinks(instance_name, disks):
979   """Remove the block device symlinks belonging to the given instance.
980
981   """
982   for idx, _ in enumerate(disks):
983     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
984     if os.path.islink(link_name):
985       try:
986         os.remove(link_name)
987       except OSError:
988         logging.exception("Can't remove symlink '%s'", link_name)
989
990
991 def _GatherAndLinkBlockDevs(instance):
992   """Set up an instance's block device(s).
993
994   This is run on the primary node at instance startup. The block
995   devices must be already assembled.
996
997   @type instance: L{objects.Instance}
998   @param instance: the instance whose disks we shoul assemble
999   @rtype: list
1000   @return: list of (disk_object, device_path)
1001
1002   """
1003   block_devices = []
1004   for idx, disk in enumerate(instance.disks):
1005     device = _RecursiveFindBD(disk)
1006     if device is None:
1007       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1008                                     str(disk))
1009     device.Open()
1010     try:
1011       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1012     except OSError, e:
1013       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1014                                     e.strerror)
1015
1016     block_devices.append((disk, link_name))
1017
1018   return block_devices
1019
1020
1021 def StartInstance(instance):
1022   """Start an instance.
1023
1024   @type instance: L{objects.Instance}
1025   @param instance: the instance object
1026   @rtype: None
1027
1028   """
1029   running_instances = GetInstanceList([instance.hypervisor])
1030
1031   if instance.name in running_instances:
1032     logging.info("Instance %s already running, not starting", instance.name)
1033     return
1034
1035   try:
1036     block_devices = _GatherAndLinkBlockDevs(instance)
1037     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1038     hyper.StartInstance(instance, block_devices)
1039   except errors.BlockDeviceError, err:
1040     _Fail("Block device error: %s", err, exc=True)
1041   except errors.HypervisorError, err:
1042     _RemoveBlockDevLinks(instance.name, instance.disks)
1043     _Fail("Hypervisor error: %s", err, exc=True)
1044
1045
1046 def InstanceShutdown(instance, timeout):
1047   """Shut an instance down.
1048
1049   @note: this functions uses polling with a hardcoded timeout.
1050
1051   @type instance: L{objects.Instance}
1052   @param instance: the instance object
1053   @type timeout: integer
1054   @param timeout: maximum timeout for soft shutdown
1055   @rtype: None
1056
1057   """
1058   hv_name = instance.hypervisor
1059   hyper = hypervisor.GetHypervisor(hv_name)
1060   iname = instance.name
1061
1062   if instance.name not in hyper.ListInstances():
1063     logging.info("Instance %s not running, doing nothing", iname)
1064     return
1065
1066   class _TryShutdown:
1067     def __init__(self):
1068       self.tried_once = False
1069
1070     def __call__(self):
1071       if iname not in hyper.ListInstances():
1072         return
1073
1074       try:
1075         hyper.StopInstance(instance, retry=self.tried_once)
1076       except errors.HypervisorError, err:
1077         if iname not in hyper.ListInstances():
1078           # if the instance is no longer existing, consider this a
1079           # success and go to cleanup
1080           return
1081
1082         _Fail("Failed to stop instance %s: %s", iname, err)
1083
1084       self.tried_once = True
1085
1086       raise utils.RetryAgain()
1087
1088   try:
1089     utils.Retry(_TryShutdown(), 5, timeout)
1090   except utils.RetryTimeout:
1091     # the shutdown did not succeed
1092     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1093
1094     try:
1095       hyper.StopInstance(instance, force=True)
1096     except errors.HypervisorError, err:
1097       if iname in hyper.ListInstances():
1098         # only raise an error if the instance still exists, otherwise
1099         # the error could simply be "instance ... unknown"!
1100         _Fail("Failed to force stop instance %s: %s", iname, err)
1101
1102     time.sleep(1)
1103
1104     if iname in hyper.ListInstances():
1105       _Fail("Could not shutdown instance %s even by destroy", iname)
1106
1107   try:
1108     hyper.CleanupInstance(instance.name)
1109   except errors.HypervisorError, err:
1110     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1111
1112   _RemoveBlockDevLinks(iname, instance.disks)
1113
1114
1115 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1116   """Reboot an instance.
1117
1118   @type instance: L{objects.Instance}
1119   @param instance: the instance object to reboot
1120   @type reboot_type: str
1121   @param reboot_type: the type of reboot, one the following
1122     constants:
1123       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1124         instance OS, do not recreate the VM
1125       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1126         restart the VM (at the hypervisor level)
1127       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1128         not accepted here, since that mode is handled differently, in
1129         cmdlib, and translates into full stop and start of the
1130         instance (instead of a call_instance_reboot RPC)
1131   @type shutdown_timeout: integer
1132   @param shutdown_timeout: maximum timeout for soft shutdown
1133   @rtype: None
1134
1135   """
1136   running_instances = GetInstanceList([instance.hypervisor])
1137
1138   if instance.name not in running_instances:
1139     _Fail("Cannot reboot instance %s that is not running", instance.name)
1140
1141   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1142   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1143     try:
1144       hyper.RebootInstance(instance)
1145     except errors.HypervisorError, err:
1146       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1147   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1148     try:
1149       InstanceShutdown(instance, shutdown_timeout)
1150       return StartInstance(instance)
1151     except errors.HypervisorError, err:
1152       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1153   else:
1154     _Fail("Invalid reboot_type received: %s", reboot_type)
1155
1156
1157 def MigrationInfo(instance):
1158   """Gather information about an instance to be migrated.
1159
1160   @type instance: L{objects.Instance}
1161   @param instance: the instance definition
1162
1163   """
1164   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1165   try:
1166     info = hyper.MigrationInfo(instance)
1167   except errors.HypervisorError, err:
1168     _Fail("Failed to fetch migration information: %s", err, exc=True)
1169   return info
1170
1171
1172 def AcceptInstance(instance, info, target):
1173   """Prepare the node to accept an instance.
1174
1175   @type instance: L{objects.Instance}
1176   @param instance: the instance definition
1177   @type info: string/data (opaque)
1178   @param info: migration information, from the source node
1179   @type target: string
1180   @param target: target host (usually ip), on this node
1181
1182   """
1183   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1184   try:
1185     hyper.AcceptInstance(instance, info, target)
1186   except errors.HypervisorError, err:
1187     _Fail("Failed to accept instance: %s", err, exc=True)
1188
1189
1190 def FinalizeMigration(instance, info, success):
1191   """Finalize any preparation to accept an instance.
1192
1193   @type instance: L{objects.Instance}
1194   @param instance: the instance definition
1195   @type info: string/data (opaque)
1196   @param info: migration information, from the source node
1197   @type success: boolean
1198   @param success: whether the migration was a success or a failure
1199
1200   """
1201   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1202   try:
1203     hyper.FinalizeMigration(instance, info, success)
1204   except errors.HypervisorError, err:
1205     _Fail("Failed to finalize migration: %s", err, exc=True)
1206
1207
1208 def MigrateInstance(instance, target, live):
1209   """Migrates an instance to another node.
1210
1211   @type instance: L{objects.Instance}
1212   @param instance: the instance definition
1213   @type target: string
1214   @param target: the target node name
1215   @type live: boolean
1216   @param live: whether the migration should be done live or not (the
1217       interpretation of this parameter is left to the hypervisor)
1218   @rtype: tuple
1219   @return: a tuple of (success, msg) where:
1220       - succes is a boolean denoting the success/failure of the operation
1221       - msg is a string with details in case of failure
1222
1223   """
1224   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1225
1226   try:
1227     hyper.MigrateInstance(instance, target, live)
1228   except errors.HypervisorError, err:
1229     _Fail("Failed to migrate instance: %s", err, exc=True)
1230
1231
1232 def BlockdevCreate(disk, size, owner, on_primary, info):
1233   """Creates a block device for an instance.
1234
1235   @type disk: L{objects.Disk}
1236   @param disk: the object describing the disk we should create
1237   @type size: int
1238   @param size: the size of the physical underlying device, in MiB
1239   @type owner: str
1240   @param owner: the name of the instance for which disk is created,
1241       used for device cache data
1242   @type on_primary: boolean
1243   @param on_primary:  indicates if it is the primary node or not
1244   @type info: string
1245   @param info: string that will be sent to the physical device
1246       creation, used for example to set (LVM) tags on LVs
1247
1248   @return: the new unique_id of the device (this can sometime be
1249       computed only after creation), or None. On secondary nodes,
1250       it's not required to return anything.
1251
1252   """
1253   # TODO: remove the obsolete 'size' argument
1254   # pylint: disable-msg=W0613
1255   clist = []
1256   if disk.children:
1257     for child in disk.children:
1258       try:
1259         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1260       except errors.BlockDeviceError, err:
1261         _Fail("Can't assemble device %s: %s", child, err)
1262       if on_primary or disk.AssembleOnSecondary():
1263         # we need the children open in case the device itself has to
1264         # be assembled
1265         try:
1266           # pylint: disable-msg=E1103
1267           crdev.Open()
1268         except errors.BlockDeviceError, err:
1269           _Fail("Can't make child '%s' read-write: %s", child, err)
1270       clist.append(crdev)
1271
1272   try:
1273     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1274   except errors.BlockDeviceError, err:
1275     _Fail("Can't create block device: %s", err)
1276
1277   if on_primary or disk.AssembleOnSecondary():
1278     try:
1279       device.Assemble()
1280     except errors.BlockDeviceError, err:
1281       _Fail("Can't assemble device after creation, unusual event: %s", err)
1282     device.SetSyncSpeed(constants.SYNC_SPEED)
1283     if on_primary or disk.OpenOnSecondary():
1284       try:
1285         device.Open(force=True)
1286       except errors.BlockDeviceError, err:
1287         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1288     DevCacheManager.UpdateCache(device.dev_path, owner,
1289                                 on_primary, disk.iv_name)
1290
1291   device.SetInfo(info)
1292
1293   return device.unique_id
1294
1295
1296 def BlockdevRemove(disk):
1297   """Remove a block device.
1298
1299   @note: This is intended to be called recursively.
1300
1301   @type disk: L{objects.Disk}
1302   @param disk: the disk object we should remove
1303   @rtype: boolean
1304   @return: the success of the operation
1305
1306   """
1307   msgs = []
1308   try:
1309     rdev = _RecursiveFindBD(disk)
1310   except errors.BlockDeviceError, err:
1311     # probably can't attach
1312     logging.info("Can't attach to device %s in remove", disk)
1313     rdev = None
1314   if rdev is not None:
1315     r_path = rdev.dev_path
1316     try:
1317       rdev.Remove()
1318     except errors.BlockDeviceError, err:
1319       msgs.append(str(err))
1320     if not msgs:
1321       DevCacheManager.RemoveCache(r_path)
1322
1323   if disk.children:
1324     for child in disk.children:
1325       try:
1326         BlockdevRemove(child)
1327       except RPCFail, err:
1328         msgs.append(str(err))
1329
1330   if msgs:
1331     _Fail("; ".join(msgs))
1332
1333
1334 def _RecursiveAssembleBD(disk, owner, as_primary):
1335   """Activate a block device for an instance.
1336
1337   This is run on the primary and secondary nodes for an instance.
1338
1339   @note: this function is called recursively.
1340
1341   @type disk: L{objects.Disk}
1342   @param disk: the disk we try to assemble
1343   @type owner: str
1344   @param owner: the name of the instance which owns the disk
1345   @type as_primary: boolean
1346   @param as_primary: if we should make the block device
1347       read/write
1348
1349   @return: the assembled device or None (in case no device
1350       was assembled)
1351   @raise errors.BlockDeviceError: in case there is an error
1352       during the activation of the children or the device
1353       itself
1354
1355   """
1356   children = []
1357   if disk.children:
1358     mcn = disk.ChildrenNeeded()
1359     if mcn == -1:
1360       mcn = 0 # max number of Nones allowed
1361     else:
1362       mcn = len(disk.children) - mcn # max number of Nones
1363     for chld_disk in disk.children:
1364       try:
1365         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1366       except errors.BlockDeviceError, err:
1367         if children.count(None) >= mcn:
1368           raise
1369         cdev = None
1370         logging.error("Error in child activation (but continuing): %s",
1371                       str(err))
1372       children.append(cdev)
1373
1374   if as_primary or disk.AssembleOnSecondary():
1375     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1376     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1377     result = r_dev
1378     if as_primary or disk.OpenOnSecondary():
1379       r_dev.Open()
1380     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1381                                 as_primary, disk.iv_name)
1382
1383   else:
1384     result = True
1385   return result
1386
1387
1388 def BlockdevAssemble(disk, owner, as_primary):
1389   """Activate a block device for an instance.
1390
1391   This is a wrapper over _RecursiveAssembleBD.
1392
1393   @rtype: str or boolean
1394   @return: a C{/dev/...} path for primary nodes, and
1395       C{True} for secondary nodes
1396
1397   """
1398   try:
1399     result = _RecursiveAssembleBD(disk, owner, as_primary)
1400     if isinstance(result, bdev.BlockDev):
1401       # pylint: disable-msg=E1103
1402       result = result.dev_path
1403   except errors.BlockDeviceError, err:
1404     _Fail("Error while assembling disk: %s", err, exc=True)
1405
1406   return result
1407
1408
1409 def BlockdevShutdown(disk):
1410   """Shut down a block device.
1411
1412   First, if the device is assembled (Attach() is successful), then
1413   the device is shutdown. Then the children of the device are
1414   shutdown.
1415
1416   This function is called recursively. Note that we don't cache the
1417   children or such, as oppossed to assemble, shutdown of different
1418   devices doesn't require that the upper device was active.
1419
1420   @type disk: L{objects.Disk}
1421   @param disk: the description of the disk we should
1422       shutdown
1423   @rtype: None
1424
1425   """
1426   msgs = []
1427   r_dev = _RecursiveFindBD(disk)
1428   if r_dev is not None:
1429     r_path = r_dev.dev_path
1430     try:
1431       r_dev.Shutdown()
1432       DevCacheManager.RemoveCache(r_path)
1433     except errors.BlockDeviceError, err:
1434       msgs.append(str(err))
1435
1436   if disk.children:
1437     for child in disk.children:
1438       try:
1439         BlockdevShutdown(child)
1440       except RPCFail, err:
1441         msgs.append(str(err))
1442
1443   if msgs:
1444     _Fail("; ".join(msgs))
1445
1446
1447 def BlockdevAddchildren(parent_cdev, new_cdevs):
1448   """Extend a mirrored block device.
1449
1450   @type parent_cdev: L{objects.Disk}
1451   @param parent_cdev: the disk to which we should add children
1452   @type new_cdevs: list of L{objects.Disk}
1453   @param new_cdevs: the list of children which we should add
1454   @rtype: None
1455
1456   """
1457   parent_bdev = _RecursiveFindBD(parent_cdev)
1458   if parent_bdev is None:
1459     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1460   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1461   if new_bdevs.count(None) > 0:
1462     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1463   parent_bdev.AddChildren(new_bdevs)
1464
1465
1466 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1467   """Shrink a mirrored block device.
1468
1469   @type parent_cdev: L{objects.Disk}
1470   @param parent_cdev: the disk from which we should remove children
1471   @type new_cdevs: list of L{objects.Disk}
1472   @param new_cdevs: the list of children which we should remove
1473   @rtype: None
1474
1475   """
1476   parent_bdev = _RecursiveFindBD(parent_cdev)
1477   if parent_bdev is None:
1478     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1479   devs = []
1480   for disk in new_cdevs:
1481     rpath = disk.StaticDevPath()
1482     if rpath is None:
1483       bd = _RecursiveFindBD(disk)
1484       if bd is None:
1485         _Fail("Can't find device %s while removing children", disk)
1486       else:
1487         devs.append(bd.dev_path)
1488     else:
1489       if not utils.IsNormAbsPath(rpath):
1490         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1491       devs.append(rpath)
1492   parent_bdev.RemoveChildren(devs)
1493
1494
1495 def BlockdevGetmirrorstatus(disks):
1496   """Get the mirroring status of a list of devices.
1497
1498   @type disks: list of L{objects.Disk}
1499   @param disks: the list of disks which we should query
1500   @rtype: disk
1501   @return:
1502       a list of (mirror_done, estimated_time) tuples, which
1503       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1504   @raise errors.BlockDeviceError: if any of the disks cannot be
1505       found
1506
1507   """
1508   stats = []
1509   for dsk in disks:
1510     rbd = _RecursiveFindBD(dsk)
1511     if rbd is None:
1512       _Fail("Can't find device %s", dsk)
1513
1514     stats.append(rbd.CombinedSyncStatus())
1515
1516   return stats
1517
1518
1519 def _RecursiveFindBD(disk):
1520   """Check if a device is activated.
1521
1522   If so, return information about the real device.
1523
1524   @type disk: L{objects.Disk}
1525   @param disk: the disk object we need to find
1526
1527   @return: None if the device can't be found,
1528       otherwise the device instance
1529
1530   """
1531   children = []
1532   if disk.children:
1533     for chdisk in disk.children:
1534       children.append(_RecursiveFindBD(chdisk))
1535
1536   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1537
1538
1539 def _OpenRealBD(disk):
1540   """Opens the underlying block device of a disk.
1541
1542   @type disk: L{objects.Disk}
1543   @param disk: the disk object we want to open
1544
1545   """
1546   real_disk = _RecursiveFindBD(disk)
1547   if real_disk is None:
1548     _Fail("Block device '%s' is not set up", disk)
1549
1550   real_disk.Open()
1551
1552   return real_disk
1553
1554
1555 def BlockdevFind(disk):
1556   """Check if a device is activated.
1557
1558   If it is, return information about the real device.
1559
1560   @type disk: L{objects.Disk}
1561   @param disk: the disk to find
1562   @rtype: None or objects.BlockDevStatus
1563   @return: None if the disk cannot be found, otherwise a the current
1564            information
1565
1566   """
1567   try:
1568     rbd = _RecursiveFindBD(disk)
1569   except errors.BlockDeviceError, err:
1570     _Fail("Failed to find device: %s", err, exc=True)
1571
1572   if rbd is None:
1573     return None
1574
1575   return rbd.GetSyncStatus()
1576
1577
1578 def BlockdevGetsize(disks):
1579   """Computes the size of the given disks.
1580
1581   If a disk is not found, returns None instead.
1582
1583   @type disks: list of L{objects.Disk}
1584   @param disks: the list of disk to compute the size for
1585   @rtype: list
1586   @return: list with elements None if the disk cannot be found,
1587       otherwise the size
1588
1589   """
1590   result = []
1591   for cf in disks:
1592     try:
1593       rbd = _RecursiveFindBD(cf)
1594     except errors.BlockDeviceError:
1595       result.append(None)
1596       continue
1597     if rbd is None:
1598       result.append(None)
1599     else:
1600       result.append(rbd.GetActualSize())
1601   return result
1602
1603
1604 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1605   """Export a block device to a remote node.
1606
1607   @type disk: L{objects.Disk}
1608   @param disk: the description of the disk to export
1609   @type dest_node: str
1610   @param dest_node: the destination node to export to
1611   @type dest_path: str
1612   @param dest_path: the destination path on the target node
1613   @type cluster_name: str
1614   @param cluster_name: the cluster name, needed for SSH hostalias
1615   @rtype: None
1616
1617   """
1618   real_disk = _OpenRealBD(disk)
1619
1620   # the block size on the read dd is 1MiB to match our units
1621   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1622                                "dd if=%s bs=1048576 count=%s",
1623                                real_disk.dev_path, str(disk.size))
1624
1625   # we set here a smaller block size as, due to ssh buffering, more
1626   # than 64-128k will mostly ignored; we use nocreat to fail if the
1627   # device is not already there or we pass a wrong path; we use
1628   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1629   # to not buffer too much memory; this means that at best, we flush
1630   # every 64k, which will not be very fast
1631   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1632                                 " oflag=dsync", dest_path)
1633
1634   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1635                                                    constants.GANETI_RUNAS,
1636                                                    destcmd)
1637
1638   # all commands have been checked, so we're safe to combine them
1639   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1640
1641   result = utils.RunCmd(["bash", "-c", command])
1642
1643   if result.failed:
1644     _Fail("Disk copy command '%s' returned error: %s"
1645           " output: %s", command, result.fail_reason, result.output)
1646
1647
1648 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1649   """Write a file to the filesystem.
1650
1651   This allows the master to overwrite(!) a file. It will only perform
1652   the operation if the file belongs to a list of configuration files.
1653
1654   @type file_name: str
1655   @param file_name: the target file name
1656   @type data: str
1657   @param data: the new contents of the file
1658   @type mode: int
1659   @param mode: the mode to give the file (can be None)
1660   @type uid: int
1661   @param uid: the owner of the file (can be -1 for default)
1662   @type gid: int
1663   @param gid: the group of the file (can be -1 for default)
1664   @type atime: float
1665   @param atime: the atime to set on the file (can be None)
1666   @type mtime: float
1667   @param mtime: the mtime to set on the file (can be None)
1668   @rtype: None
1669
1670   """
1671   if not os.path.isabs(file_name):
1672     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1673
1674   if file_name not in _ALLOWED_UPLOAD_FILES:
1675     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1676           file_name)
1677
1678   raw_data = _Decompress(data)
1679
1680   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1681                   atime=atime, mtime=mtime)
1682
1683
1684 def WriteSsconfFiles(values):
1685   """Update all ssconf files.
1686
1687   Wrapper around the SimpleStore.WriteFiles.
1688
1689   """
1690   ssconf.SimpleStore().WriteFiles(values)
1691
1692
1693 def _ErrnoOrStr(err):
1694   """Format an EnvironmentError exception.
1695
1696   If the L{err} argument has an errno attribute, it will be looked up
1697   and converted into a textual C{E...} description. Otherwise the
1698   string representation of the error will be returned.
1699
1700   @type err: L{EnvironmentError}
1701   @param err: the exception to format
1702
1703   """
1704   if hasattr(err, 'errno'):
1705     detail = errno.errorcode[err.errno]
1706   else:
1707     detail = str(err)
1708   return detail
1709
1710
1711 def _OSOndiskAPIVersion(os_dir):
1712   """Compute and return the API version of a given OS.
1713
1714   This function will try to read the API version of the OS residing in
1715   the 'os_dir' directory.
1716
1717   @type os_dir: str
1718   @param os_dir: the directory in which we should look for the OS
1719   @rtype: tuple
1720   @return: tuple (status, data) with status denoting the validity and
1721       data holding either the vaid versions or an error message
1722
1723   """
1724   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1725
1726   try:
1727     st = os.stat(api_file)
1728   except EnvironmentError, err:
1729     return False, ("Required file '%s' not found under path %s: %s" %
1730                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1731
1732   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1733     return False, ("File '%s' in %s is not a regular file" %
1734                    (constants.OS_API_FILE, os_dir))
1735
1736   try:
1737     api_versions = utils.ReadFile(api_file).splitlines()
1738   except EnvironmentError, err:
1739     return False, ("Error while reading the API version file at %s: %s" %
1740                    (api_file, _ErrnoOrStr(err)))
1741
1742   try:
1743     api_versions = [int(version.strip()) for version in api_versions]
1744   except (TypeError, ValueError), err:
1745     return False, ("API version(s) can't be converted to integer: %s" %
1746                    str(err))
1747
1748   return True, api_versions
1749
1750
1751 def DiagnoseOS(top_dirs=None):
1752   """Compute the validity for all OSes.
1753
1754   @type top_dirs: list
1755   @param top_dirs: the list of directories in which to
1756       search (if not given defaults to
1757       L{constants.OS_SEARCH_PATH})
1758   @rtype: list of L{objects.OS}
1759   @return: a list of tuples (name, path, status, diagnose, variants,
1760       parameters, api_version) for all (potential) OSes under all
1761       search paths, where:
1762           - name is the (potential) OS name
1763           - path is the full path to the OS
1764           - status True/False is the validity of the OS
1765           - diagnose is the error message for an invalid OS, otherwise empty
1766           - variants is a list of supported OS variants, if any
1767           - parameters is a list of (name, help) parameters, if any
1768           - api_version is a list of support OS API versions
1769
1770   """
1771   if top_dirs is None:
1772     top_dirs = constants.OS_SEARCH_PATH
1773
1774   result = []
1775   for dir_name in top_dirs:
1776     if os.path.isdir(dir_name):
1777       try:
1778         f_names = utils.ListVisibleFiles(dir_name)
1779       except EnvironmentError, err:
1780         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1781         break
1782       for name in f_names:
1783         os_path = utils.PathJoin(dir_name, name)
1784         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1785         if status:
1786           diagnose = ""
1787           variants = os_inst.supported_variants
1788           parameters = os_inst.supported_parameters
1789           api_versions = os_inst.api_versions
1790         else:
1791           diagnose = os_inst
1792           variants = parameters = api_versions = []
1793         result.append((name, os_path, status, diagnose, variants,
1794                        parameters, api_versions))
1795
1796   return result
1797
1798
1799 def _TryOSFromDisk(name, base_dir=None):
1800   """Create an OS instance from disk.
1801
1802   This function will return an OS instance if the given name is a
1803   valid OS name.
1804
1805   @type base_dir: string
1806   @keyword base_dir: Base directory containing OS installations.
1807                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1808   @rtype: tuple
1809   @return: success and either the OS instance if we find a valid one,
1810       or error message
1811
1812   """
1813   if base_dir is None:
1814     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1815   else:
1816     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1817
1818   if os_dir is None:
1819     return False, "Directory for OS %s not found in search path" % name
1820
1821   status, api_versions = _OSOndiskAPIVersion(os_dir)
1822   if not status:
1823     # push the error up
1824     return status, api_versions
1825
1826   if not constants.OS_API_VERSIONS.intersection(api_versions):
1827     return False, ("API version mismatch for path '%s': found %s, want %s." %
1828                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1829
1830   # OS Files dictionary, we will populate it with the absolute path names
1831   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1832
1833   if max(api_versions) >= constants.OS_API_V15:
1834     os_files[constants.OS_VARIANTS_FILE] = ''
1835
1836   if max(api_versions) >= constants.OS_API_V20:
1837     os_files[constants.OS_PARAMETERS_FILE] = ''
1838   else:
1839     del os_files[constants.OS_SCRIPT_VERIFY]
1840
1841   for filename in os_files:
1842     os_files[filename] = utils.PathJoin(os_dir, filename)
1843
1844     try:
1845       st = os.stat(os_files[filename])
1846     except EnvironmentError, err:
1847       return False, ("File '%s' under path '%s' is missing (%s)" %
1848                      (filename, os_dir, _ErrnoOrStr(err)))
1849
1850     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1851       return False, ("File '%s' under path '%s' is not a regular file" %
1852                      (filename, os_dir))
1853
1854     if filename in constants.OS_SCRIPTS:
1855       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1856         return False, ("File '%s' under path '%s' is not executable" %
1857                        (filename, os_dir))
1858
1859   variants = []
1860   if constants.OS_VARIANTS_FILE in os_files:
1861     variants_file = os_files[constants.OS_VARIANTS_FILE]
1862     try:
1863       variants = utils.ReadFile(variants_file).splitlines()
1864     except EnvironmentError, err:
1865       return False, ("Error while reading the OS variants file at %s: %s" %
1866                      (variants_file, _ErrnoOrStr(err)))
1867     if not variants:
1868       return False, ("No supported os variant found")
1869
1870   parameters = []
1871   if constants.OS_PARAMETERS_FILE in os_files:
1872     parameters_file = os_files[constants.OS_PARAMETERS_FILE]
1873     try:
1874       parameters = utils.ReadFile(parameters_file).splitlines()
1875     except EnvironmentError, err:
1876       return False, ("Error while reading the OS parameters file at %s: %s" %
1877                      (parameters_file, _ErrnoOrStr(err)))
1878     parameters = [v.split(None, 1) for v in parameters]
1879
1880   os_obj = objects.OS(name=name, path=os_dir,
1881                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1882                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1883                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1884                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1885                       verify_script=os_files.get(constants.OS_SCRIPT_VERIFY,
1886                                                  None),
1887                       supported_variants=variants,
1888                       supported_parameters=parameters,
1889                       api_versions=api_versions)
1890   return True, os_obj
1891
1892
1893 def OSFromDisk(name, base_dir=None):
1894   """Create an OS instance from disk.
1895
1896   This function will return an OS instance if the given name is a
1897   valid OS name. Otherwise, it will raise an appropriate
1898   L{RPCFail} exception, detailing why this is not a valid OS.
1899
1900   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1901   an exception but returns true/false status data.
1902
1903   @type base_dir: string
1904   @keyword base_dir: Base directory containing OS installations.
1905                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1906   @rtype: L{objects.OS}
1907   @return: the OS instance if we find a valid one
1908   @raise RPCFail: if we don't find a valid OS
1909
1910   """
1911   name_only = name.split("+", 1)[0]
1912   status, payload = _TryOSFromDisk(name_only, base_dir)
1913
1914   if not status:
1915     _Fail(payload)
1916
1917   return payload
1918
1919
1920 def OSCoreEnv(inst_os, os_params, debug=0):
1921   """Calculate the basic environment for an os script.
1922
1923   @type inst_os: L{objects.OS}
1924   @param inst_os: operating system for which the environment is being built
1925   @type os_params: dict
1926   @param os_params: the OS parameters
1927   @type debug: integer
1928   @param debug: debug level (0 or 1, for OS Api 10)
1929   @rtype: dict
1930   @return: dict of environment variables
1931   @raise errors.BlockDeviceError: if the block device
1932       cannot be found
1933
1934   """
1935   result = {}
1936   api_version = \
1937     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1938   result['OS_API_VERSION'] = '%d' % api_version
1939   result['OS_NAME'] = inst_os.name
1940   result['DEBUG_LEVEL'] = '%d' % debug
1941
1942   # OS variants
1943   if api_version >= constants.OS_API_V15:
1944     try:
1945       variant = inst_os.name.split('+', 1)[1]
1946     except IndexError:
1947       variant = inst_os.supported_variants[0]
1948     result['OS_VARIANT'] = variant
1949
1950   # OS params
1951   for pname, pvalue in os_params.items():
1952     result['OSP_%s' % pname.upper()] = pvalue
1953
1954   return result
1955
1956
1957 def OSEnvironment(instance, inst_os, debug=0):
1958   """Calculate the environment for an os script.
1959
1960   @type instance: L{objects.Instance}
1961   @param instance: target instance for the os script run
1962   @type inst_os: L{objects.OS}
1963   @param inst_os: operating system for which the environment is being built
1964   @type debug: integer
1965   @param debug: debug level (0 or 1, for OS Api 10)
1966   @rtype: dict
1967   @return: dict of environment variables
1968   @raise errors.BlockDeviceError: if the block device
1969       cannot be found
1970
1971   """
1972   result = OSCoreEnv(inst_os, instance.osparams, debug=debug)
1973
1974   result['INSTANCE_NAME'] = instance.name
1975   result['INSTANCE_OS'] = instance.os
1976   result['HYPERVISOR'] = instance.hypervisor
1977   result['DISK_COUNT'] = '%d' % len(instance.disks)
1978   result['NIC_COUNT'] = '%d' % len(instance.nics)
1979
1980   # Disks
1981   for idx, disk in enumerate(instance.disks):
1982     real_disk = _OpenRealBD(disk)
1983     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1984     result['DISK_%d_ACCESS' % idx] = disk.mode
1985     if constants.HV_DISK_TYPE in instance.hvparams:
1986       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1987         instance.hvparams[constants.HV_DISK_TYPE]
1988     if disk.dev_type in constants.LDS_BLOCK:
1989       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1990     elif disk.dev_type == constants.LD_FILE:
1991       result['DISK_%d_BACKEND_TYPE' % idx] = \
1992         'file:%s' % disk.physical_id[0]
1993
1994   # NICs
1995   for idx, nic in enumerate(instance.nics):
1996     result['NIC_%d_MAC' % idx] = nic.mac
1997     if nic.ip:
1998       result['NIC_%d_IP' % idx] = nic.ip
1999     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
2000     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2001       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
2002     if nic.nicparams[constants.NIC_LINK]:
2003       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
2004     if constants.HV_NIC_TYPE in instance.hvparams:
2005       result['NIC_%d_FRONTEND_TYPE' % idx] = \
2006         instance.hvparams[constants.HV_NIC_TYPE]
2007
2008   # HV/BE params
2009   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
2010     for key, value in source.items():
2011       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
2012
2013   return result
2014
2015
2016 def BlockdevGrow(disk, amount):
2017   """Grow a stack of block devices.
2018
2019   This function is called recursively, with the childrens being the
2020   first ones to resize.
2021
2022   @type disk: L{objects.Disk}
2023   @param disk: the disk to be grown
2024   @rtype: (status, result)
2025   @return: a tuple with the status of the operation
2026       (True/False), and the errors message if status
2027       is False
2028
2029   """
2030   r_dev = _RecursiveFindBD(disk)
2031   if r_dev is None:
2032     _Fail("Cannot find block device %s", disk)
2033
2034   try:
2035     r_dev.Grow(amount)
2036   except errors.BlockDeviceError, err:
2037     _Fail("Failed to grow block device: %s", err, exc=True)
2038
2039
2040 def BlockdevSnapshot(disk):
2041   """Create a snapshot copy of a block device.
2042
2043   This function is called recursively, and the snapshot is actually created
2044   just for the leaf lvm backend device.
2045
2046   @type disk: L{objects.Disk}
2047   @param disk: the disk to be snapshotted
2048   @rtype: string
2049   @return: snapshot disk path
2050
2051   """
2052   if disk.dev_type == constants.LD_DRBD8:
2053     if not disk.children:
2054       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2055             disk.unique_id)
2056     return BlockdevSnapshot(disk.children[0])
2057   elif disk.dev_type == constants.LD_LV:
2058     r_dev = _RecursiveFindBD(disk)
2059     if r_dev is not None:
2060       # FIXME: choose a saner value for the snapshot size
2061       # let's stay on the safe side and ask for the full size, for now
2062       return r_dev.Snapshot(disk.size)
2063     else:
2064       _Fail("Cannot find block device %s", disk)
2065   else:
2066     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2067           disk.unique_id, disk.dev_type)
2068
2069
2070 def FinalizeExport(instance, snap_disks):
2071   """Write out the export configuration information.
2072
2073   @type instance: L{objects.Instance}
2074   @param instance: the instance which we export, used for
2075       saving configuration
2076   @type snap_disks: list of L{objects.Disk}
2077   @param snap_disks: list of snapshot block devices, which
2078       will be used to get the actual name of the dump file
2079
2080   @rtype: None
2081
2082   """
2083   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2084   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2085
2086   config = objects.SerializableConfigParser()
2087
2088   config.add_section(constants.INISECT_EXP)
2089   config.set(constants.INISECT_EXP, 'version', '0')
2090   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2091   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2092   config.set(constants.INISECT_EXP, 'os', instance.os)
2093   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2094
2095   config.add_section(constants.INISECT_INS)
2096   config.set(constants.INISECT_INS, 'name', instance.name)
2097   config.set(constants.INISECT_INS, 'memory', '%d' %
2098              instance.beparams[constants.BE_MEMORY])
2099   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2100              instance.beparams[constants.BE_VCPUS])
2101   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2102   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2103
2104   nic_total = 0
2105   for nic_count, nic in enumerate(instance.nics):
2106     nic_total += 1
2107     config.set(constants.INISECT_INS, 'nic%d_mac' %
2108                nic_count, '%s' % nic.mac)
2109     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2110     for param in constants.NICS_PARAMETER_TYPES:
2111       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2112                  '%s' % nic.nicparams.get(param, None))
2113   # TODO: redundant: on load can read nics until it doesn't exist
2114   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2115
2116   disk_total = 0
2117   for disk_count, disk in enumerate(snap_disks):
2118     if disk:
2119       disk_total += 1
2120       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2121                  ('%s' % disk.iv_name))
2122       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2123                  ('%s' % disk.physical_id[1]))
2124       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2125                  ('%d' % disk.size))
2126
2127   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2128
2129   # New-style hypervisor/backend parameters
2130
2131   config.add_section(constants.INISECT_HYP)
2132   for name, value in instance.hvparams.items():
2133     if name not in constants.HVC_GLOBALS:
2134       config.set(constants.INISECT_HYP, name, str(value))
2135
2136   config.add_section(constants.INISECT_BEP)
2137   for name, value in instance.beparams.items():
2138     config.set(constants.INISECT_BEP, name, str(value))
2139
2140   config.add_section(constants.INISECT_OSP)
2141   for name, value in instance.osparams.items():
2142     config.set(constants.INISECT_OSP, name, str(value))
2143
2144   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2145                   data=config.Dumps())
2146   shutil.rmtree(finaldestdir, ignore_errors=True)
2147   shutil.move(destdir, finaldestdir)
2148
2149
2150 def ExportInfo(dest):
2151   """Get export configuration information.
2152
2153   @type dest: str
2154   @param dest: directory containing the export
2155
2156   @rtype: L{objects.SerializableConfigParser}
2157   @return: a serializable config file containing the
2158       export info
2159
2160   """
2161   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2162
2163   config = objects.SerializableConfigParser()
2164   config.read(cff)
2165
2166   if (not config.has_section(constants.INISECT_EXP) or
2167       not config.has_section(constants.INISECT_INS)):
2168     _Fail("Export info file doesn't have the required fields")
2169
2170   return config.Dumps()
2171
2172
2173 def ListExports():
2174   """Return a list of exports currently available on this machine.
2175
2176   @rtype: list
2177   @return: list of the exports
2178
2179   """
2180   if os.path.isdir(constants.EXPORT_DIR):
2181     return sorted(utils.ListVisibleFiles(constants.EXPORT_DIR))
2182   else:
2183     _Fail("No exports directory")
2184
2185
2186 def RemoveExport(export):
2187   """Remove an existing export from the node.
2188
2189   @type export: str
2190   @param export: the name of the export to remove
2191   @rtype: None
2192
2193   """
2194   target = utils.PathJoin(constants.EXPORT_DIR, export)
2195
2196   try:
2197     shutil.rmtree(target)
2198   except EnvironmentError, err:
2199     _Fail("Error while removing the export: %s", err, exc=True)
2200
2201
2202 def BlockdevRename(devlist):
2203   """Rename a list of block devices.
2204
2205   @type devlist: list of tuples
2206   @param devlist: list of tuples of the form  (disk,
2207       new_logical_id, new_physical_id); disk is an
2208       L{objects.Disk} object describing the current disk,
2209       and new logical_id/physical_id is the name we
2210       rename it to
2211   @rtype: boolean
2212   @return: True if all renames succeeded, False otherwise
2213
2214   """
2215   msgs = []
2216   result = True
2217   for disk, unique_id in devlist:
2218     dev = _RecursiveFindBD(disk)
2219     if dev is None:
2220       msgs.append("Can't find device %s in rename" % str(disk))
2221       result = False
2222       continue
2223     try:
2224       old_rpath = dev.dev_path
2225       dev.Rename(unique_id)
2226       new_rpath = dev.dev_path
2227       if old_rpath != new_rpath:
2228         DevCacheManager.RemoveCache(old_rpath)
2229         # FIXME: we should add the new cache information here, like:
2230         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2231         # but we don't have the owner here - maybe parse from existing
2232         # cache? for now, we only lose lvm data when we rename, which
2233         # is less critical than DRBD or MD
2234     except errors.BlockDeviceError, err:
2235       msgs.append("Can't rename device '%s' to '%s': %s" %
2236                   (dev, unique_id, err))
2237       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2238       result = False
2239   if not result:
2240     _Fail("; ".join(msgs))
2241
2242
2243 def _TransformFileStorageDir(file_storage_dir):
2244   """Checks whether given file_storage_dir is valid.
2245
2246   Checks wheter the given file_storage_dir is within the cluster-wide
2247   default file_storage_dir stored in SimpleStore. Only paths under that
2248   directory are allowed.
2249
2250   @type file_storage_dir: str
2251   @param file_storage_dir: the path to check
2252
2253   @return: the normalized path if valid, None otherwise
2254
2255   """
2256   if not constants.ENABLE_FILE_STORAGE:
2257     _Fail("File storage disabled at configure time")
2258   cfg = _GetConfig()
2259   file_storage_dir = os.path.normpath(file_storage_dir)
2260   base_file_storage_dir = cfg.GetFileStorageDir()
2261   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2262       base_file_storage_dir):
2263     _Fail("File storage directory '%s' is not under base file"
2264           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2265   return file_storage_dir
2266
2267
2268 def CreateFileStorageDir(file_storage_dir):
2269   """Create file storage directory.
2270
2271   @type file_storage_dir: str
2272   @param file_storage_dir: directory to create
2273
2274   @rtype: tuple
2275   @return: tuple with first element a boolean indicating wheter dir
2276       creation was successful or not
2277
2278   """
2279   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2280   if os.path.exists(file_storage_dir):
2281     if not os.path.isdir(file_storage_dir):
2282       _Fail("Specified storage dir '%s' is not a directory",
2283             file_storage_dir)
2284   else:
2285     try:
2286       os.makedirs(file_storage_dir, 0750)
2287     except OSError, err:
2288       _Fail("Cannot create file storage directory '%s': %s",
2289             file_storage_dir, err, exc=True)
2290
2291
2292 def RemoveFileStorageDir(file_storage_dir):
2293   """Remove file storage directory.
2294
2295   Remove it only if it's empty. If not log an error and return.
2296
2297   @type file_storage_dir: str
2298   @param file_storage_dir: the directory we should cleanup
2299   @rtype: tuple (success,)
2300   @return: tuple of one element, C{success}, denoting
2301       whether the operation was successful
2302
2303   """
2304   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2305   if os.path.exists(file_storage_dir):
2306     if not os.path.isdir(file_storage_dir):
2307       _Fail("Specified Storage directory '%s' is not a directory",
2308             file_storage_dir)
2309     # deletes dir only if empty, otherwise we want to fail the rpc call
2310     try:
2311       os.rmdir(file_storage_dir)
2312     except OSError, err:
2313       _Fail("Cannot remove file storage directory '%s': %s",
2314             file_storage_dir, err)
2315
2316
2317 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2318   """Rename the file storage directory.
2319
2320   @type old_file_storage_dir: str
2321   @param old_file_storage_dir: the current path
2322   @type new_file_storage_dir: str
2323   @param new_file_storage_dir: the name we should rename to
2324   @rtype: tuple (success,)
2325   @return: tuple of one element, C{success}, denoting
2326       whether the operation was successful
2327
2328   """
2329   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2330   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2331   if not os.path.exists(new_file_storage_dir):
2332     if os.path.isdir(old_file_storage_dir):
2333       try:
2334         os.rename(old_file_storage_dir, new_file_storage_dir)
2335       except OSError, err:
2336         _Fail("Cannot rename '%s' to '%s': %s",
2337               old_file_storage_dir, new_file_storage_dir, err)
2338     else:
2339       _Fail("Specified storage dir '%s' is not a directory",
2340             old_file_storage_dir)
2341   else:
2342     if os.path.exists(old_file_storage_dir):
2343       _Fail("Cannot rename '%s' to '%s': both locations exist",
2344             old_file_storage_dir, new_file_storage_dir)
2345
2346
2347 def _EnsureJobQueueFile(file_name):
2348   """Checks whether the given filename is in the queue directory.
2349
2350   @type file_name: str
2351   @param file_name: the file name we should check
2352   @rtype: None
2353   @raises RPCFail: if the file is not valid
2354
2355   """
2356   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2357   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2358
2359   if not result:
2360     _Fail("Passed job queue file '%s' does not belong to"
2361           " the queue directory '%s'", file_name, queue_dir)
2362
2363
2364 def JobQueueUpdate(file_name, content):
2365   """Updates a file in the queue directory.
2366
2367   This is just a wrapper over L{utils.WriteFile}, with proper
2368   checking.
2369
2370   @type file_name: str
2371   @param file_name: the job file name
2372   @type content: str
2373   @param content: the new job contents
2374   @rtype: boolean
2375   @return: the success of the operation
2376
2377   """
2378   _EnsureJobQueueFile(file_name)
2379
2380   # Write and replace the file atomically
2381   utils.WriteFile(file_name, data=_Decompress(content))
2382
2383
2384 def JobQueueRename(old, new):
2385   """Renames a job queue file.
2386
2387   This is just a wrapper over os.rename with proper checking.
2388
2389   @type old: str
2390   @param old: the old (actual) file name
2391   @type new: str
2392   @param new: the desired file name
2393   @rtype: tuple
2394   @return: the success of the operation and payload
2395
2396   """
2397   _EnsureJobQueueFile(old)
2398   _EnsureJobQueueFile(new)
2399
2400   utils.RenameFile(old, new, mkdir=True)
2401
2402
2403 def BlockdevClose(instance_name, disks):
2404   """Closes the given block devices.
2405
2406   This means they will be switched to secondary mode (in case of
2407   DRBD).
2408
2409   @param instance_name: if the argument is not empty, the symlinks
2410       of this instance will be removed
2411   @type disks: list of L{objects.Disk}
2412   @param disks: the list of disks to be closed
2413   @rtype: tuple (success, message)
2414   @return: a tuple of success and message, where success
2415       indicates the succes of the operation, and message
2416       which will contain the error details in case we
2417       failed
2418
2419   """
2420   bdevs = []
2421   for cf in disks:
2422     rd = _RecursiveFindBD(cf)
2423     if rd is None:
2424       _Fail("Can't find device %s", cf)
2425     bdevs.append(rd)
2426
2427   msg = []
2428   for rd in bdevs:
2429     try:
2430       rd.Close()
2431     except errors.BlockDeviceError, err:
2432       msg.append(str(err))
2433   if msg:
2434     _Fail("Can't make devices secondary: %s", ",".join(msg))
2435   else:
2436     if instance_name:
2437       _RemoveBlockDevLinks(instance_name, disks)
2438
2439
2440 def ValidateHVParams(hvname, hvparams):
2441   """Validates the given hypervisor parameters.
2442
2443   @type hvname: string
2444   @param hvname: the hypervisor name
2445   @type hvparams: dict
2446   @param hvparams: the hypervisor parameters to be validated
2447   @rtype: None
2448
2449   """
2450   try:
2451     hv_type = hypervisor.GetHypervisor(hvname)
2452     hv_type.ValidateParameters(hvparams)
2453   except errors.HypervisorError, err:
2454     _Fail(str(err), log=False)
2455
2456
2457 def _CheckOSPList(os_obj, parameters):
2458   """Check whether a list of parameters is supported by the OS.
2459
2460   @type os_obj: L{objects.OS}
2461   @param os_obj: OS object to check
2462   @type parameters: list
2463   @param parameters: the list of parameters to check
2464
2465   """
2466   supported = [v[0] for v in os_obj.supported_parameters]
2467   delta = frozenset(parameters).difference(supported)
2468   if delta:
2469     _Fail("The following parameters are not supported"
2470           " by the OS %s: %s" % (os_obj.name, utils.CommaJoin(delta)))
2471
2472
2473 def ValidateOS(required, osname, checks, osparams):
2474   """Validate the given OS' parameters.
2475
2476   @type required: boolean
2477   @param required: whether absence of the OS should translate into
2478       failure or not
2479   @type osname: string
2480   @param osname: the OS to be validated
2481   @type checks: list
2482   @param checks: list of the checks to run (currently only 'parameters')
2483   @type osparams: dict
2484   @param osparams: dictionary with OS parameters
2485   @rtype: boolean
2486   @return: True if the validation passed, or False if the OS was not
2487       found and L{required} was false
2488
2489   """
2490   if not constants.OS_VALIDATE_CALLS.issuperset(checks):
2491     _Fail("Unknown checks required for OS %s: %s", osname,
2492           set(checks).difference(constants.OS_VALIDATE_CALLS))
2493
2494   name_only = osname.split("+", 1)[0]
2495   status, tbv = _TryOSFromDisk(name_only, None)
2496
2497   if not status:
2498     if required:
2499       _Fail(tbv)
2500     else:
2501       return False
2502
2503   if max(tbv.api_versions) < constants.OS_API_V20:
2504     return True
2505
2506   if constants.OS_VALIDATE_PARAMETERS in checks:
2507     _CheckOSPList(tbv, osparams.keys())
2508
2509   validate_env = OSCoreEnv(tbv, osparams)
2510   result = utils.RunCmd([tbv.verify_script] + checks, env=validate_env,
2511                         cwd=tbv.path)
2512   if result.failed:
2513     logging.error("os validate command '%s' returned error: %s output: %s",
2514                   result.cmd, result.fail_reason, result.output)
2515     _Fail("OS validation script failed (%s), output: %s",
2516           result.fail_reason, result.output, log=False)
2517
2518   return True
2519
2520
2521 def DemoteFromMC():
2522   """Demotes the current node from master candidate role.
2523
2524   """
2525   # try to ensure we're not the master by mistake
2526   master, myself = ssconf.GetMasterAndMyself()
2527   if master == myself:
2528     _Fail("ssconf status shows I'm the master node, will not demote")
2529
2530   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2531   if not result.failed:
2532     _Fail("The master daemon is running, will not demote")
2533
2534   try:
2535     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2536       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2537   except EnvironmentError, err:
2538     if err.errno != errno.ENOENT:
2539       _Fail("Error while backing up cluster file: %s", err, exc=True)
2540
2541   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2542
2543
2544 def _GetX509Filenames(cryptodir, name):
2545   """Returns the full paths for the private key and certificate.
2546
2547   """
2548   return (utils.PathJoin(cryptodir, name),
2549           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2550           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2551
2552
2553 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2554   """Creates a new X509 certificate for SSL/TLS.
2555
2556   @type validity: int
2557   @param validity: Validity in seconds
2558   @rtype: tuple; (string, string)
2559   @return: Certificate name and public part
2560
2561   """
2562   (key_pem, cert_pem) = \
2563     utils.GenerateSelfSignedX509Cert(netutils.Hostname.GetSysName(),
2564                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2565
2566   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2567                               prefix="x509-%s-" % utils.TimestampForFilename())
2568   try:
2569     name = os.path.basename(cert_dir)
2570     assert len(name) > 5
2571
2572     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2573
2574     utils.WriteFile(key_file, mode=0400, data=key_pem)
2575     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2576
2577     # Never return private key as it shouldn't leave the node
2578     return (name, cert_pem)
2579   except Exception:
2580     shutil.rmtree(cert_dir, ignore_errors=True)
2581     raise
2582
2583
2584 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2585   """Removes a X509 certificate.
2586
2587   @type name: string
2588   @param name: Certificate name
2589
2590   """
2591   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2592
2593   utils.RemoveFile(key_file)
2594   utils.RemoveFile(cert_file)
2595
2596   try:
2597     os.rmdir(cert_dir)
2598   except EnvironmentError, err:
2599     _Fail("Cannot remove certificate directory '%s': %s",
2600           cert_dir, err)
2601
2602
2603 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2604   """Returns the command for the requested input/output.
2605
2606   @type instance: L{objects.Instance}
2607   @param instance: The instance object
2608   @param mode: Import/export mode
2609   @param ieio: Input/output type
2610   @param ieargs: Input/output arguments
2611
2612   """
2613   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2614
2615   env = None
2616   prefix = None
2617   suffix = None
2618   exp_size = None
2619
2620   if ieio == constants.IEIO_FILE:
2621     (filename, ) = ieargs
2622
2623     if not utils.IsNormAbsPath(filename):
2624       _Fail("Path '%s' is not normalized or absolute", filename)
2625
2626     directory = os.path.normpath(os.path.dirname(filename))
2627
2628     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2629         constants.EXPORT_DIR):
2630       _Fail("File '%s' is not under exports directory '%s'",
2631             filename, constants.EXPORT_DIR)
2632
2633     # Create directory
2634     utils.Makedirs(directory, mode=0750)
2635
2636     quoted_filename = utils.ShellQuote(filename)
2637
2638     if mode == constants.IEM_IMPORT:
2639       suffix = "> %s" % quoted_filename
2640     elif mode == constants.IEM_EXPORT:
2641       suffix = "< %s" % quoted_filename
2642
2643       # Retrieve file size
2644       try:
2645         st = os.stat(filename)
2646       except EnvironmentError, err:
2647         logging.error("Can't stat(2) %s: %s", filename, err)
2648       else:
2649         exp_size = utils.BytesToMebibyte(st.st_size)
2650
2651   elif ieio == constants.IEIO_RAW_DISK:
2652     (disk, ) = ieargs
2653
2654     real_disk = _OpenRealBD(disk)
2655
2656     if mode == constants.IEM_IMPORT:
2657       # we set here a smaller block size as, due to transport buffering, more
2658       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2659       # is not already there or we pass a wrong path; we use notrunc to no
2660       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2661       # much memory; this means that at best, we flush every 64k, which will
2662       # not be very fast
2663       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2664                                     " bs=%s oflag=dsync"),
2665                                     real_disk.dev_path,
2666                                     str(64 * 1024))
2667
2668     elif mode == constants.IEM_EXPORT:
2669       # the block size on the read dd is 1MiB to match our units
2670       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2671                                    real_disk.dev_path,
2672                                    str(1024 * 1024), # 1 MB
2673                                    str(disk.size))
2674       exp_size = disk.size
2675
2676   elif ieio == constants.IEIO_SCRIPT:
2677     (disk, disk_index, ) = ieargs
2678
2679     assert isinstance(disk_index, (int, long))
2680
2681     real_disk = _OpenRealBD(disk)
2682
2683     inst_os = OSFromDisk(instance.os)
2684     env = OSEnvironment(instance, inst_os)
2685
2686     if mode == constants.IEM_IMPORT:
2687       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2688       env["IMPORT_INDEX"] = str(disk_index)
2689       script = inst_os.import_script
2690
2691     elif mode == constants.IEM_EXPORT:
2692       env["EXPORT_DEVICE"] = real_disk.dev_path
2693       env["EXPORT_INDEX"] = str(disk_index)
2694       script = inst_os.export_script
2695
2696     # TODO: Pass special environment only to script
2697     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2698
2699     if mode == constants.IEM_IMPORT:
2700       suffix = "| %s" % script_cmd
2701
2702     elif mode == constants.IEM_EXPORT:
2703       prefix = "%s |" % script_cmd
2704
2705     # Let script predict size
2706     exp_size = constants.IE_CUSTOM_SIZE
2707
2708   else:
2709     _Fail("Invalid %s I/O mode %r", mode, ieio)
2710
2711   return (env, prefix, suffix, exp_size)
2712
2713
2714 def _CreateImportExportStatusDir(prefix):
2715   """Creates status directory for import/export.
2716
2717   """
2718   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2719                           prefix=("%s-%s-" %
2720                                   (prefix, utils.TimestampForFilename())))
2721
2722
2723 def StartImportExportDaemon(mode, opts, host, port, instance, ieio, ieioargs):
2724   """Starts an import or export daemon.
2725
2726   @param mode: Import/output mode
2727   @type opts: L{objects.ImportExportOptions}
2728   @param opts: Daemon options
2729   @type host: string
2730   @param host: Remote host for export (None for import)
2731   @type port: int
2732   @param port: Remote port for export (None for import)
2733   @type instance: L{objects.Instance}
2734   @param instance: Instance object
2735   @param ieio: Input/output type
2736   @param ieioargs: Input/output arguments
2737
2738   """
2739   if mode == constants.IEM_IMPORT:
2740     prefix = "import"
2741
2742     if not (host is None and port is None):
2743       _Fail("Can not specify host or port on import")
2744
2745   elif mode == constants.IEM_EXPORT:
2746     prefix = "export"
2747
2748     if host is None or port is None:
2749       _Fail("Host and port must be specified for an export")
2750
2751   else:
2752     _Fail("Invalid mode %r", mode)
2753
2754   if (opts.key_name is None) ^ (opts.ca_pem is None):
2755     _Fail("Cluster certificate can only be used for both key and CA")
2756
2757   (cmd_env, cmd_prefix, cmd_suffix, exp_size) = \
2758     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2759
2760   if opts.key_name is None:
2761     # Use server.pem
2762     key_path = constants.NODED_CERT_FILE
2763     cert_path = constants.NODED_CERT_FILE
2764     assert opts.ca_pem is None
2765   else:
2766     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2767                                                  opts.key_name)
2768     assert opts.ca_pem is not None
2769
2770   for i in [key_path, cert_path]:
2771     if not os.path.exists(i):
2772       _Fail("File '%s' does not exist" % i)
2773
2774   status_dir = _CreateImportExportStatusDir(prefix)
2775   try:
2776     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2777     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2778     ca_file = utils.PathJoin(status_dir, _IES_CA_FILE)
2779
2780     if opts.ca_pem is None:
2781       # Use server.pem
2782       ca = utils.ReadFile(constants.NODED_CERT_FILE)
2783     else:
2784       ca = opts.ca_pem
2785
2786     # Write CA file
2787     utils.WriteFile(ca_file, data=ca, mode=0400)
2788
2789     cmd = [
2790       constants.IMPORT_EXPORT_DAEMON,
2791       status_file, mode,
2792       "--key=%s" % key_path,
2793       "--cert=%s" % cert_path,
2794       "--ca=%s" % ca_file,
2795       ]
2796
2797     if host:
2798       cmd.append("--host=%s" % host)
2799
2800     if port:
2801       cmd.append("--port=%s" % port)
2802
2803     if opts.compress:
2804       cmd.append("--compress=%s" % opts.compress)
2805
2806     if opts.magic:
2807       cmd.append("--magic=%s" % opts.magic)
2808
2809     if exp_size is not None:
2810       cmd.append("--expected-size=%s" % exp_size)
2811
2812     if cmd_prefix:
2813       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2814
2815     if cmd_suffix:
2816       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2817
2818     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2819
2820     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2821     # support for receiving a file descriptor for output
2822     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2823                       output=logfile)
2824
2825     # The import/export name is simply the status directory name
2826     return os.path.basename(status_dir)
2827
2828   except Exception:
2829     shutil.rmtree(status_dir, ignore_errors=True)
2830     raise
2831
2832
2833 def GetImportExportStatus(names):
2834   """Returns import/export daemon status.
2835
2836   @type names: sequence
2837   @param names: List of names
2838   @rtype: List of dicts
2839   @return: Returns a list of the state of each named import/export or None if a
2840            status couldn't be read
2841
2842   """
2843   result = []
2844
2845   for name in names:
2846     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2847                                  _IES_STATUS_FILE)
2848
2849     try:
2850       data = utils.ReadFile(status_file)
2851     except EnvironmentError, err:
2852       if err.errno != errno.ENOENT:
2853         raise
2854       data = None
2855
2856     if not data:
2857       result.append(None)
2858       continue
2859
2860     result.append(serializer.LoadJson(data))
2861
2862   return result
2863
2864
2865 def AbortImportExport(name):
2866   """Sends SIGTERM to a running import/export daemon.
2867
2868   """
2869   logging.info("Abort import/export %s", name)
2870
2871   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2872   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2873
2874   if pid:
2875     logging.info("Import/export %s is running with PID %s, sending SIGTERM",
2876                  name, pid)
2877     utils.IgnoreProcessNotFound(os.kill, pid, signal.SIGTERM)
2878
2879
2880 def CleanupImportExport(name):
2881   """Cleanup after an import or export.
2882
2883   If the import/export daemon is still running it's killed. Afterwards the
2884   whole status directory is removed.
2885
2886   """
2887   logging.info("Finalizing import/export %s", name)
2888
2889   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2890
2891   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2892
2893   if pid:
2894     logging.info("Import/export %s is still running with PID %s",
2895                  name, pid)
2896     utils.KillProcess(pid, waitpid=False)
2897
2898   shutil.rmtree(status_dir, ignore_errors=True)
2899
2900
2901 def _FindDisks(nodes_ip, disks):
2902   """Sets the physical ID on disks and returns the block devices.
2903
2904   """
2905   # set the correct physical ID
2906   my_name = netutils.Hostname.GetSysName()
2907   for cf in disks:
2908     cf.SetPhysicalID(my_name, nodes_ip)
2909
2910   bdevs = []
2911
2912   for cf in disks:
2913     rd = _RecursiveFindBD(cf)
2914     if rd is None:
2915       _Fail("Can't find device %s", cf)
2916     bdevs.append(rd)
2917   return bdevs
2918
2919
2920 def DrbdDisconnectNet(nodes_ip, disks):
2921   """Disconnects the network on a list of drbd devices.
2922
2923   """
2924   bdevs = _FindDisks(nodes_ip, disks)
2925
2926   # disconnect disks
2927   for rd in bdevs:
2928     try:
2929       rd.DisconnectNet()
2930     except errors.BlockDeviceError, err:
2931       _Fail("Can't change network configuration to standalone mode: %s",
2932             err, exc=True)
2933
2934
2935 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2936   """Attaches the network on a list of drbd devices.
2937
2938   """
2939   bdevs = _FindDisks(nodes_ip, disks)
2940
2941   if multimaster:
2942     for idx, rd in enumerate(bdevs):
2943       try:
2944         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2945       except EnvironmentError, err:
2946         _Fail("Can't create symlink: %s", err)
2947   # reconnect disks, switch to new master configuration and if
2948   # needed primary mode
2949   for rd in bdevs:
2950     try:
2951       rd.AttachNet(multimaster)
2952     except errors.BlockDeviceError, err:
2953       _Fail("Can't change network configuration: %s", err)
2954
2955   # wait until the disks are connected; we need to retry the re-attach
2956   # if the device becomes standalone, as this might happen if the one
2957   # node disconnects and reconnects in a different mode before the
2958   # other node reconnects; in this case, one or both of the nodes will
2959   # decide it has wrong configuration and switch to standalone
2960
2961   def _Attach():
2962     all_connected = True
2963
2964     for rd in bdevs:
2965       stats = rd.GetProcStatus()
2966
2967       all_connected = (all_connected and
2968                        (stats.is_connected or stats.is_in_resync))
2969
2970       if stats.is_standalone:
2971         # peer had different config info and this node became
2972         # standalone, even though this should not happen with the
2973         # new staged way of changing disk configs
2974         try:
2975           rd.AttachNet(multimaster)
2976         except errors.BlockDeviceError, err:
2977           _Fail("Can't change network configuration: %s", err)
2978
2979     if not all_connected:
2980       raise utils.RetryAgain()
2981
2982   try:
2983     # Start with a delay of 100 miliseconds and go up to 5 seconds
2984     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2985   except utils.RetryTimeout:
2986     _Fail("Timeout in disk reconnecting")
2987
2988   if multimaster:
2989     # change to primary mode
2990     for rd in bdevs:
2991       try:
2992         rd.Open()
2993       except errors.BlockDeviceError, err:
2994         _Fail("Can't change to primary mode: %s", err)
2995
2996
2997 def DrbdWaitSync(nodes_ip, disks):
2998   """Wait until DRBDs have synchronized.
2999
3000   """
3001   def _helper(rd):
3002     stats = rd.GetProcStatus()
3003     if not (stats.is_connected or stats.is_in_resync):
3004       raise utils.RetryAgain()
3005     return stats
3006
3007   bdevs = _FindDisks(nodes_ip, disks)
3008
3009   min_resync = 100
3010   alldone = True
3011   for rd in bdevs:
3012     try:
3013       # poll each second for 15 seconds
3014       stats = utils.Retry(_helper, 1, 15, args=[rd])
3015     except utils.RetryTimeout:
3016       stats = rd.GetProcStatus()
3017       # last check
3018       if not (stats.is_connected or stats.is_in_resync):
3019         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
3020     alldone = alldone and (not stats.is_in_resync)
3021     if stats.sync_percent is not None:
3022       min_resync = min(min_resync, stats.sync_percent)
3023
3024   return (alldone, min_resync)
3025
3026
3027 def GetDrbdUsermodeHelper():
3028   """Returns DRBD usermode helper currently configured.
3029
3030   """
3031   try:
3032     return bdev.BaseDRBD.GetUsermodeHelper()
3033   except errors.BlockDeviceError, err:
3034     _Fail(str(err))
3035
3036
3037 def PowercycleNode(hypervisor_type):
3038   """Hard-powercycle the node.
3039
3040   Because we need to return first, and schedule the powercycle in the
3041   background, we won't be able to report failures nicely.
3042
3043   """
3044   hyper = hypervisor.GetHypervisor(hypervisor_type)
3045   try:
3046     pid = os.fork()
3047   except OSError:
3048     # if we can't fork, we'll pretend that we're in the child process
3049     pid = 0
3050   if pid > 0:
3051     return "Reboot scheduled in 5 seconds"
3052   # ensure the child is running on ram
3053   try:
3054     utils.Mlockall()
3055   except Exception: # pylint: disable-msg=W0703
3056     pass
3057   time.sleep(5)
3058   hyper.PowercycleNode()
3059
3060
3061 class HooksRunner(object):
3062   """Hook runner.
3063
3064   This class is instantiated on the node side (ganeti-noded) and not
3065   on the master side.
3066
3067   """
3068   def __init__(self, hooks_base_dir=None):
3069     """Constructor for hooks runner.
3070
3071     @type hooks_base_dir: str or None
3072     @param hooks_base_dir: if not None, this overrides the
3073         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3074
3075     """
3076     if hooks_base_dir is None:
3077       hooks_base_dir = constants.HOOKS_BASE_DIR
3078     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3079     # constant
3080     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3081
3082   def RunHooks(self, hpath, phase, env):
3083     """Run the scripts in the hooks directory.
3084
3085     @type hpath: str
3086     @param hpath: the path to the hooks directory which
3087         holds the scripts
3088     @type phase: str
3089     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3090         L{constants.HOOKS_PHASE_POST}
3091     @type env: dict
3092     @param env: dictionary with the environment for the hook
3093     @rtype: list
3094     @return: list of 3-element tuples:
3095       - script path
3096       - script result, either L{constants.HKR_SUCCESS} or
3097         L{constants.HKR_FAIL}
3098       - output of the script
3099
3100     @raise errors.ProgrammerError: for invalid input
3101         parameters
3102
3103     """
3104     if phase == constants.HOOKS_PHASE_PRE:
3105       suffix = "pre"
3106     elif phase == constants.HOOKS_PHASE_POST:
3107       suffix = "post"
3108     else:
3109       _Fail("Unknown hooks phase '%s'", phase)
3110
3111
3112     subdir = "%s-%s.d" % (hpath, suffix)
3113     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3114
3115     results = []
3116
3117     if not os.path.isdir(dir_name):
3118       # for non-existing/non-dirs, we simply exit instead of logging a
3119       # warning at every operation
3120       return results
3121
3122     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3123
3124     for (relname, relstatus, runresult)  in runparts_results:
3125       if relstatus == constants.RUNPARTS_SKIP:
3126         rrval = constants.HKR_SKIP
3127         output = ""
3128       elif relstatus == constants.RUNPARTS_ERR:
3129         rrval = constants.HKR_FAIL
3130         output = "Hook script execution error: %s" % runresult
3131       elif relstatus == constants.RUNPARTS_RUN:
3132         if runresult.failed:
3133           rrval = constants.HKR_FAIL
3134         else:
3135           rrval = constants.HKR_SUCCESS
3136         output = utils.SafeEncode(runresult.output.strip())
3137       results.append(("%s/%s" % (subdir, relname), rrval, output))
3138
3139     return results
3140
3141
3142 class IAllocatorRunner(object):
3143   """IAllocator runner.
3144
3145   This class is instantiated on the node side (ganeti-noded) and not on
3146   the master side.
3147
3148   """
3149   @staticmethod
3150   def Run(name, idata):
3151     """Run an iallocator script.
3152
3153     @type name: str
3154     @param name: the iallocator script name
3155     @type idata: str
3156     @param idata: the allocator input data
3157
3158     @rtype: tuple
3159     @return: two element tuple of:
3160        - status
3161        - either error message or stdout of allocator (for success)
3162
3163     """
3164     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3165                                   os.path.isfile)
3166     if alloc_script is None:
3167       _Fail("iallocator module '%s' not found in the search path", name)
3168
3169     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3170     try:
3171       os.write(fd, idata)
3172       os.close(fd)
3173       result = utils.RunCmd([alloc_script, fin_name])
3174       if result.failed:
3175         _Fail("iallocator module '%s' failed: %s, output '%s'",
3176               name, result.fail_reason, result.output)
3177     finally:
3178       os.unlink(fin_name)
3179
3180     return result.stdout
3181
3182
3183 class DevCacheManager(object):
3184   """Simple class for managing a cache of block device information.
3185
3186   """
3187   _DEV_PREFIX = "/dev/"
3188   _ROOT_DIR = constants.BDEV_CACHE_DIR
3189
3190   @classmethod
3191   def _ConvertPath(cls, dev_path):
3192     """Converts a /dev/name path to the cache file name.
3193
3194     This replaces slashes with underscores and strips the /dev
3195     prefix. It then returns the full path to the cache file.
3196
3197     @type dev_path: str
3198     @param dev_path: the C{/dev/} path name
3199     @rtype: str
3200     @return: the converted path name
3201
3202     """
3203     if dev_path.startswith(cls._DEV_PREFIX):
3204       dev_path = dev_path[len(cls._DEV_PREFIX):]
3205     dev_path = dev_path.replace("/", "_")
3206     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3207     return fpath
3208
3209   @classmethod
3210   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3211     """Updates the cache information for a given device.
3212
3213     @type dev_path: str
3214     @param dev_path: the pathname of the device
3215     @type owner: str
3216     @param owner: the owner (instance name) of the device
3217     @type on_primary: bool
3218     @param on_primary: whether this is the primary
3219         node nor not
3220     @type iv_name: str
3221     @param iv_name: the instance-visible name of the
3222         device, as in objects.Disk.iv_name
3223
3224     @rtype: None
3225
3226     """
3227     if dev_path is None:
3228       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3229       return
3230     fpath = cls._ConvertPath(dev_path)
3231     if on_primary:
3232       state = "primary"
3233     else:
3234       state = "secondary"
3235     if iv_name is None:
3236       iv_name = "not_visible"
3237     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3238     try:
3239       utils.WriteFile(fpath, data=fdata)
3240     except EnvironmentError, err:
3241       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3242
3243   @classmethod
3244   def RemoveCache(cls, dev_path):
3245     """Remove data for a dev_path.
3246
3247     This is just a wrapper over L{utils.RemoveFile} with a converted
3248     path name and logging.
3249
3250     @type dev_path: str
3251     @param dev_path: the pathname of the device
3252
3253     @rtype: None
3254
3255     """
3256     if dev_path is None:
3257       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3258       return
3259     fpath = cls._ConvertPath(dev_path)
3260     try:
3261       utils.RemoveFile(fpath)
3262     except EnvironmentError, err:
3263       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)