utils: Add function to read locked PID file
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26 @var _ALLOWED_CLEAN_DIRS: denotes which directories are accepted
27      in the L{_CleanDirectory} function
28
29 """
30
31 # pylint: disable-msg=E1103
32
33 # E1103: %s %r has no %r member (but some types could not be
34 # inferred), because the _TryOSFromDisk returns either (True, os_obj)
35 # or (False, "string") which confuses pylint
36
37
38 import os
39 import os.path
40 import shutil
41 import time
42 import stat
43 import errno
44 import re
45 import random
46 import logging
47 import tempfile
48 import zlib
49 import base64
50
51 from ganeti import errors
52 from ganeti import utils
53 from ganeti import ssh
54 from ganeti import hypervisor
55 from ganeti import constants
56 from ganeti import bdev
57 from ganeti import objects
58 from ganeti import ssconf
59 from ganeti import serializer
60
61
62 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
63 _ALLOWED_CLEAN_DIRS = frozenset([
64   constants.DATA_DIR,
65   constants.JOB_QUEUE_ARCHIVE_DIR,
66   constants.QUEUE_DIR,
67   constants.CRYPTO_KEYS_DIR,
68   ])
69 _MAX_SSL_CERT_VALIDITY = 7 * 24 * 60 * 60
70 _X509_KEY_FILE = "key"
71 _X509_CERT_FILE = "cert"
72 _IES_STATUS_FILE = "status"
73 _IES_PID_FILE = "pid"
74 _IES_CA_FILE = "ca"
75
76
77 class RPCFail(Exception):
78   """Class denoting RPC failure.
79
80   Its argument is the error message.
81
82   """
83
84
85 def _Fail(msg, *args, **kwargs):
86   """Log an error and the raise an RPCFail exception.
87
88   This exception is then handled specially in the ganeti daemon and
89   turned into a 'failed' return type. As such, this function is a
90   useful shortcut for logging the error and returning it to the master
91   daemon.
92
93   @type msg: string
94   @param msg: the text of the exception
95   @raise RPCFail
96
97   """
98   if args:
99     msg = msg % args
100   if "log" not in kwargs or kwargs["log"]: # if we should log this error
101     if "exc" in kwargs and kwargs["exc"]:
102       logging.exception(msg)
103     else:
104       logging.error(msg)
105   raise RPCFail(msg)
106
107
108 def _GetConfig():
109   """Simple wrapper to return a SimpleStore.
110
111   @rtype: L{ssconf.SimpleStore}
112   @return: a SimpleStore instance
113
114   """
115   return ssconf.SimpleStore()
116
117
118 def _GetSshRunner(cluster_name):
119   """Simple wrapper to return an SshRunner.
120
121   @type cluster_name: str
122   @param cluster_name: the cluster name, which is needed
123       by the SshRunner constructor
124   @rtype: L{ssh.SshRunner}
125   @return: an SshRunner instance
126
127   """
128   return ssh.SshRunner(cluster_name)
129
130
131 def _Decompress(data):
132   """Unpacks data compressed by the RPC client.
133
134   @type data: list or tuple
135   @param data: Data sent by RPC client
136   @rtype: str
137   @return: Decompressed data
138
139   """
140   assert isinstance(data, (list, tuple))
141   assert len(data) == 2
142   (encoding, content) = data
143   if encoding == constants.RPC_ENCODING_NONE:
144     return content
145   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
146     return zlib.decompress(base64.b64decode(content))
147   else:
148     raise AssertionError("Unknown data encoding")
149
150
151 def _CleanDirectory(path, exclude=None):
152   """Removes all regular files in a directory.
153
154   @type path: str
155   @param path: the directory to clean
156   @type exclude: list
157   @param exclude: list of files to be excluded, defaults
158       to the empty list
159
160   """
161   if path not in _ALLOWED_CLEAN_DIRS:
162     _Fail("Path passed to _CleanDirectory not in allowed clean targets: '%s'",
163           path)
164
165   if not os.path.isdir(path):
166     return
167   if exclude is None:
168     exclude = []
169   else:
170     # Normalize excluded paths
171     exclude = [os.path.normpath(i) for i in exclude]
172
173   for rel_name in utils.ListVisibleFiles(path):
174     full_name = utils.PathJoin(path, rel_name)
175     if full_name in exclude:
176       continue
177     if os.path.isfile(full_name) and not os.path.islink(full_name):
178       utils.RemoveFile(full_name)
179
180
181 def _BuildUploadFileList():
182   """Build the list of allowed upload files.
183
184   This is abstracted so that it's built only once at module import time.
185
186   """
187   allowed_files = set([
188     constants.CLUSTER_CONF_FILE,
189     constants.ETC_HOSTS,
190     constants.SSH_KNOWN_HOSTS_FILE,
191     constants.VNC_PASSWORD_FILE,
192     constants.RAPI_CERT_FILE,
193     constants.RAPI_USERS_FILE,
194     constants.CONFD_HMAC_KEY,
195     ])
196
197   for hv_name in constants.HYPER_TYPES:
198     hv_class = hypervisor.GetHypervisorClass(hv_name)
199     allowed_files.update(hv_class.GetAncillaryFiles())
200
201   return frozenset(allowed_files)
202
203
204 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
205
206
207 def JobQueuePurge():
208   """Removes job queue files and archived jobs.
209
210   @rtype: tuple
211   @return: True, None
212
213   """
214   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
215   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
216
217
218 def GetMasterInfo():
219   """Returns master information.
220
221   This is an utility function to compute master information, either
222   for consumption here or from the node daemon.
223
224   @rtype: tuple
225   @return: master_netdev, master_ip, master_name
226   @raise RPCFail: in case of errors
227
228   """
229   try:
230     cfg = _GetConfig()
231     master_netdev = cfg.GetMasterNetdev()
232     master_ip = cfg.GetMasterIP()
233     master_node = cfg.GetMasterNode()
234   except errors.ConfigurationError, err:
235     _Fail("Cluster configuration incomplete: %s", err, exc=True)
236   return (master_netdev, master_ip, master_node)
237
238
239 def StartMaster(start_daemons, no_voting):
240   """Activate local node as master node.
241
242   The function will always try activate the IP address of the master
243   (unless someone else has it). It will also start the master daemons,
244   based on the start_daemons parameter.
245
246   @type start_daemons: boolean
247   @param start_daemons: whether to also start the master
248       daemons (ganeti-masterd and ganeti-rapi)
249   @type no_voting: boolean
250   @param no_voting: whether to start ganeti-masterd without a node vote
251       (if start_daemons is True), but still non-interactively
252   @rtype: None
253
254   """
255   # GetMasterInfo will raise an exception if not able to return data
256   master_netdev, master_ip, _ = GetMasterInfo()
257
258   err_msgs = []
259   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
260     if utils.OwnIpAddress(master_ip):
261       # we already have the ip:
262       logging.debug("Master IP already configured, doing nothing")
263     else:
264       msg = "Someone else has the master ip, not activating"
265       logging.error(msg)
266       err_msgs.append(msg)
267   else:
268     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
269                            "dev", master_netdev, "label",
270                            "%s:0" % master_netdev])
271     if result.failed:
272       msg = "Can't activate master IP: %s" % result.output
273       logging.error(msg)
274       err_msgs.append(msg)
275
276     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
277                            "-s", master_ip, master_ip])
278     # we'll ignore the exit code of arping
279
280   # and now start the master and rapi daemons
281   if start_daemons:
282     if no_voting:
283       masterd_args = "--no-voting --yes-do-it"
284     else:
285       masterd_args = ""
286
287     env = {
288       "EXTRA_MASTERD_ARGS": masterd_args,
289       }
290
291     result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
292     if result.failed:
293       msg = "Can't start Ganeti master: %s" % result.output
294       logging.error(msg)
295       err_msgs.append(msg)
296
297   if err_msgs:
298     _Fail("; ".join(err_msgs))
299
300
301 def StopMaster(stop_daemons):
302   """Deactivate this node as master.
303
304   The function will always try to deactivate the IP address of the
305   master. It will also stop the master daemons depending on the
306   stop_daemons parameter.
307
308   @type stop_daemons: boolean
309   @param stop_daemons: whether to also stop the master daemons
310       (ganeti-masterd and ganeti-rapi)
311   @rtype: None
312
313   """
314   # TODO: log and report back to the caller the error failures; we
315   # need to decide in which case we fail the RPC for this
316
317   # GetMasterInfo will raise an exception if not able to return data
318   master_netdev, master_ip, _ = GetMasterInfo()
319
320   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
321                          "dev", master_netdev])
322   if result.failed:
323     logging.error("Can't remove the master IP, error: %s", result.output)
324     # but otherwise ignore the failure
325
326   if stop_daemons:
327     result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
328     if result.failed:
329       logging.error("Could not stop Ganeti master, command %s had exitcode %s"
330                     " and error %s",
331                     result.cmd, result.exit_code, result.output)
332
333
334 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
335   """Joins this node to the cluster.
336
337   This does the following:
338       - updates the hostkeys of the machine (rsa and dsa)
339       - adds the ssh private key to the user
340       - adds the ssh public key to the users' authorized_keys file
341
342   @type dsa: str
343   @param dsa: the DSA private key to write
344   @type dsapub: str
345   @param dsapub: the DSA public key to write
346   @type rsa: str
347   @param rsa: the RSA private key to write
348   @type rsapub: str
349   @param rsapub: the RSA public key to write
350   @type sshkey: str
351   @param sshkey: the SSH private key to write
352   @type sshpub: str
353   @param sshpub: the SSH public key to write
354   @rtype: boolean
355   @return: the success of the operation
356
357   """
358   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
359                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
360                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
361                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
362   for name, content, mode in sshd_keys:
363     utils.WriteFile(name, data=content, mode=mode)
364
365   try:
366     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
367                                                     mkdir=True)
368   except errors.OpExecError, err:
369     _Fail("Error while processing user ssh files: %s", err, exc=True)
370
371   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
372     utils.WriteFile(name, data=content, mode=0600)
373
374   utils.AddAuthorizedKey(auth_keys, sshpub)
375
376   result = utils.RunCmd([constants.DAEMON_UTIL, "reload-ssh-keys"])
377   if result.failed:
378     _Fail("Unable to reload SSH keys (command %r, exit code %s, output %r)",
379           result.cmd, result.exit_code, result.output)
380
381
382 def LeaveCluster(modify_ssh_setup):
383   """Cleans up and remove the current node.
384
385   This function cleans up and prepares the current node to be removed
386   from the cluster.
387
388   If processing is successful, then it raises an
389   L{errors.QuitGanetiException} which is used as a special case to
390   shutdown the node daemon.
391
392   @param modify_ssh_setup: boolean
393
394   """
395   _CleanDirectory(constants.DATA_DIR)
396   _CleanDirectory(constants.CRYPTO_KEYS_DIR)
397   JobQueuePurge()
398
399   if modify_ssh_setup:
400     try:
401       priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
402
403       utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
404
405       utils.RemoveFile(priv_key)
406       utils.RemoveFile(pub_key)
407     except errors.OpExecError:
408       logging.exception("Error while processing ssh files")
409
410   try:
411     utils.RemoveFile(constants.CONFD_HMAC_KEY)
412     utils.RemoveFile(constants.RAPI_CERT_FILE)
413     utils.RemoveFile(constants.NODED_CERT_FILE)
414   except: # pylint: disable-msg=W0702
415     logging.exception("Error while removing cluster secrets")
416
417   result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
418   if result.failed:
419     logging.error("Command %s failed with exitcode %s and error %s",
420                   result.cmd, result.exit_code, result.output)
421
422   # Raise a custom exception (handled in ganeti-noded)
423   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
424
425
426 def GetNodeInfo(vgname, hypervisor_type):
427   """Gives back a hash with different information about the node.
428
429   @type vgname: C{string}
430   @param vgname: the name of the volume group to ask for disk space information
431   @type hypervisor_type: C{str}
432   @param hypervisor_type: the name of the hypervisor to ask for
433       memory information
434   @rtype: C{dict}
435   @return: dictionary with the following keys:
436       - vg_size is the size of the configured volume group in MiB
437       - vg_free is the free size of the volume group in MiB
438       - memory_dom0 is the memory allocated for domain0 in MiB
439       - memory_free is the currently available (free) ram in MiB
440       - memory_total is the total number of ram in MiB
441
442   """
443   outputarray = {}
444   vginfo = _GetVGInfo(vgname)
445   outputarray['vg_size'] = vginfo['vg_size']
446   outputarray['vg_free'] = vginfo['vg_free']
447
448   hyper = hypervisor.GetHypervisor(hypervisor_type)
449   hyp_info = hyper.GetNodeInfo()
450   if hyp_info is not None:
451     outputarray.update(hyp_info)
452
453   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
454
455   return outputarray
456
457
458 def VerifyNode(what, cluster_name):
459   """Verify the status of the local node.
460
461   Based on the input L{what} parameter, various checks are done on the
462   local node.
463
464   If the I{filelist} key is present, this list of
465   files is checksummed and the file/checksum pairs are returned.
466
467   If the I{nodelist} key is present, we check that we have
468   connectivity via ssh with the target nodes (and check the hostname
469   report).
470
471   If the I{node-net-test} key is present, we check that we have
472   connectivity to the given nodes via both primary IP and, if
473   applicable, secondary IPs.
474
475   @type what: C{dict}
476   @param what: a dictionary of things to check:
477       - filelist: list of files for which to compute checksums
478       - nodelist: list of nodes we should check ssh communication with
479       - node-net-test: list of nodes we should check node daemon port
480         connectivity with
481       - hypervisor: list with hypervisors to run the verify for
482   @rtype: dict
483   @return: a dictionary with the same keys as the input dict, and
484       values representing the result of the checks
485
486   """
487   result = {}
488
489   if constants.NV_HYPERVISOR in what:
490     result[constants.NV_HYPERVISOR] = tmp = {}
491     for hv_name in what[constants.NV_HYPERVISOR]:
492       try:
493         val = hypervisor.GetHypervisor(hv_name).Verify()
494       except errors.HypervisorError, err:
495         val = "Error while checking hypervisor: %s" % str(err)
496       tmp[hv_name] = val
497
498   if constants.NV_FILELIST in what:
499     result[constants.NV_FILELIST] = utils.FingerprintFiles(
500       what[constants.NV_FILELIST])
501
502   if constants.NV_NODELIST in what:
503     result[constants.NV_NODELIST] = tmp = {}
504     random.shuffle(what[constants.NV_NODELIST])
505     for node in what[constants.NV_NODELIST]:
506       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
507       if not success:
508         tmp[node] = message
509
510   if constants.NV_NODENETTEST in what:
511     result[constants.NV_NODENETTEST] = tmp = {}
512     my_name = utils.HostInfo().name
513     my_pip = my_sip = None
514     for name, pip, sip in what[constants.NV_NODENETTEST]:
515       if name == my_name:
516         my_pip = pip
517         my_sip = sip
518         break
519     if not my_pip:
520       tmp[my_name] = ("Can't find my own primary/secondary IP"
521                       " in the node list")
522     else:
523       port = utils.GetDaemonPort(constants.NODED)
524       for name, pip, sip in what[constants.NV_NODENETTEST]:
525         fail = []
526         if not utils.TcpPing(pip, port, source=my_pip):
527           fail.append("primary")
528         if sip != pip:
529           if not utils.TcpPing(sip, port, source=my_sip):
530             fail.append("secondary")
531         if fail:
532           tmp[name] = ("failure using the %s interface(s)" %
533                        " and ".join(fail))
534
535   if constants.NV_LVLIST in what:
536     try:
537       val = GetVolumeList(what[constants.NV_LVLIST])
538     except RPCFail, err:
539       val = str(err)
540     result[constants.NV_LVLIST] = val
541
542   if constants.NV_INSTANCELIST in what:
543     # GetInstanceList can fail
544     try:
545       val = GetInstanceList(what[constants.NV_INSTANCELIST])
546     except RPCFail, err:
547       val = str(err)
548     result[constants.NV_INSTANCELIST] = val
549
550   if constants.NV_VGLIST in what:
551     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
552
553   if constants.NV_PVLIST in what:
554     result[constants.NV_PVLIST] = \
555       bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
556                                    filter_allocatable=False)
557
558   if constants.NV_VERSION in what:
559     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
560                                     constants.RELEASE_VERSION)
561
562   if constants.NV_HVINFO in what:
563     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
564     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
565
566   if constants.NV_DRBDLIST in what:
567     try:
568       used_minors = bdev.DRBD8.GetUsedDevs().keys()
569     except errors.BlockDeviceError, err:
570       logging.warning("Can't get used minors list", exc_info=True)
571       used_minors = str(err)
572     result[constants.NV_DRBDLIST] = used_minors
573
574   if constants.NV_NODESETUP in what:
575     result[constants.NV_NODESETUP] = tmpr = []
576     if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
577       tmpr.append("The sysfs filesytem doesn't seem to be mounted"
578                   " under /sys, missing required directories /sys/block"
579                   " and /sys/class/net")
580     if (not os.path.isdir("/proc/sys") or
581         not os.path.isfile("/proc/sysrq-trigger")):
582       tmpr.append("The procfs filesystem doesn't seem to be mounted"
583                   " under /proc, missing required directory /proc/sys and"
584                   " the file /proc/sysrq-trigger")
585
586   if constants.NV_TIME in what:
587     result[constants.NV_TIME] = utils.SplitTime(time.time())
588
589   return result
590
591
592 def GetVolumeList(vg_name):
593   """Compute list of logical volumes and their size.
594
595   @type vg_name: str
596   @param vg_name: the volume group whose LVs we should list
597   @rtype: dict
598   @return:
599       dictionary of all partions (key) with value being a tuple of
600       their size (in MiB), inactive and online status::
601
602         {'test1': ('20.06', True, True)}
603
604       in case of errors, a string is returned with the error
605       details.
606
607   """
608   lvs = {}
609   sep = '|'
610   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
611                          "--separator=%s" % sep,
612                          "-olv_name,lv_size,lv_attr", vg_name])
613   if result.failed:
614     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
615
616   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
617   for line in result.stdout.splitlines():
618     line = line.strip()
619     match = valid_line_re.match(line)
620     if not match:
621       logging.error("Invalid line returned from lvs output: '%s'", line)
622       continue
623     name, size, attr = match.groups()
624     inactive = attr[4] == '-'
625     online = attr[5] == 'o'
626     virtual = attr[0] == 'v'
627     if virtual:
628       # we don't want to report such volumes as existing, since they
629       # don't really hold data
630       continue
631     lvs[name] = (size, inactive, online)
632
633   return lvs
634
635
636 def ListVolumeGroups():
637   """List the volume groups and their size.
638
639   @rtype: dict
640   @return: dictionary with keys volume name and values the
641       size of the volume
642
643   """
644   return utils.ListVolumeGroups()
645
646
647 def NodeVolumes():
648   """List all volumes on this node.
649
650   @rtype: list
651   @return:
652     A list of dictionaries, each having four keys:
653       - name: the logical volume name,
654       - size: the size of the logical volume
655       - dev: the physical device on which the LV lives
656       - vg: the volume group to which it belongs
657
658     In case of errors, we return an empty list and log the
659     error.
660
661     Note that since a logical volume can live on multiple physical
662     volumes, the resulting list might include a logical volume
663     multiple times.
664
665   """
666   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
667                          "--separator=|",
668                          "--options=lv_name,lv_size,devices,vg_name"])
669   if result.failed:
670     _Fail("Failed to list logical volumes, lvs output: %s",
671           result.output)
672
673   def parse_dev(dev):
674     return dev.split('(')[0]
675
676   def handle_dev(dev):
677     return [parse_dev(x) for x in dev.split(",")]
678
679   def map_line(line):
680     line = [v.strip() for v in line]
681     return [{'name': line[0], 'size': line[1],
682              'dev': dev, 'vg': line[3]} for dev in handle_dev(line[2])]
683
684   all_devs = []
685   for line in result.stdout.splitlines():
686     if line.count('|') >= 3:
687       all_devs.extend(map_line(line.split('|')))
688     else:
689       logging.warning("Strange line in the output from lvs: '%s'", line)
690   return all_devs
691
692
693 def BridgesExist(bridges_list):
694   """Check if a list of bridges exist on the current node.
695
696   @rtype: boolean
697   @return: C{True} if all of them exist, C{False} otherwise
698
699   """
700   missing = []
701   for bridge in bridges_list:
702     if not utils.BridgeExists(bridge):
703       missing.append(bridge)
704
705   if missing:
706     _Fail("Missing bridges %s", utils.CommaJoin(missing))
707
708
709 def GetInstanceList(hypervisor_list):
710   """Provides a list of instances.
711
712   @type hypervisor_list: list
713   @param hypervisor_list: the list of hypervisors to query information
714
715   @rtype: list
716   @return: a list of all running instances on the current node
717     - instance1.example.com
718     - instance2.example.com
719
720   """
721   results = []
722   for hname in hypervisor_list:
723     try:
724       names = hypervisor.GetHypervisor(hname).ListInstances()
725       results.extend(names)
726     except errors.HypervisorError, err:
727       _Fail("Error enumerating instances (hypervisor %s): %s",
728             hname, err, exc=True)
729
730   return results
731
732
733 def GetInstanceInfo(instance, hname):
734   """Gives back the information about an instance as a dictionary.
735
736   @type instance: string
737   @param instance: the instance name
738   @type hname: string
739   @param hname: the hypervisor type of the instance
740
741   @rtype: dict
742   @return: dictionary with the following keys:
743       - memory: memory size of instance (int)
744       - state: xen state of instance (string)
745       - time: cpu time of instance (float)
746
747   """
748   output = {}
749
750   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
751   if iinfo is not None:
752     output['memory'] = iinfo[2]
753     output['state'] = iinfo[4]
754     output['time'] = iinfo[5]
755
756   return output
757
758
759 def GetInstanceMigratable(instance):
760   """Gives whether an instance can be migrated.
761
762   @type instance: L{objects.Instance}
763   @param instance: object representing the instance to be checked.
764
765   @rtype: tuple
766   @return: tuple of (result, description) where:
767       - result: whether the instance can be migrated or not
768       - description: a description of the issue, if relevant
769
770   """
771   hyper = hypervisor.GetHypervisor(instance.hypervisor)
772   iname = instance.name
773   if iname not in hyper.ListInstances():
774     _Fail("Instance %s is not running", iname)
775
776   for idx in range(len(instance.disks)):
777     link_name = _GetBlockDevSymlinkPath(iname, idx)
778     if not os.path.islink(link_name):
779       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
780
781
782 def GetAllInstancesInfo(hypervisor_list):
783   """Gather data about all instances.
784
785   This is the equivalent of L{GetInstanceInfo}, except that it
786   computes data for all instances at once, thus being faster if one
787   needs data about more than one instance.
788
789   @type hypervisor_list: list
790   @param hypervisor_list: list of hypervisors to query for instance data
791
792   @rtype: dict
793   @return: dictionary of instance: data, with data having the following keys:
794       - memory: memory size of instance (int)
795       - state: xen state of instance (string)
796       - time: cpu time of instance (float)
797       - vcpus: the number of vcpus
798
799   """
800   output = {}
801
802   for hname in hypervisor_list:
803     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
804     if iinfo:
805       for name, _, memory, vcpus, state, times in iinfo:
806         value = {
807           'memory': memory,
808           'vcpus': vcpus,
809           'state': state,
810           'time': times,
811           }
812         if name in output:
813           # we only check static parameters, like memory and vcpus,
814           # and not state and time which can change between the
815           # invocations of the different hypervisors
816           for key in 'memory', 'vcpus':
817             if value[key] != output[name][key]:
818               _Fail("Instance %s is running twice"
819                     " with different parameters", name)
820         output[name] = value
821
822   return output
823
824
825 def _InstanceLogName(kind, os_name, instance):
826   """Compute the OS log filename for a given instance and operation.
827
828   The instance name and os name are passed in as strings since not all
829   operations have these as part of an instance object.
830
831   @type kind: string
832   @param kind: the operation type (e.g. add, import, etc.)
833   @type os_name: string
834   @param os_name: the os name
835   @type instance: string
836   @param instance: the name of the instance being imported/added/etc.
837
838   """
839   # TODO: Use tempfile.mkstemp to create unique filename
840   base = ("%s-%s-%s-%s.log" %
841           (kind, os_name, instance, utils.TimestampForFilename()))
842   return utils.PathJoin(constants.LOG_OS_DIR, base)
843
844
845 def InstanceOsAdd(instance, reinstall, debug):
846   """Add an OS to an instance.
847
848   @type instance: L{objects.Instance}
849   @param instance: Instance whose OS is to be installed
850   @type reinstall: boolean
851   @param reinstall: whether this is an instance reinstall
852   @type debug: integer
853   @param debug: debug level, passed to the OS scripts
854   @rtype: None
855
856   """
857   inst_os = OSFromDisk(instance.os)
858
859   create_env = OSEnvironment(instance, inst_os, debug)
860   if reinstall:
861     create_env['INSTANCE_REINSTALL'] = "1"
862
863   logfile = _InstanceLogName("add", instance.os, instance.name)
864
865   result = utils.RunCmd([inst_os.create_script], env=create_env,
866                         cwd=inst_os.path, output=logfile,)
867   if result.failed:
868     logging.error("os create command '%s' returned error: %s, logfile: %s,"
869                   " output: %s", result.cmd, result.fail_reason, logfile,
870                   result.output)
871     lines = [utils.SafeEncode(val)
872              for val in utils.TailFile(logfile, lines=20)]
873     _Fail("OS create script failed (%s), last lines in the"
874           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
875
876
877 def RunRenameInstance(instance, old_name, debug):
878   """Run the OS rename script for an instance.
879
880   @type instance: L{objects.Instance}
881   @param instance: Instance whose OS is to be installed
882   @type old_name: string
883   @param old_name: previous instance name
884   @type debug: integer
885   @param debug: debug level, passed to the OS scripts
886   @rtype: boolean
887   @return: the success of the operation
888
889   """
890   inst_os = OSFromDisk(instance.os)
891
892   rename_env = OSEnvironment(instance, inst_os, debug)
893   rename_env['OLD_INSTANCE_NAME'] = old_name
894
895   logfile = _InstanceLogName("rename", instance.os,
896                              "%s-%s" % (old_name, instance.name))
897
898   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
899                         cwd=inst_os.path, output=logfile)
900
901   if result.failed:
902     logging.error("os create command '%s' returned error: %s output: %s",
903                   result.cmd, result.fail_reason, result.output)
904     lines = [utils.SafeEncode(val)
905              for val in utils.TailFile(logfile, lines=20)]
906     _Fail("OS rename script failed (%s), last lines in the"
907           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
908
909
910 def _GetVGInfo(vg_name):
911   """Get information about the volume group.
912
913   @type vg_name: str
914   @param vg_name: the volume group which we query
915   @rtype: dict
916   @return:
917     A dictionary with the following keys:
918       - C{vg_size} is the total size of the volume group in MiB
919       - C{vg_free} is the free size of the volume group in MiB
920       - C{pv_count} are the number of physical disks in that VG
921
922     If an error occurs during gathering of data, we return the same dict
923     with keys all set to None.
924
925   """
926   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
927
928   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
929                          "--nosuffix", "--units=m", "--separator=:", vg_name])
930
931   if retval.failed:
932     logging.error("volume group %s not present", vg_name)
933     return retdic
934   valarr = retval.stdout.strip().rstrip(':').split(':')
935   if len(valarr) == 3:
936     try:
937       retdic = {
938         "vg_size": int(round(float(valarr[0]), 0)),
939         "vg_free": int(round(float(valarr[1]), 0)),
940         "pv_count": int(valarr[2]),
941         }
942     except (TypeError, ValueError), err:
943       logging.exception("Fail to parse vgs output: %s", err)
944   else:
945     logging.error("vgs output has the wrong number of fields (expected"
946                   " three): %s", str(valarr))
947   return retdic
948
949
950 def _GetBlockDevSymlinkPath(instance_name, idx):
951   return utils.PathJoin(constants.DISK_LINKS_DIR,
952                         "%s:%d" % (instance_name, idx))
953
954
955 def _SymlinkBlockDev(instance_name, device_path, idx):
956   """Set up symlinks to a instance's block device.
957
958   This is an auxiliary function run when an instance is start (on the primary
959   node) or when an instance is migrated (on the target node).
960
961
962   @param instance_name: the name of the target instance
963   @param device_path: path of the physical block device, on the node
964   @param idx: the disk index
965   @return: absolute path to the disk's symlink
966
967   """
968   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
969   try:
970     os.symlink(device_path, link_name)
971   except OSError, err:
972     if err.errno == errno.EEXIST:
973       if (not os.path.islink(link_name) or
974           os.readlink(link_name) != device_path):
975         os.remove(link_name)
976         os.symlink(device_path, link_name)
977     else:
978       raise
979
980   return link_name
981
982
983 def _RemoveBlockDevLinks(instance_name, disks):
984   """Remove the block device symlinks belonging to the given instance.
985
986   """
987   for idx, _ in enumerate(disks):
988     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
989     if os.path.islink(link_name):
990       try:
991         os.remove(link_name)
992       except OSError:
993         logging.exception("Can't remove symlink '%s'", link_name)
994
995
996 def _GatherAndLinkBlockDevs(instance):
997   """Set up an instance's block device(s).
998
999   This is run on the primary node at instance startup. The block
1000   devices must be already assembled.
1001
1002   @type instance: L{objects.Instance}
1003   @param instance: the instance whose disks we shoul assemble
1004   @rtype: list
1005   @return: list of (disk_object, device_path)
1006
1007   """
1008   block_devices = []
1009   for idx, disk in enumerate(instance.disks):
1010     device = _RecursiveFindBD(disk)
1011     if device is None:
1012       raise errors.BlockDeviceError("Block device '%s' is not set up." %
1013                                     str(disk))
1014     device.Open()
1015     try:
1016       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
1017     except OSError, e:
1018       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
1019                                     e.strerror)
1020
1021     block_devices.append((disk, link_name))
1022
1023   return block_devices
1024
1025
1026 def StartInstance(instance):
1027   """Start an instance.
1028
1029   @type instance: L{objects.Instance}
1030   @param instance: the instance object
1031   @rtype: None
1032
1033   """
1034   running_instances = GetInstanceList([instance.hypervisor])
1035
1036   if instance.name in running_instances:
1037     logging.info("Instance %s already running, not starting", instance.name)
1038     return
1039
1040   try:
1041     block_devices = _GatherAndLinkBlockDevs(instance)
1042     hyper = hypervisor.GetHypervisor(instance.hypervisor)
1043     hyper.StartInstance(instance, block_devices)
1044   except errors.BlockDeviceError, err:
1045     _Fail("Block device error: %s", err, exc=True)
1046   except errors.HypervisorError, err:
1047     _RemoveBlockDevLinks(instance.name, instance.disks)
1048     _Fail("Hypervisor error: %s", err, exc=True)
1049
1050
1051 def InstanceShutdown(instance, timeout):
1052   """Shut an instance down.
1053
1054   @note: this functions uses polling with a hardcoded timeout.
1055
1056   @type instance: L{objects.Instance}
1057   @param instance: the instance object
1058   @type timeout: integer
1059   @param timeout: maximum timeout for soft shutdown
1060   @rtype: None
1061
1062   """
1063   hv_name = instance.hypervisor
1064   hyper = hypervisor.GetHypervisor(hv_name)
1065   iname = instance.name
1066
1067   if instance.name not in hyper.ListInstances():
1068     logging.info("Instance %s not running, doing nothing", iname)
1069     return
1070
1071   class _TryShutdown:
1072     def __init__(self):
1073       self.tried_once = False
1074
1075     def __call__(self):
1076       if iname not in hyper.ListInstances():
1077         return
1078
1079       try:
1080         hyper.StopInstance(instance, retry=self.tried_once)
1081       except errors.HypervisorError, err:
1082         if iname not in hyper.ListInstances():
1083           # if the instance is no longer existing, consider this a
1084           # success and go to cleanup
1085           return
1086
1087         _Fail("Failed to stop instance %s: %s", iname, err)
1088
1089       self.tried_once = True
1090
1091       raise utils.RetryAgain()
1092
1093   try:
1094     utils.Retry(_TryShutdown(), 5, timeout)
1095   except utils.RetryTimeout:
1096     # the shutdown did not succeed
1097     logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1098
1099     try:
1100       hyper.StopInstance(instance, force=True)
1101     except errors.HypervisorError, err:
1102       if iname in hyper.ListInstances():
1103         # only raise an error if the instance still exists, otherwise
1104         # the error could simply be "instance ... unknown"!
1105         _Fail("Failed to force stop instance %s: %s", iname, err)
1106
1107     time.sleep(1)
1108
1109     if iname in hyper.ListInstances():
1110       _Fail("Could not shutdown instance %s even by destroy", iname)
1111
1112   try:
1113     hyper.CleanupInstance(instance.name)
1114   except errors.HypervisorError, err:
1115     logging.warning("Failed to execute post-shutdown cleanup step: %s", err)
1116
1117   _RemoveBlockDevLinks(iname, instance.disks)
1118
1119
1120 def InstanceReboot(instance, reboot_type, shutdown_timeout):
1121   """Reboot an instance.
1122
1123   @type instance: L{objects.Instance}
1124   @param instance: the instance object to reboot
1125   @type reboot_type: str
1126   @param reboot_type: the type of reboot, one the following
1127     constants:
1128       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1129         instance OS, do not recreate the VM
1130       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1131         restart the VM (at the hypervisor level)
1132       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1133         not accepted here, since that mode is handled differently, in
1134         cmdlib, and translates into full stop and start of the
1135         instance (instead of a call_instance_reboot RPC)
1136   @type shutdown_timeout: integer
1137   @param shutdown_timeout: maximum timeout for soft shutdown
1138   @rtype: None
1139
1140   """
1141   running_instances = GetInstanceList([instance.hypervisor])
1142
1143   if instance.name not in running_instances:
1144     _Fail("Cannot reboot instance %s that is not running", instance.name)
1145
1146   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1147   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1148     try:
1149       hyper.RebootInstance(instance)
1150     except errors.HypervisorError, err:
1151       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1152   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1153     try:
1154       InstanceShutdown(instance, shutdown_timeout)
1155       return StartInstance(instance)
1156     except errors.HypervisorError, err:
1157       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1158   else:
1159     _Fail("Invalid reboot_type received: %s", reboot_type)
1160
1161
1162 def MigrationInfo(instance):
1163   """Gather information about an instance to be migrated.
1164
1165   @type instance: L{objects.Instance}
1166   @param instance: the instance definition
1167
1168   """
1169   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1170   try:
1171     info = hyper.MigrationInfo(instance)
1172   except errors.HypervisorError, err:
1173     _Fail("Failed to fetch migration information: %s", err, exc=True)
1174   return info
1175
1176
1177 def AcceptInstance(instance, info, target):
1178   """Prepare the node to accept an instance.
1179
1180   @type instance: L{objects.Instance}
1181   @param instance: the instance definition
1182   @type info: string/data (opaque)
1183   @param info: migration information, from the source node
1184   @type target: string
1185   @param target: target host (usually ip), on this node
1186
1187   """
1188   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1189   try:
1190     hyper.AcceptInstance(instance, info, target)
1191   except errors.HypervisorError, err:
1192     _Fail("Failed to accept instance: %s", err, exc=True)
1193
1194
1195 def FinalizeMigration(instance, info, success):
1196   """Finalize any preparation to accept an instance.
1197
1198   @type instance: L{objects.Instance}
1199   @param instance: the instance definition
1200   @type info: string/data (opaque)
1201   @param info: migration information, from the source node
1202   @type success: boolean
1203   @param success: whether the migration was a success or a failure
1204
1205   """
1206   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1207   try:
1208     hyper.FinalizeMigration(instance, info, success)
1209   except errors.HypervisorError, err:
1210     _Fail("Failed to finalize migration: %s", err, exc=True)
1211
1212
1213 def MigrateInstance(instance, target, live):
1214   """Migrates an instance to another node.
1215
1216   @type instance: L{objects.Instance}
1217   @param instance: the instance definition
1218   @type target: string
1219   @param target: the target node name
1220   @type live: boolean
1221   @param live: whether the migration should be done live or not (the
1222       interpretation of this parameter is left to the hypervisor)
1223   @rtype: tuple
1224   @return: a tuple of (success, msg) where:
1225       - succes is a boolean denoting the success/failure of the operation
1226       - msg is a string with details in case of failure
1227
1228   """
1229   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1230
1231   try:
1232     hyper.MigrateInstance(instance, target, live)
1233   except errors.HypervisorError, err:
1234     _Fail("Failed to migrate instance: %s", err, exc=True)
1235
1236
1237 def BlockdevCreate(disk, size, owner, on_primary, info):
1238   """Creates a block device for an instance.
1239
1240   @type disk: L{objects.Disk}
1241   @param disk: the object describing the disk we should create
1242   @type size: int
1243   @param size: the size of the physical underlying device, in MiB
1244   @type owner: str
1245   @param owner: the name of the instance for which disk is created,
1246       used for device cache data
1247   @type on_primary: boolean
1248   @param on_primary:  indicates if it is the primary node or not
1249   @type info: string
1250   @param info: string that will be sent to the physical device
1251       creation, used for example to set (LVM) tags on LVs
1252
1253   @return: the new unique_id of the device (this can sometime be
1254       computed only after creation), or None. On secondary nodes,
1255       it's not required to return anything.
1256
1257   """
1258   # TODO: remove the obsolete 'size' argument
1259   # pylint: disable-msg=W0613
1260   clist = []
1261   if disk.children:
1262     for child in disk.children:
1263       try:
1264         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1265       except errors.BlockDeviceError, err:
1266         _Fail("Can't assemble device %s: %s", child, err)
1267       if on_primary or disk.AssembleOnSecondary():
1268         # we need the children open in case the device itself has to
1269         # be assembled
1270         try:
1271           # pylint: disable-msg=E1103
1272           crdev.Open()
1273         except errors.BlockDeviceError, err:
1274           _Fail("Can't make child '%s' read-write: %s", child, err)
1275       clist.append(crdev)
1276
1277   try:
1278     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1279   except errors.BlockDeviceError, err:
1280     _Fail("Can't create block device: %s", err)
1281
1282   if on_primary or disk.AssembleOnSecondary():
1283     try:
1284       device.Assemble()
1285     except errors.BlockDeviceError, err:
1286       _Fail("Can't assemble device after creation, unusual event: %s", err)
1287     device.SetSyncSpeed(constants.SYNC_SPEED)
1288     if on_primary or disk.OpenOnSecondary():
1289       try:
1290         device.Open(force=True)
1291       except errors.BlockDeviceError, err:
1292         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1293     DevCacheManager.UpdateCache(device.dev_path, owner,
1294                                 on_primary, disk.iv_name)
1295
1296   device.SetInfo(info)
1297
1298   return device.unique_id
1299
1300
1301 def BlockdevRemove(disk):
1302   """Remove a block device.
1303
1304   @note: This is intended to be called recursively.
1305
1306   @type disk: L{objects.Disk}
1307   @param disk: the disk object we should remove
1308   @rtype: boolean
1309   @return: the success of the operation
1310
1311   """
1312   msgs = []
1313   try:
1314     rdev = _RecursiveFindBD(disk)
1315   except errors.BlockDeviceError, err:
1316     # probably can't attach
1317     logging.info("Can't attach to device %s in remove", disk)
1318     rdev = None
1319   if rdev is not None:
1320     r_path = rdev.dev_path
1321     try:
1322       rdev.Remove()
1323     except errors.BlockDeviceError, err:
1324       msgs.append(str(err))
1325     if not msgs:
1326       DevCacheManager.RemoveCache(r_path)
1327
1328   if disk.children:
1329     for child in disk.children:
1330       try:
1331         BlockdevRemove(child)
1332       except RPCFail, err:
1333         msgs.append(str(err))
1334
1335   if msgs:
1336     _Fail("; ".join(msgs))
1337
1338
1339 def _RecursiveAssembleBD(disk, owner, as_primary):
1340   """Activate a block device for an instance.
1341
1342   This is run on the primary and secondary nodes for an instance.
1343
1344   @note: this function is called recursively.
1345
1346   @type disk: L{objects.Disk}
1347   @param disk: the disk we try to assemble
1348   @type owner: str
1349   @param owner: the name of the instance which owns the disk
1350   @type as_primary: boolean
1351   @param as_primary: if we should make the block device
1352       read/write
1353
1354   @return: the assembled device or None (in case no device
1355       was assembled)
1356   @raise errors.BlockDeviceError: in case there is an error
1357       during the activation of the children or the device
1358       itself
1359
1360   """
1361   children = []
1362   if disk.children:
1363     mcn = disk.ChildrenNeeded()
1364     if mcn == -1:
1365       mcn = 0 # max number of Nones allowed
1366     else:
1367       mcn = len(disk.children) - mcn # max number of Nones
1368     for chld_disk in disk.children:
1369       try:
1370         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1371       except errors.BlockDeviceError, err:
1372         if children.count(None) >= mcn:
1373           raise
1374         cdev = None
1375         logging.error("Error in child activation (but continuing): %s",
1376                       str(err))
1377       children.append(cdev)
1378
1379   if as_primary or disk.AssembleOnSecondary():
1380     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1381     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1382     result = r_dev
1383     if as_primary or disk.OpenOnSecondary():
1384       r_dev.Open()
1385     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1386                                 as_primary, disk.iv_name)
1387
1388   else:
1389     result = True
1390   return result
1391
1392
1393 def BlockdevAssemble(disk, owner, as_primary):
1394   """Activate a block device for an instance.
1395
1396   This is a wrapper over _RecursiveAssembleBD.
1397
1398   @rtype: str or boolean
1399   @return: a C{/dev/...} path for primary nodes, and
1400       C{True} for secondary nodes
1401
1402   """
1403   try:
1404     result = _RecursiveAssembleBD(disk, owner, as_primary)
1405     if isinstance(result, bdev.BlockDev):
1406       # pylint: disable-msg=E1103
1407       result = result.dev_path
1408   except errors.BlockDeviceError, err:
1409     _Fail("Error while assembling disk: %s", err, exc=True)
1410
1411   return result
1412
1413
1414 def BlockdevShutdown(disk):
1415   """Shut down a block device.
1416
1417   First, if the device is assembled (Attach() is successful), then
1418   the device is shutdown. Then the children of the device are
1419   shutdown.
1420
1421   This function is called recursively. Note that we don't cache the
1422   children or such, as oppossed to assemble, shutdown of different
1423   devices doesn't require that the upper device was active.
1424
1425   @type disk: L{objects.Disk}
1426   @param disk: the description of the disk we should
1427       shutdown
1428   @rtype: None
1429
1430   """
1431   msgs = []
1432   r_dev = _RecursiveFindBD(disk)
1433   if r_dev is not None:
1434     r_path = r_dev.dev_path
1435     try:
1436       r_dev.Shutdown()
1437       DevCacheManager.RemoveCache(r_path)
1438     except errors.BlockDeviceError, err:
1439       msgs.append(str(err))
1440
1441   if disk.children:
1442     for child in disk.children:
1443       try:
1444         BlockdevShutdown(child)
1445       except RPCFail, err:
1446         msgs.append(str(err))
1447
1448   if msgs:
1449     _Fail("; ".join(msgs))
1450
1451
1452 def BlockdevAddchildren(parent_cdev, new_cdevs):
1453   """Extend a mirrored block device.
1454
1455   @type parent_cdev: L{objects.Disk}
1456   @param parent_cdev: the disk to which we should add children
1457   @type new_cdevs: list of L{objects.Disk}
1458   @param new_cdevs: the list of children which we should add
1459   @rtype: None
1460
1461   """
1462   parent_bdev = _RecursiveFindBD(parent_cdev)
1463   if parent_bdev is None:
1464     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1465   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1466   if new_bdevs.count(None) > 0:
1467     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1468   parent_bdev.AddChildren(new_bdevs)
1469
1470
1471 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1472   """Shrink a mirrored block device.
1473
1474   @type parent_cdev: L{objects.Disk}
1475   @param parent_cdev: the disk from which we should remove children
1476   @type new_cdevs: list of L{objects.Disk}
1477   @param new_cdevs: the list of children which we should remove
1478   @rtype: None
1479
1480   """
1481   parent_bdev = _RecursiveFindBD(parent_cdev)
1482   if parent_bdev is None:
1483     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1484   devs = []
1485   for disk in new_cdevs:
1486     rpath = disk.StaticDevPath()
1487     if rpath is None:
1488       bd = _RecursiveFindBD(disk)
1489       if bd is None:
1490         _Fail("Can't find device %s while removing children", disk)
1491       else:
1492         devs.append(bd.dev_path)
1493     else:
1494       if not utils.IsNormAbsPath(rpath):
1495         _Fail("Strange path returned from StaticDevPath: '%s'", rpath)
1496       devs.append(rpath)
1497   parent_bdev.RemoveChildren(devs)
1498
1499
1500 def BlockdevGetmirrorstatus(disks):
1501   """Get the mirroring status of a list of devices.
1502
1503   @type disks: list of L{objects.Disk}
1504   @param disks: the list of disks which we should query
1505   @rtype: disk
1506   @return:
1507       a list of (mirror_done, estimated_time) tuples, which
1508       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1509   @raise errors.BlockDeviceError: if any of the disks cannot be
1510       found
1511
1512   """
1513   stats = []
1514   for dsk in disks:
1515     rbd = _RecursiveFindBD(dsk)
1516     if rbd is None:
1517       _Fail("Can't find device %s", dsk)
1518
1519     stats.append(rbd.CombinedSyncStatus())
1520
1521   return stats
1522
1523
1524 def _RecursiveFindBD(disk):
1525   """Check if a device is activated.
1526
1527   If so, return information about the real device.
1528
1529   @type disk: L{objects.Disk}
1530   @param disk: the disk object we need to find
1531
1532   @return: None if the device can't be found,
1533       otherwise the device instance
1534
1535   """
1536   children = []
1537   if disk.children:
1538     for chdisk in disk.children:
1539       children.append(_RecursiveFindBD(chdisk))
1540
1541   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1542
1543
1544 def _OpenRealBD(disk):
1545   """Opens the underlying block device of a disk.
1546
1547   @type disk: L{objects.Disk}
1548   @param disk: the disk object we want to open
1549
1550   """
1551   real_disk = _RecursiveFindBD(disk)
1552   if real_disk is None:
1553     _Fail("Block device '%s' is not set up", disk)
1554
1555   real_disk.Open()
1556
1557   return real_disk
1558
1559
1560 def BlockdevFind(disk):
1561   """Check if a device is activated.
1562
1563   If it is, return information about the real device.
1564
1565   @type disk: L{objects.Disk}
1566   @param disk: the disk to find
1567   @rtype: None or objects.BlockDevStatus
1568   @return: None if the disk cannot be found, otherwise a the current
1569            information
1570
1571   """
1572   try:
1573     rbd = _RecursiveFindBD(disk)
1574   except errors.BlockDeviceError, err:
1575     _Fail("Failed to find device: %s", err, exc=True)
1576
1577   if rbd is None:
1578     return None
1579
1580   return rbd.GetSyncStatus()
1581
1582
1583 def BlockdevGetsize(disks):
1584   """Computes the size of the given disks.
1585
1586   If a disk is not found, returns None instead.
1587
1588   @type disks: list of L{objects.Disk}
1589   @param disks: the list of disk to compute the size for
1590   @rtype: list
1591   @return: list with elements None if the disk cannot be found,
1592       otherwise the size
1593
1594   """
1595   result = []
1596   for cf in disks:
1597     try:
1598       rbd = _RecursiveFindBD(cf)
1599     except errors.BlockDeviceError:
1600       result.append(None)
1601       continue
1602     if rbd is None:
1603       result.append(None)
1604     else:
1605       result.append(rbd.GetActualSize())
1606   return result
1607
1608
1609 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1610   """Export a block device to a remote node.
1611
1612   @type disk: L{objects.Disk}
1613   @param disk: the description of the disk to export
1614   @type dest_node: str
1615   @param dest_node: the destination node to export to
1616   @type dest_path: str
1617   @param dest_path: the destination path on the target node
1618   @type cluster_name: str
1619   @param cluster_name: the cluster name, needed for SSH hostalias
1620   @rtype: None
1621
1622   """
1623   real_disk = _OpenRealBD(disk)
1624
1625   # the block size on the read dd is 1MiB to match our units
1626   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1627                                "dd if=%s bs=1048576 count=%s",
1628                                real_disk.dev_path, str(disk.size))
1629
1630   # we set here a smaller block size as, due to ssh buffering, more
1631   # than 64-128k will mostly ignored; we use nocreat to fail if the
1632   # device is not already there or we pass a wrong path; we use
1633   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1634   # to not buffer too much memory; this means that at best, we flush
1635   # every 64k, which will not be very fast
1636   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1637                                 " oflag=dsync", dest_path)
1638
1639   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1640                                                    constants.GANETI_RUNAS,
1641                                                    destcmd)
1642
1643   # all commands have been checked, so we're safe to combine them
1644   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1645
1646   result = utils.RunCmd(["bash", "-c", command])
1647
1648   if result.failed:
1649     _Fail("Disk copy command '%s' returned error: %s"
1650           " output: %s", command, result.fail_reason, result.output)
1651
1652
1653 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1654   """Write a file to the filesystem.
1655
1656   This allows the master to overwrite(!) a file. It will only perform
1657   the operation if the file belongs to a list of configuration files.
1658
1659   @type file_name: str
1660   @param file_name: the target file name
1661   @type data: str
1662   @param data: the new contents of the file
1663   @type mode: int
1664   @param mode: the mode to give the file (can be None)
1665   @type uid: int
1666   @param uid: the owner of the file (can be -1 for default)
1667   @type gid: int
1668   @param gid: the group of the file (can be -1 for default)
1669   @type atime: float
1670   @param atime: the atime to set on the file (can be None)
1671   @type mtime: float
1672   @param mtime: the mtime to set on the file (can be None)
1673   @rtype: None
1674
1675   """
1676   if not os.path.isabs(file_name):
1677     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1678
1679   if file_name not in _ALLOWED_UPLOAD_FILES:
1680     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1681           file_name)
1682
1683   raw_data = _Decompress(data)
1684
1685   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1686                   atime=atime, mtime=mtime)
1687
1688
1689 def WriteSsconfFiles(values):
1690   """Update all ssconf files.
1691
1692   Wrapper around the SimpleStore.WriteFiles.
1693
1694   """
1695   ssconf.SimpleStore().WriteFiles(values)
1696
1697
1698 def _ErrnoOrStr(err):
1699   """Format an EnvironmentError exception.
1700
1701   If the L{err} argument has an errno attribute, it will be looked up
1702   and converted into a textual C{E...} description. Otherwise the
1703   string representation of the error will be returned.
1704
1705   @type err: L{EnvironmentError}
1706   @param err: the exception to format
1707
1708   """
1709   if hasattr(err, 'errno'):
1710     detail = errno.errorcode[err.errno]
1711   else:
1712     detail = str(err)
1713   return detail
1714
1715
1716 def _OSOndiskAPIVersion(os_dir):
1717   """Compute and return the API version of a given OS.
1718
1719   This function will try to read the API version of the OS residing in
1720   the 'os_dir' directory.
1721
1722   @type os_dir: str
1723   @param os_dir: the directory in which we should look for the OS
1724   @rtype: tuple
1725   @return: tuple (status, data) with status denoting the validity and
1726       data holding either the vaid versions or an error message
1727
1728   """
1729   api_file = utils.PathJoin(os_dir, constants.OS_API_FILE)
1730
1731   try:
1732     st = os.stat(api_file)
1733   except EnvironmentError, err:
1734     return False, ("Required file '%s' not found under path %s: %s" %
1735                    (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1736
1737   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1738     return False, ("File '%s' in %s is not a regular file" %
1739                    (constants.OS_API_FILE, os_dir))
1740
1741   try:
1742     api_versions = utils.ReadFile(api_file).splitlines()
1743   except EnvironmentError, err:
1744     return False, ("Error while reading the API version file at %s: %s" %
1745                    (api_file, _ErrnoOrStr(err)))
1746
1747   try:
1748     api_versions = [int(version.strip()) for version in api_versions]
1749   except (TypeError, ValueError), err:
1750     return False, ("API version(s) can't be converted to integer: %s" %
1751                    str(err))
1752
1753   return True, api_versions
1754
1755
1756 def DiagnoseOS(top_dirs=None):
1757   """Compute the validity for all OSes.
1758
1759   @type top_dirs: list
1760   @param top_dirs: the list of directories in which to
1761       search (if not given defaults to
1762       L{constants.OS_SEARCH_PATH})
1763   @rtype: list of L{objects.OS}
1764   @return: a list of tuples (name, path, status, diagnose, variants)
1765       for all (potential) OSes under all search paths, where:
1766           - name is the (potential) OS name
1767           - path is the full path to the OS
1768           - status True/False is the validity of the OS
1769           - diagnose is the error message for an invalid OS, otherwise empty
1770           - variants is a list of supported OS variants, if any
1771
1772   """
1773   if top_dirs is None:
1774     top_dirs = constants.OS_SEARCH_PATH
1775
1776   result = []
1777   for dir_name in top_dirs:
1778     if os.path.isdir(dir_name):
1779       try:
1780         f_names = utils.ListVisibleFiles(dir_name)
1781       except EnvironmentError, err:
1782         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1783         break
1784       for name in f_names:
1785         os_path = utils.PathJoin(dir_name, name)
1786         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1787         if status:
1788           diagnose = ""
1789           variants = os_inst.supported_variants
1790         else:
1791           diagnose = os_inst
1792           variants = []
1793         result.append((name, os_path, status, diagnose, variants))
1794
1795   return result
1796
1797
1798 def _TryOSFromDisk(name, base_dir=None):
1799   """Create an OS instance from disk.
1800
1801   This function will return an OS instance if the given name is a
1802   valid OS name.
1803
1804   @type base_dir: string
1805   @keyword base_dir: Base directory containing OS installations.
1806                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1807   @rtype: tuple
1808   @return: success and either the OS instance if we find a valid one,
1809       or error message
1810
1811   """
1812   if base_dir is None:
1813     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1814   else:
1815     os_dir = utils.FindFile(name, [base_dir], os.path.isdir)
1816
1817   if os_dir is None:
1818     return False, "Directory for OS %s not found in search path" % name
1819
1820   status, api_versions = _OSOndiskAPIVersion(os_dir)
1821   if not status:
1822     # push the error up
1823     return status, api_versions
1824
1825   if not constants.OS_API_VERSIONS.intersection(api_versions):
1826     return False, ("API version mismatch for path '%s': found %s, want %s." %
1827                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1828
1829   # OS Files dictionary, we will populate it with the absolute path names
1830   os_files = dict.fromkeys(constants.OS_SCRIPTS)
1831
1832   if max(api_versions) >= constants.OS_API_V15:
1833     os_files[constants.OS_VARIANTS_FILE] = ''
1834
1835   for filename in os_files:
1836     os_files[filename] = utils.PathJoin(os_dir, filename)
1837
1838     try:
1839       st = os.stat(os_files[filename])
1840     except EnvironmentError, err:
1841       return False, ("File '%s' under path '%s' is missing (%s)" %
1842                      (filename, os_dir, _ErrnoOrStr(err)))
1843
1844     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1845       return False, ("File '%s' under path '%s' is not a regular file" %
1846                      (filename, os_dir))
1847
1848     if filename in constants.OS_SCRIPTS:
1849       if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1850         return False, ("File '%s' under path '%s' is not executable" %
1851                        (filename, os_dir))
1852
1853   variants = None
1854   if constants.OS_VARIANTS_FILE in os_files:
1855     variants_file = os_files[constants.OS_VARIANTS_FILE]
1856     try:
1857       variants = utils.ReadFile(variants_file).splitlines()
1858     except EnvironmentError, err:
1859       return False, ("Error while reading the OS variants file at %s: %s" %
1860                      (variants_file, _ErrnoOrStr(err)))
1861     if not variants:
1862       return False, ("No supported os variant found")
1863
1864   os_obj = objects.OS(name=name, path=os_dir,
1865                       create_script=os_files[constants.OS_SCRIPT_CREATE],
1866                       export_script=os_files[constants.OS_SCRIPT_EXPORT],
1867                       import_script=os_files[constants.OS_SCRIPT_IMPORT],
1868                       rename_script=os_files[constants.OS_SCRIPT_RENAME],
1869                       supported_variants=variants,
1870                       api_versions=api_versions)
1871   return True, os_obj
1872
1873
1874 def OSFromDisk(name, base_dir=None):
1875   """Create an OS instance from disk.
1876
1877   This function will return an OS instance if the given name is a
1878   valid OS name. Otherwise, it will raise an appropriate
1879   L{RPCFail} exception, detailing why this is not a valid OS.
1880
1881   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1882   an exception but returns true/false status data.
1883
1884   @type base_dir: string
1885   @keyword base_dir: Base directory containing OS installations.
1886                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1887   @rtype: L{objects.OS}
1888   @return: the OS instance if we find a valid one
1889   @raise RPCFail: if we don't find a valid OS
1890
1891   """
1892   name_only = name.split("+", 1)[0]
1893   status, payload = _TryOSFromDisk(name_only, base_dir)
1894
1895   if not status:
1896     _Fail(payload)
1897
1898   return payload
1899
1900
1901 def OSEnvironment(instance, inst_os, debug=0):
1902   """Calculate the environment for an os script.
1903
1904   @type instance: L{objects.Instance}
1905   @param instance: target instance for the os script run
1906   @type inst_os: L{objects.OS}
1907   @param inst_os: operating system for which the environment is being built
1908   @type debug: integer
1909   @param debug: debug level (0 or 1, for OS Api 10)
1910   @rtype: dict
1911   @return: dict of environment variables
1912   @raise errors.BlockDeviceError: if the block device
1913       cannot be found
1914
1915   """
1916   result = {}
1917   api_version = \
1918     max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1919   result['OS_API_VERSION'] = '%d' % api_version
1920   result['INSTANCE_NAME'] = instance.name
1921   result['INSTANCE_OS'] = instance.os
1922   result['HYPERVISOR'] = instance.hypervisor
1923   result['DISK_COUNT'] = '%d' % len(instance.disks)
1924   result['NIC_COUNT'] = '%d' % len(instance.nics)
1925   result['DEBUG_LEVEL'] = '%d' % debug
1926   if api_version >= constants.OS_API_V15:
1927     try:
1928       variant = instance.os.split('+', 1)[1]
1929     except IndexError:
1930       variant = inst_os.supported_variants[0]
1931     result['OS_VARIANT'] = variant
1932   for idx, disk in enumerate(instance.disks):
1933     real_disk = _OpenRealBD(disk)
1934     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1935     result['DISK_%d_ACCESS' % idx] = disk.mode
1936     if constants.HV_DISK_TYPE in instance.hvparams:
1937       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1938         instance.hvparams[constants.HV_DISK_TYPE]
1939     if disk.dev_type in constants.LDS_BLOCK:
1940       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1941     elif disk.dev_type == constants.LD_FILE:
1942       result['DISK_%d_BACKEND_TYPE' % idx] = \
1943         'file:%s' % disk.physical_id[0]
1944   for idx, nic in enumerate(instance.nics):
1945     result['NIC_%d_MAC' % idx] = nic.mac
1946     if nic.ip:
1947       result['NIC_%d_IP' % idx] = nic.ip
1948     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1949     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1950       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1951     if nic.nicparams[constants.NIC_LINK]:
1952       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1953     if constants.HV_NIC_TYPE in instance.hvparams:
1954       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1955         instance.hvparams[constants.HV_NIC_TYPE]
1956
1957   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1958     for key, value in source.items():
1959       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1960
1961   return result
1962
1963
1964 def BlockdevGrow(disk, amount):
1965   """Grow a stack of block devices.
1966
1967   This function is called recursively, with the childrens being the
1968   first ones to resize.
1969
1970   @type disk: L{objects.Disk}
1971   @param disk: the disk to be grown
1972   @rtype: (status, result)
1973   @return: a tuple with the status of the operation
1974       (True/False), and the errors message if status
1975       is False
1976
1977   """
1978   r_dev = _RecursiveFindBD(disk)
1979   if r_dev is None:
1980     _Fail("Cannot find block device %s", disk)
1981
1982   try:
1983     r_dev.Grow(amount)
1984   except errors.BlockDeviceError, err:
1985     _Fail("Failed to grow block device: %s", err, exc=True)
1986
1987
1988 def BlockdevSnapshot(disk):
1989   """Create a snapshot copy of a block device.
1990
1991   This function is called recursively, and the snapshot is actually created
1992   just for the leaf lvm backend device.
1993
1994   @type disk: L{objects.Disk}
1995   @param disk: the disk to be snapshotted
1996   @rtype: string
1997   @return: snapshot disk path
1998
1999   """
2000   if disk.dev_type == constants.LD_DRBD8:
2001     if not disk.children:
2002       _Fail("DRBD device '%s' without backing storage cannot be snapshotted",
2003             disk.unique_id)
2004     return BlockdevSnapshot(disk.children[0])
2005   elif disk.dev_type == constants.LD_LV:
2006     r_dev = _RecursiveFindBD(disk)
2007     if r_dev is not None:
2008       # FIXME: choose a saner value for the snapshot size
2009       # let's stay on the safe side and ask for the full size, for now
2010       return r_dev.Snapshot(disk.size)
2011     else:
2012       _Fail("Cannot find block device %s", disk)
2013   else:
2014     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
2015           disk.unique_id, disk.dev_type)
2016
2017
2018 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx, debug):
2019   """Export a block device snapshot to a remote node.
2020
2021   @type disk: L{objects.Disk}
2022   @param disk: the description of the disk to export
2023   @type dest_node: str
2024   @param dest_node: the destination node to export to
2025   @type instance: L{objects.Instance}
2026   @param instance: the instance object to whom the disk belongs
2027   @type cluster_name: str
2028   @param cluster_name: the cluster name, needed for SSH hostalias
2029   @type idx: int
2030   @param idx: the index of the disk in the instance's disk list,
2031       used to export to the OS scripts environment
2032   @type debug: integer
2033   @param debug: debug level, passed to the OS scripts
2034   @rtype: None
2035
2036   """
2037   inst_os = OSFromDisk(instance.os)
2038   export_env = OSEnvironment(instance, inst_os, debug)
2039
2040   export_script = inst_os.export_script
2041
2042   logfile = _InstanceLogName("export", inst_os.name, instance.name)
2043   if not os.path.exists(constants.LOG_OS_DIR):
2044     os.mkdir(constants.LOG_OS_DIR, 0750)
2045
2046   real_disk = _OpenRealBD(disk)
2047
2048   export_env['EXPORT_DEVICE'] = real_disk.dev_path
2049   export_env['EXPORT_INDEX'] = str(idx)
2050
2051   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2052   destfile = disk.physical_id[1]
2053
2054   # the target command is built out of three individual commands,
2055   # which are joined by pipes; we check each individual command for
2056   # valid parameters
2057   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
2058                                inst_os.path, export_script, logfile)
2059
2060   comprcmd = "gzip"
2061
2062   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s",
2063                                 destdir, utils.PathJoin(destdir, destfile))
2064   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
2065                                                    constants.GANETI_RUNAS,
2066                                                    destcmd)
2067
2068   # all commands have been checked, so we're safe to combine them
2069   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
2070
2071   result = utils.RunCmd(["bash", "-c", command], env=export_env)
2072
2073   if result.failed:
2074     _Fail("OS snapshot export command '%s' returned error: %s"
2075           " output: %s", command, result.fail_reason, result.output)
2076
2077
2078 def FinalizeExport(instance, snap_disks):
2079   """Write out the export configuration information.
2080
2081   @type instance: L{objects.Instance}
2082   @param instance: the instance which we export, used for
2083       saving configuration
2084   @type snap_disks: list of L{objects.Disk}
2085   @param snap_disks: list of snapshot block devices, which
2086       will be used to get the actual name of the dump file
2087
2088   @rtype: None
2089
2090   """
2091   destdir = utils.PathJoin(constants.EXPORT_DIR, instance.name + ".new")
2092   finaldestdir = utils.PathJoin(constants.EXPORT_DIR, instance.name)
2093
2094   config = objects.SerializableConfigParser()
2095
2096   config.add_section(constants.INISECT_EXP)
2097   config.set(constants.INISECT_EXP, 'version', '0')
2098   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2099   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2100   config.set(constants.INISECT_EXP, 'os', instance.os)
2101   config.set(constants.INISECT_EXP, 'compression', 'gzip')
2102
2103   config.add_section(constants.INISECT_INS)
2104   config.set(constants.INISECT_INS, 'name', instance.name)
2105   config.set(constants.INISECT_INS, 'memory', '%d' %
2106              instance.beparams[constants.BE_MEMORY])
2107   config.set(constants.INISECT_INS, 'vcpus', '%d' %
2108              instance.beparams[constants.BE_VCPUS])
2109   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2110   config.set(constants.INISECT_INS, 'hypervisor', instance.hypervisor)
2111
2112   nic_total = 0
2113   for nic_count, nic in enumerate(instance.nics):
2114     nic_total += 1
2115     config.set(constants.INISECT_INS, 'nic%d_mac' %
2116                nic_count, '%s' % nic.mac)
2117     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2118     for param in constants.NICS_PARAMETER_TYPES:
2119       config.set(constants.INISECT_INS, 'nic%d_%s' % (nic_count, param),
2120                  '%s' % nic.nicparams.get(param, None))
2121   # TODO: redundant: on load can read nics until it doesn't exist
2122   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2123
2124   disk_total = 0
2125   for disk_count, disk in enumerate(snap_disks):
2126     if disk:
2127       disk_total += 1
2128       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2129                  ('%s' % disk.iv_name))
2130       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2131                  ('%s' % disk.physical_id[1]))
2132       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2133                  ('%d' % disk.size))
2134
2135   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2136
2137   # New-style hypervisor/backend parameters
2138
2139   config.add_section(constants.INISECT_HYP)
2140   for name, value in instance.hvparams.items():
2141     if name not in constants.HVC_GLOBALS:
2142       config.set(constants.INISECT_HYP, name, str(value))
2143
2144   config.add_section(constants.INISECT_BEP)
2145   for name, value in instance.beparams.items():
2146     config.set(constants.INISECT_BEP, name, str(value))
2147
2148   utils.WriteFile(utils.PathJoin(destdir, constants.EXPORT_CONF_FILE),
2149                   data=config.Dumps())
2150   shutil.rmtree(finaldestdir, ignore_errors=True)
2151   shutil.move(destdir, finaldestdir)
2152
2153
2154 def ExportInfo(dest):
2155   """Get export configuration information.
2156
2157   @type dest: str
2158   @param dest: directory containing the export
2159
2160   @rtype: L{objects.SerializableConfigParser}
2161   @return: a serializable config file containing the
2162       export info
2163
2164   """
2165   cff = utils.PathJoin(dest, constants.EXPORT_CONF_FILE)
2166
2167   config = objects.SerializableConfigParser()
2168   config.read(cff)
2169
2170   if (not config.has_section(constants.INISECT_EXP) or
2171       not config.has_section(constants.INISECT_INS)):
2172     _Fail("Export info file doesn't have the required fields")
2173
2174   return config.Dumps()
2175
2176
2177 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name, debug):
2178   """Import an os image into an instance.
2179
2180   @type instance: L{objects.Instance}
2181   @param instance: instance to import the disks into
2182   @type src_node: string
2183   @param src_node: source node for the disk images
2184   @type src_images: list of string
2185   @param src_images: absolute paths of the disk images
2186   @type debug: integer
2187   @param debug: debug level, passed to the OS scripts
2188   @rtype: list of boolean
2189   @return: each boolean represent the success of importing the n-th disk
2190
2191   """
2192   inst_os = OSFromDisk(instance.os)
2193   import_env = OSEnvironment(instance, inst_os, debug)
2194   import_script = inst_os.import_script
2195
2196   logfile = _InstanceLogName("import", instance.os, instance.name)
2197   if not os.path.exists(constants.LOG_OS_DIR):
2198     os.mkdir(constants.LOG_OS_DIR, 0750)
2199
2200   comprcmd = "gunzip"
2201   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2202                                import_script, logfile)
2203
2204   final_result = []
2205   for idx, image in enumerate(src_images):
2206     if image:
2207       destcmd = utils.BuildShellCmd('cat %s', image)
2208       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2209                                                        constants.GANETI_RUNAS,
2210                                                        destcmd)
2211       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2212       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2213       import_env['IMPORT_INDEX'] = str(idx)
2214       result = utils.RunCmd(command, env=import_env)
2215       if result.failed:
2216         logging.error("Disk import command '%s' returned error: %s"
2217                       " output: %s", command, result.fail_reason,
2218                       result.output)
2219         final_result.append("error importing disk %d: %s, %s" %
2220                             (idx, result.fail_reason, result.output[-100]))
2221
2222   if final_result:
2223     _Fail("; ".join(final_result), log=False)
2224
2225
2226 def ListExports():
2227   """Return a list of exports currently available on this machine.
2228
2229   @rtype: list
2230   @return: list of the exports
2231
2232   """
2233   if os.path.isdir(constants.EXPORT_DIR):
2234     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2235   else:
2236     _Fail("No exports directory")
2237
2238
2239 def RemoveExport(export):
2240   """Remove an existing export from the node.
2241
2242   @type export: str
2243   @param export: the name of the export to remove
2244   @rtype: None
2245
2246   """
2247   target = utils.PathJoin(constants.EXPORT_DIR, export)
2248
2249   try:
2250     shutil.rmtree(target)
2251   except EnvironmentError, err:
2252     _Fail("Error while removing the export: %s", err, exc=True)
2253
2254
2255 def BlockdevRename(devlist):
2256   """Rename a list of block devices.
2257
2258   @type devlist: list of tuples
2259   @param devlist: list of tuples of the form  (disk,
2260       new_logical_id, new_physical_id); disk is an
2261       L{objects.Disk} object describing the current disk,
2262       and new logical_id/physical_id is the name we
2263       rename it to
2264   @rtype: boolean
2265   @return: True if all renames succeeded, False otherwise
2266
2267   """
2268   msgs = []
2269   result = True
2270   for disk, unique_id in devlist:
2271     dev = _RecursiveFindBD(disk)
2272     if dev is None:
2273       msgs.append("Can't find device %s in rename" % str(disk))
2274       result = False
2275       continue
2276     try:
2277       old_rpath = dev.dev_path
2278       dev.Rename(unique_id)
2279       new_rpath = dev.dev_path
2280       if old_rpath != new_rpath:
2281         DevCacheManager.RemoveCache(old_rpath)
2282         # FIXME: we should add the new cache information here, like:
2283         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2284         # but we don't have the owner here - maybe parse from existing
2285         # cache? for now, we only lose lvm data when we rename, which
2286         # is less critical than DRBD or MD
2287     except errors.BlockDeviceError, err:
2288       msgs.append("Can't rename device '%s' to '%s': %s" %
2289                   (dev, unique_id, err))
2290       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2291       result = False
2292   if not result:
2293     _Fail("; ".join(msgs))
2294
2295
2296 def _TransformFileStorageDir(file_storage_dir):
2297   """Checks whether given file_storage_dir is valid.
2298
2299   Checks wheter the given file_storage_dir is within the cluster-wide
2300   default file_storage_dir stored in SimpleStore. Only paths under that
2301   directory are allowed.
2302
2303   @type file_storage_dir: str
2304   @param file_storage_dir: the path to check
2305
2306   @return: the normalized path if valid, None otherwise
2307
2308   """
2309   if not constants.ENABLE_FILE_STORAGE:
2310     _Fail("File storage disabled at configure time")
2311   cfg = _GetConfig()
2312   file_storage_dir = os.path.normpath(file_storage_dir)
2313   base_file_storage_dir = cfg.GetFileStorageDir()
2314   if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
2315       base_file_storage_dir):
2316     _Fail("File storage directory '%s' is not under base file"
2317           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2318   return file_storage_dir
2319
2320
2321 def CreateFileStorageDir(file_storage_dir):
2322   """Create file storage directory.
2323
2324   @type file_storage_dir: str
2325   @param file_storage_dir: directory to create
2326
2327   @rtype: tuple
2328   @return: tuple with first element a boolean indicating wheter dir
2329       creation was successful or not
2330
2331   """
2332   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2333   if os.path.exists(file_storage_dir):
2334     if not os.path.isdir(file_storage_dir):
2335       _Fail("Specified storage dir '%s' is not a directory",
2336             file_storage_dir)
2337   else:
2338     try:
2339       os.makedirs(file_storage_dir, 0750)
2340     except OSError, err:
2341       _Fail("Cannot create file storage directory '%s': %s",
2342             file_storage_dir, err, exc=True)
2343
2344
2345 def RemoveFileStorageDir(file_storage_dir):
2346   """Remove file storage directory.
2347
2348   Remove it only if it's empty. If not log an error and return.
2349
2350   @type file_storage_dir: str
2351   @param file_storage_dir: the directory we should cleanup
2352   @rtype: tuple (success,)
2353   @return: tuple of one element, C{success}, denoting
2354       whether the operation was successful
2355
2356   """
2357   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2358   if os.path.exists(file_storage_dir):
2359     if not os.path.isdir(file_storage_dir):
2360       _Fail("Specified Storage directory '%s' is not a directory",
2361             file_storage_dir)
2362     # deletes dir only if empty, otherwise we want to fail the rpc call
2363     try:
2364       os.rmdir(file_storage_dir)
2365     except OSError, err:
2366       _Fail("Cannot remove file storage directory '%s': %s",
2367             file_storage_dir, err)
2368
2369
2370 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2371   """Rename the file storage directory.
2372
2373   @type old_file_storage_dir: str
2374   @param old_file_storage_dir: the current path
2375   @type new_file_storage_dir: str
2376   @param new_file_storage_dir: the name we should rename to
2377   @rtype: tuple (success,)
2378   @return: tuple of one element, C{success}, denoting
2379       whether the operation was successful
2380
2381   """
2382   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2383   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2384   if not os.path.exists(new_file_storage_dir):
2385     if os.path.isdir(old_file_storage_dir):
2386       try:
2387         os.rename(old_file_storage_dir, new_file_storage_dir)
2388       except OSError, err:
2389         _Fail("Cannot rename '%s' to '%s': %s",
2390               old_file_storage_dir, new_file_storage_dir, err)
2391     else:
2392       _Fail("Specified storage dir '%s' is not a directory",
2393             old_file_storage_dir)
2394   else:
2395     if os.path.exists(old_file_storage_dir):
2396       _Fail("Cannot rename '%s' to '%s': both locations exist",
2397             old_file_storage_dir, new_file_storage_dir)
2398
2399
2400 def _EnsureJobQueueFile(file_name):
2401   """Checks whether the given filename is in the queue directory.
2402
2403   @type file_name: str
2404   @param file_name: the file name we should check
2405   @rtype: None
2406   @raises RPCFail: if the file is not valid
2407
2408   """
2409   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2410   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2411
2412   if not result:
2413     _Fail("Passed job queue file '%s' does not belong to"
2414           " the queue directory '%s'", file_name, queue_dir)
2415
2416
2417 def JobQueueUpdate(file_name, content):
2418   """Updates a file in the queue directory.
2419
2420   This is just a wrapper over L{utils.WriteFile}, with proper
2421   checking.
2422
2423   @type file_name: str
2424   @param file_name: the job file name
2425   @type content: str
2426   @param content: the new job contents
2427   @rtype: boolean
2428   @return: the success of the operation
2429
2430   """
2431   _EnsureJobQueueFile(file_name)
2432
2433   # Write and replace the file atomically
2434   utils.WriteFile(file_name, data=_Decompress(content))
2435
2436
2437 def JobQueueRename(old, new):
2438   """Renames a job queue file.
2439
2440   This is just a wrapper over os.rename with proper checking.
2441
2442   @type old: str
2443   @param old: the old (actual) file name
2444   @type new: str
2445   @param new: the desired file name
2446   @rtype: tuple
2447   @return: the success of the operation and payload
2448
2449   """
2450   _EnsureJobQueueFile(old)
2451   _EnsureJobQueueFile(new)
2452
2453   utils.RenameFile(old, new, mkdir=True)
2454
2455
2456 def JobQueueSetDrainFlag(drain_flag):
2457   """Set the drain flag for the queue.
2458
2459   This will set or unset the queue drain flag.
2460
2461   @type drain_flag: boolean
2462   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2463   @rtype: truple
2464   @return: always True, None
2465   @warning: the function always returns True
2466
2467   """
2468   if drain_flag:
2469     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2470   else:
2471     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2472
2473
2474 def BlockdevClose(instance_name, disks):
2475   """Closes the given block devices.
2476
2477   This means they will be switched to secondary mode (in case of
2478   DRBD).
2479
2480   @param instance_name: if the argument is not empty, the symlinks
2481       of this instance will be removed
2482   @type disks: list of L{objects.Disk}
2483   @param disks: the list of disks to be closed
2484   @rtype: tuple (success, message)
2485   @return: a tuple of success and message, where success
2486       indicates the succes of the operation, and message
2487       which will contain the error details in case we
2488       failed
2489
2490   """
2491   bdevs = []
2492   for cf in disks:
2493     rd = _RecursiveFindBD(cf)
2494     if rd is None:
2495       _Fail("Can't find device %s", cf)
2496     bdevs.append(rd)
2497
2498   msg = []
2499   for rd in bdevs:
2500     try:
2501       rd.Close()
2502     except errors.BlockDeviceError, err:
2503       msg.append(str(err))
2504   if msg:
2505     _Fail("Can't make devices secondary: %s", ",".join(msg))
2506   else:
2507     if instance_name:
2508       _RemoveBlockDevLinks(instance_name, disks)
2509
2510
2511 def ValidateHVParams(hvname, hvparams):
2512   """Validates the given hypervisor parameters.
2513
2514   @type hvname: string
2515   @param hvname: the hypervisor name
2516   @type hvparams: dict
2517   @param hvparams: the hypervisor parameters to be validated
2518   @rtype: None
2519
2520   """
2521   try:
2522     hv_type = hypervisor.GetHypervisor(hvname)
2523     hv_type.ValidateParameters(hvparams)
2524   except errors.HypervisorError, err:
2525     _Fail(str(err), log=False)
2526
2527
2528 def DemoteFromMC():
2529   """Demotes the current node from master candidate role.
2530
2531   """
2532   # try to ensure we're not the master by mistake
2533   master, myself = ssconf.GetMasterAndMyself()
2534   if master == myself:
2535     _Fail("ssconf status shows I'm the master node, will not demote")
2536
2537   result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2538   if not result.failed:
2539     _Fail("The master daemon is running, will not demote")
2540
2541   try:
2542     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2543       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2544   except EnvironmentError, err:
2545     if err.errno != errno.ENOENT:
2546       _Fail("Error while backing up cluster file: %s", err, exc=True)
2547
2548   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2549
2550
2551 def _GetX509Filenames(cryptodir, name):
2552   """Returns the full paths for the private key and certificate.
2553
2554   """
2555   return (utils.PathJoin(cryptodir, name),
2556           utils.PathJoin(cryptodir, name, _X509_KEY_FILE),
2557           utils.PathJoin(cryptodir, name, _X509_CERT_FILE))
2558
2559
2560 def CreateX509Certificate(validity, cryptodir=constants.CRYPTO_KEYS_DIR):
2561   """Creates a new X509 certificate for SSL/TLS.
2562
2563   @type validity: int
2564   @param validity: Validity in seconds
2565   @rtype: tuple; (string, string)
2566   @return: Certificate name and public part
2567
2568   """
2569   (key_pem, cert_pem) = \
2570     utils.GenerateSelfSignedX509Cert(utils.HostInfo.SysName(),
2571                                      min(validity, _MAX_SSL_CERT_VALIDITY))
2572
2573   cert_dir = tempfile.mkdtemp(dir=cryptodir,
2574                               prefix="x509-%s-" % utils.TimestampForFilename())
2575   try:
2576     name = os.path.basename(cert_dir)
2577     assert len(name) > 5
2578
2579     (_, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2580
2581     utils.WriteFile(key_file, mode=0400, data=key_pem)
2582     utils.WriteFile(cert_file, mode=0400, data=cert_pem)
2583
2584     # Never return private key as it shouldn't leave the node
2585     return (name, cert_pem)
2586   except Exception:
2587     shutil.rmtree(cert_dir, ignore_errors=True)
2588     raise
2589
2590
2591 def RemoveX509Certificate(name, cryptodir=constants.CRYPTO_KEYS_DIR):
2592   """Removes a X509 certificate.
2593
2594   @type name: string
2595   @param name: Certificate name
2596
2597   """
2598   (cert_dir, key_file, cert_file) = _GetX509Filenames(cryptodir, name)
2599
2600   utils.RemoveFile(key_file)
2601   utils.RemoveFile(cert_file)
2602
2603   try:
2604     os.rmdir(cert_dir)
2605   except EnvironmentError, err:
2606     _Fail("Cannot remove certificate directory '%s': %s",
2607           cert_dir, err)
2608
2609
2610 def _GetImportExportIoCommand(instance, mode, ieio, ieargs):
2611   """Returns the command for the requested input/output.
2612
2613   @type instance: L{objects.Instance}
2614   @param instance: The instance object
2615   @param mode: Import/export mode
2616   @param ieio: Input/output type
2617   @param ieargs: Input/output arguments
2618
2619   """
2620   assert mode in (constants.IEM_IMPORT, constants.IEM_EXPORT)
2621
2622   env = None
2623   prefix = None
2624   suffix = None
2625
2626   if ieio == constants.IEIO_FILE:
2627     (filename, ) = ieargs
2628
2629     if not utils.IsNormAbsPath(filename):
2630       _Fail("Path '%s' is not normalized or absolute", filename)
2631
2632     directory = os.path.normpath(os.path.dirname(filename))
2633
2634     if (os.path.commonprefix([constants.EXPORT_DIR, directory]) !=
2635         constants.EXPORT_DIR):
2636       _Fail("File '%s' is not under exports directory '%s'",
2637             filename, constants.EXPORT_DIR)
2638
2639     # Create directory
2640     utils.Makedirs(directory, mode=0750)
2641
2642     quoted_filename = utils.ShellQuote(filename)
2643
2644     if mode == constants.IEM_IMPORT:
2645       suffix = "> %s" % quoted_filename
2646     elif mode == constants.IEM_EXPORT:
2647       suffix = "< %s" % quoted_filename
2648
2649   elif ieio == constants.IEIO_RAW_DISK:
2650     (disk, ) = ieargs
2651
2652     real_disk = _OpenRealBD(disk)
2653
2654     if mode == constants.IEM_IMPORT:
2655       # we set here a smaller block size as, due to transport buffering, more
2656       # than 64-128k will mostly ignored; we use nocreat to fail if the device
2657       # is not already there or we pass a wrong path; we use notrunc to no
2658       # attempt truncate on an LV device; we use oflag=dsync to not buffer too
2659       # much memory; this means that at best, we flush every 64k, which will
2660       # not be very fast
2661       suffix = utils.BuildShellCmd(("| dd of=%s conv=nocreat,notrunc"
2662                                     " bs=%s oflag=dsync"),
2663                                     real_disk.dev_path,
2664                                     str(64 * 1024))
2665
2666     elif mode == constants.IEM_EXPORT:
2667       # the block size on the read dd is 1MiB to match our units
2668       prefix = utils.BuildShellCmd("dd if=%s bs=%s count=%s |",
2669                                    real_disk.dev_path,
2670                                    str(1024 * 1024), # 1 MB
2671                                    str(disk.size))
2672
2673   elif ieio == constants.IEIO_SCRIPT:
2674     (disk, disk_index, ) = ieargs
2675
2676     assert isinstance(disk_index, (int, long))
2677
2678     real_disk = _OpenRealBD(disk)
2679
2680     inst_os = OSFromDisk(instance.os)
2681     env = OSEnvironment(instance, inst_os)
2682
2683     if mode == constants.IEM_IMPORT:
2684       env["IMPORT_DEVICE"] = env["DISK_%d_PATH" % disk_index]
2685       env["IMPORT_INDEX"] = str(disk_index)
2686       script = inst_os.import_script
2687
2688     elif mode == constants.IEM_EXPORT:
2689       env["EXPORT_DEVICE"] = real_disk.dev_path
2690       env["EXPORT_INDEX"] = str(disk_index)
2691       script = inst_os.export_script
2692
2693     # TODO: Pass special environment only to script
2694     script_cmd = utils.BuildShellCmd("( cd %s && %s; )", inst_os.path, script)
2695
2696     if mode == constants.IEM_IMPORT:
2697       suffix = "| %s" % script_cmd
2698
2699     elif mode == constants.IEM_EXPORT:
2700       prefix = "%s |" % script_cmd
2701
2702   else:
2703     _Fail("Invalid %s I/O mode %r", mode, ieio)
2704
2705   return (env, prefix, suffix)
2706
2707
2708 def _CreateImportExportStatusDir(prefix):
2709   """Creates status directory for import/export.
2710
2711   """
2712   return tempfile.mkdtemp(dir=constants.IMPORT_EXPORT_DIR,
2713                           prefix=("%s-%s-" %
2714                                   (prefix, utils.TimestampForFilename())))
2715
2716
2717 def StartImportExportDaemon(mode, key_name, ca, host, port, instance,
2718                             ieio, ieioargs):
2719   """Starts an import or export daemon.
2720
2721   @param mode: Import/output mode
2722   @type key_name: string
2723   @param key_name: RSA key name (None to use cluster certificate)
2724   @type ca: string:
2725   @param ca: Remote CA in PEM format (None to use cluster certificate)
2726   @type host: string
2727   @param host: Remote host for export (None for import)
2728   @type port: int
2729   @param port: Remote port for export (None for import)
2730   @type instance: L{objects.Instance}
2731   @param instance: Instance object
2732   @param ieio: Input/output type
2733   @param ieioargs: Input/output arguments
2734
2735   """
2736   if mode == constants.IEM_IMPORT:
2737     prefix = "import"
2738
2739     if not (host is None and port is None):
2740       _Fail("Can not specify host or port on import")
2741
2742   elif mode == constants.IEM_EXPORT:
2743     prefix = "export"
2744
2745     if host is None or port is None:
2746       _Fail("Host and port must be specified for an export")
2747
2748   else:
2749     _Fail("Invalid mode %r", mode)
2750
2751   if (key_name is None) ^ (ca is None):
2752     _Fail("Cluster certificate can only be used for both key and CA")
2753
2754   (cmd_env, cmd_prefix, cmd_suffix) = \
2755     _GetImportExportIoCommand(instance, mode, ieio, ieioargs)
2756
2757   if key_name is None:
2758     # Use server.pem
2759     key_path = constants.NODED_CERT_FILE
2760     cert_path = constants.NODED_CERT_FILE
2761     assert ca is None
2762   else:
2763     (_, key_path, cert_path) = _GetX509Filenames(constants.CRYPTO_KEYS_DIR,
2764                                                  key_name)
2765     assert ca is not None
2766
2767   status_dir = _CreateImportExportStatusDir(prefix)
2768   try:
2769     status_file = utils.PathJoin(status_dir, _IES_STATUS_FILE)
2770     pid_file = utils.PathJoin(status_dir, _IES_PID_FILE)
2771
2772     if ca is None:
2773       # Use server.pem
2774       # TODO: If socat runs as a non-root user, this might need to be copied to
2775       # a separate file
2776       ca_path = constants.NODED_CERT_FILE
2777     else:
2778       ca_path = utils.PathJoin(status_dir, _IES_CA_FILE)
2779       utils.WriteFile(ca_path, data=ca, mode=0400)
2780
2781     cmd = [
2782       constants.IMPORT_EXPORT_DAEMON,
2783       status_file, mode,
2784       "--key=%s" % key_path,
2785       "--cert=%s" % cert_path,
2786       "--ca=%s" % ca_path,
2787       ]
2788
2789     if host:
2790       cmd.append("--host=%s" % host)
2791
2792     if port:
2793       cmd.append("--port=%s" % port)
2794
2795     if cmd_prefix:
2796       cmd.append("--cmd-prefix=%s" % cmd_prefix)
2797
2798     if cmd_suffix:
2799       cmd.append("--cmd-suffix=%s" % cmd_suffix)
2800
2801     logfile = _InstanceLogName(prefix, instance.os, instance.name)
2802
2803     # TODO: Once _InstanceLogName uses tempfile.mkstemp, StartDaemon has
2804     # support for receiving a file descriptor for output
2805     utils.StartDaemon(cmd, env=cmd_env, pidfile=pid_file,
2806                       output=logfile)
2807
2808     # The import/export name is simply the status directory name
2809     return os.path.basename(status_dir)
2810
2811   except Exception:
2812     shutil.rmtree(status_dir, ignore_errors=True)
2813     raise
2814
2815
2816 def GetImportExportStatus(names):
2817   """Returns import/export daemon status.
2818
2819   @type names: sequence
2820   @param names: List of names
2821   @rtype: List of dicts
2822   @return: Returns a list of the state of each named import/export or None if a
2823            status couldn't be read
2824
2825   """
2826   result = []
2827
2828   for name in names:
2829     status_file = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name,
2830                                  _IES_STATUS_FILE)
2831
2832     try:
2833       data = utils.ReadFile(status_file)
2834     except EnvironmentError, err:
2835       if err.errno != errno.ENOENT:
2836         raise
2837       data = None
2838
2839     if not data:
2840       result.append(None)
2841       continue
2842
2843     result.append(serializer.LoadJson(data))
2844
2845   return result
2846
2847
2848 def CleanupImportExport(name):
2849   """Cleanup after an import or export.
2850
2851   If the import/export daemon is still running it's killed. Afterwards the
2852   whole status directory is removed.
2853
2854   """
2855   logging.info("Finalizing import/export %s", name)
2856
2857   status_dir = utils.PathJoin(constants.IMPORT_EXPORT_DIR, name)
2858
2859   pid = utils.ReadLockedPidFile(utils.PathJoin(status_dir, _IES_PID_FILE))
2860
2861   if pid:
2862     logging.info("Import/export %s is still running with PID %s",
2863                  name, pid)
2864     utils.KillProcess(pid, waitpid=False)
2865
2866   shutil.rmtree(status_dir, ignore_errors=True)
2867
2868
2869 def _FindDisks(nodes_ip, disks):
2870   """Sets the physical ID on disks and returns the block devices.
2871
2872   """
2873   # set the correct physical ID
2874   my_name = utils.HostInfo().name
2875   for cf in disks:
2876     cf.SetPhysicalID(my_name, nodes_ip)
2877
2878   bdevs = []
2879
2880   for cf in disks:
2881     rd = _RecursiveFindBD(cf)
2882     if rd is None:
2883       _Fail("Can't find device %s", cf)
2884     bdevs.append(rd)
2885   return bdevs
2886
2887
2888 def DrbdDisconnectNet(nodes_ip, disks):
2889   """Disconnects the network on a list of drbd devices.
2890
2891   """
2892   bdevs = _FindDisks(nodes_ip, disks)
2893
2894   # disconnect disks
2895   for rd in bdevs:
2896     try:
2897       rd.DisconnectNet()
2898     except errors.BlockDeviceError, err:
2899       _Fail("Can't change network configuration to standalone mode: %s",
2900             err, exc=True)
2901
2902
2903 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2904   """Attaches the network on a list of drbd devices.
2905
2906   """
2907   bdevs = _FindDisks(nodes_ip, disks)
2908
2909   if multimaster:
2910     for idx, rd in enumerate(bdevs):
2911       try:
2912         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2913       except EnvironmentError, err:
2914         _Fail("Can't create symlink: %s", err)
2915   # reconnect disks, switch to new master configuration and if
2916   # needed primary mode
2917   for rd in bdevs:
2918     try:
2919       rd.AttachNet(multimaster)
2920     except errors.BlockDeviceError, err:
2921       _Fail("Can't change network configuration: %s", err)
2922
2923   # wait until the disks are connected; we need to retry the re-attach
2924   # if the device becomes standalone, as this might happen if the one
2925   # node disconnects and reconnects in a different mode before the
2926   # other node reconnects; in this case, one or both of the nodes will
2927   # decide it has wrong configuration and switch to standalone
2928
2929   def _Attach():
2930     all_connected = True
2931
2932     for rd in bdevs:
2933       stats = rd.GetProcStatus()
2934
2935       all_connected = (all_connected and
2936                        (stats.is_connected or stats.is_in_resync))
2937
2938       if stats.is_standalone:
2939         # peer had different config info and this node became
2940         # standalone, even though this should not happen with the
2941         # new staged way of changing disk configs
2942         try:
2943           rd.AttachNet(multimaster)
2944         except errors.BlockDeviceError, err:
2945           _Fail("Can't change network configuration: %s", err)
2946
2947     if not all_connected:
2948       raise utils.RetryAgain()
2949
2950   try:
2951     # Start with a delay of 100 miliseconds and go up to 5 seconds
2952     utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2953   except utils.RetryTimeout:
2954     _Fail("Timeout in disk reconnecting")
2955
2956   if multimaster:
2957     # change to primary mode
2958     for rd in bdevs:
2959       try:
2960         rd.Open()
2961       except errors.BlockDeviceError, err:
2962         _Fail("Can't change to primary mode: %s", err)
2963
2964
2965 def DrbdWaitSync(nodes_ip, disks):
2966   """Wait until DRBDs have synchronized.
2967
2968   """
2969   def _helper(rd):
2970     stats = rd.GetProcStatus()
2971     if not (stats.is_connected or stats.is_in_resync):
2972       raise utils.RetryAgain()
2973     return stats
2974
2975   bdevs = _FindDisks(nodes_ip, disks)
2976
2977   min_resync = 100
2978   alldone = True
2979   for rd in bdevs:
2980     try:
2981       # poll each second for 15 seconds
2982       stats = utils.Retry(_helper, 1, 15, args=[rd])
2983     except utils.RetryTimeout:
2984       stats = rd.GetProcStatus()
2985       # last check
2986       if not (stats.is_connected or stats.is_in_resync):
2987         _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2988     alldone = alldone and (not stats.is_in_resync)
2989     if stats.sync_percent is not None:
2990       min_resync = min(min_resync, stats.sync_percent)
2991
2992   return (alldone, min_resync)
2993
2994
2995 def PowercycleNode(hypervisor_type):
2996   """Hard-powercycle the node.
2997
2998   Because we need to return first, and schedule the powercycle in the
2999   background, we won't be able to report failures nicely.
3000
3001   """
3002   hyper = hypervisor.GetHypervisor(hypervisor_type)
3003   try:
3004     pid = os.fork()
3005   except OSError:
3006     # if we can't fork, we'll pretend that we're in the child process
3007     pid = 0
3008   if pid > 0:
3009     return "Reboot scheduled in 5 seconds"
3010   time.sleep(5)
3011   hyper.PowercycleNode()
3012
3013
3014 class HooksRunner(object):
3015   """Hook runner.
3016
3017   This class is instantiated on the node side (ganeti-noded) and not
3018   on the master side.
3019
3020   """
3021   def __init__(self, hooks_base_dir=None):
3022     """Constructor for hooks runner.
3023
3024     @type hooks_base_dir: str or None
3025     @param hooks_base_dir: if not None, this overrides the
3026         L{constants.HOOKS_BASE_DIR} (useful for unittests)
3027
3028     """
3029     if hooks_base_dir is None:
3030       hooks_base_dir = constants.HOOKS_BASE_DIR
3031     # yeah, _BASE_DIR is not valid for attributes, we use it like a
3032     # constant
3033     self._BASE_DIR = hooks_base_dir # pylint: disable-msg=C0103
3034
3035   def RunHooks(self, hpath, phase, env):
3036     """Run the scripts in the hooks directory.
3037
3038     @type hpath: str
3039     @param hpath: the path to the hooks directory which
3040         holds the scripts
3041     @type phase: str
3042     @param phase: either L{constants.HOOKS_PHASE_PRE} or
3043         L{constants.HOOKS_PHASE_POST}
3044     @type env: dict
3045     @param env: dictionary with the environment for the hook
3046     @rtype: list
3047     @return: list of 3-element tuples:
3048       - script path
3049       - script result, either L{constants.HKR_SUCCESS} or
3050         L{constants.HKR_FAIL}
3051       - output of the script
3052
3053     @raise errors.ProgrammerError: for invalid input
3054         parameters
3055
3056     """
3057     if phase == constants.HOOKS_PHASE_PRE:
3058       suffix = "pre"
3059     elif phase == constants.HOOKS_PHASE_POST:
3060       suffix = "post"
3061     else:
3062       _Fail("Unknown hooks phase '%s'", phase)
3063
3064
3065     subdir = "%s-%s.d" % (hpath, suffix)
3066     dir_name = utils.PathJoin(self._BASE_DIR, subdir)
3067
3068     results = []
3069
3070     if not os.path.isdir(dir_name):
3071       # for non-existing/non-dirs, we simply exit instead of logging a
3072       # warning at every operation
3073       return results
3074
3075     runparts_results = utils.RunParts(dir_name, env=env, reset_env=True)
3076
3077     for (relname, relstatus, runresult)  in runparts_results:
3078       if relstatus == constants.RUNPARTS_SKIP:
3079         rrval = constants.HKR_SKIP
3080         output = ""
3081       elif relstatus == constants.RUNPARTS_ERR:
3082         rrval = constants.HKR_FAIL
3083         output = "Hook script execution error: %s" % runresult
3084       elif relstatus == constants.RUNPARTS_RUN:
3085         if runresult.failed:
3086           rrval = constants.HKR_FAIL
3087         else:
3088           rrval = constants.HKR_SUCCESS
3089         output = utils.SafeEncode(runresult.output.strip())
3090       results.append(("%s/%s" % (subdir, relname), rrval, output))
3091
3092     return results
3093
3094
3095 class IAllocatorRunner(object):
3096   """IAllocator runner.
3097
3098   This class is instantiated on the node side (ganeti-noded) and not on
3099   the master side.
3100
3101   """
3102   @staticmethod
3103   def Run(name, idata):
3104     """Run an iallocator script.
3105
3106     @type name: str
3107     @param name: the iallocator script name
3108     @type idata: str
3109     @param idata: the allocator input data
3110
3111     @rtype: tuple
3112     @return: two element tuple of:
3113        - status
3114        - either error message or stdout of allocator (for success)
3115
3116     """
3117     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
3118                                   os.path.isfile)
3119     if alloc_script is None:
3120       _Fail("iallocator module '%s' not found in the search path", name)
3121
3122     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
3123     try:
3124       os.write(fd, idata)
3125       os.close(fd)
3126       result = utils.RunCmd([alloc_script, fin_name])
3127       if result.failed:
3128         _Fail("iallocator module '%s' failed: %s, output '%s'",
3129               name, result.fail_reason, result.output)
3130     finally:
3131       os.unlink(fin_name)
3132
3133     return result.stdout
3134
3135
3136 class DevCacheManager(object):
3137   """Simple class for managing a cache of block device information.
3138
3139   """
3140   _DEV_PREFIX = "/dev/"
3141   _ROOT_DIR = constants.BDEV_CACHE_DIR
3142
3143   @classmethod
3144   def _ConvertPath(cls, dev_path):
3145     """Converts a /dev/name path to the cache file name.
3146
3147     This replaces slashes with underscores and strips the /dev
3148     prefix. It then returns the full path to the cache file.
3149
3150     @type dev_path: str
3151     @param dev_path: the C{/dev/} path name
3152     @rtype: str
3153     @return: the converted path name
3154
3155     """
3156     if dev_path.startswith(cls._DEV_PREFIX):
3157       dev_path = dev_path[len(cls._DEV_PREFIX):]
3158     dev_path = dev_path.replace("/", "_")
3159     fpath = utils.PathJoin(cls._ROOT_DIR, "bdev_%s" % dev_path)
3160     return fpath
3161
3162   @classmethod
3163   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
3164     """Updates the cache information for a given device.
3165
3166     @type dev_path: str
3167     @param dev_path: the pathname of the device
3168     @type owner: str
3169     @param owner: the owner (instance name) of the device
3170     @type on_primary: bool
3171     @param on_primary: whether this is the primary
3172         node nor not
3173     @type iv_name: str
3174     @param iv_name: the instance-visible name of the
3175         device, as in objects.Disk.iv_name
3176
3177     @rtype: None
3178
3179     """
3180     if dev_path is None:
3181       logging.error("DevCacheManager.UpdateCache got a None dev_path")
3182       return
3183     fpath = cls._ConvertPath(dev_path)
3184     if on_primary:
3185       state = "primary"
3186     else:
3187       state = "secondary"
3188     if iv_name is None:
3189       iv_name = "not_visible"
3190     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
3191     try:
3192       utils.WriteFile(fpath, data=fdata)
3193     except EnvironmentError, err:
3194       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
3195
3196   @classmethod
3197   def RemoveCache(cls, dev_path):
3198     """Remove data for a dev_path.
3199
3200     This is just a wrapper over L{utils.RemoveFile} with a converted
3201     path name and logging.
3202
3203     @type dev_path: str
3204     @param dev_path: the pathname of the device
3205
3206     @rtype: None
3207
3208     """
3209     if dev_path is None:
3210       logging.error("DevCacheManager.RemoveCache got a None dev_path")
3211       return
3212     fpath = cls._ConvertPath(dev_path)
3213     try:
3214       utils.RemoveFile(fpath)
3215     except EnvironmentError, err:
3216       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)