Use ReadFile/WriteFile in more places
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon
23
24 @var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25      the L{UploadFile} function
26
27 """
28
29
30 import os
31 import os.path
32 import shutil
33 import time
34 import stat
35 import errno
36 import re
37 import subprocess
38 import random
39 import logging
40 import tempfile
41 import zlib
42 import base64
43
44 from ganeti import errors
45 from ganeti import utils
46 from ganeti import ssh
47 from ganeti import hypervisor
48 from ganeti import constants
49 from ganeti import bdev
50 from ganeti import objects
51 from ganeti import ssconf
52
53
54 _BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55
56
57 class RPCFail(Exception):
58   """Class denoting RPC failure.
59
60   Its argument is the error message.
61
62   """
63
64
65 def _Fail(msg, *args, **kwargs):
66   """Log an error and the raise an RPCFail exception.
67
68   This exception is then handled specially in the ganeti daemon and
69   turned into a 'failed' return type. As such, this function is a
70   useful shortcut for logging the error and returning it to the master
71   daemon.
72
73   @type msg: string
74   @param msg: the text of the exception
75   @raise RPCFail
76
77   """
78   if args:
79     msg = msg % args
80   if "log" not in kwargs or kwargs["log"]: # if we should log this error
81     if "exc" in kwargs and kwargs["exc"]:
82       logging.exception(msg)
83     else:
84       logging.error(msg)
85   raise RPCFail(msg)
86
87
88 def _GetConfig():
89   """Simple wrapper to return a SimpleStore.
90
91   @rtype: L{ssconf.SimpleStore}
92   @return: a SimpleStore instance
93
94   """
95   return ssconf.SimpleStore()
96
97
98 def _GetSshRunner(cluster_name):
99   """Simple wrapper to return an SshRunner.
100
101   @type cluster_name: str
102   @param cluster_name: the cluster name, which is needed
103       by the SshRunner constructor
104   @rtype: L{ssh.SshRunner}
105   @return: an SshRunner instance
106
107   """
108   return ssh.SshRunner(cluster_name)
109
110
111 def _Decompress(data):
112   """Unpacks data compressed by the RPC client.
113
114   @type data: list or tuple
115   @param data: Data sent by RPC client
116   @rtype: str
117   @return: Decompressed data
118
119   """
120   assert isinstance(data, (list, tuple))
121   assert len(data) == 2
122   (encoding, content) = data
123   if encoding == constants.RPC_ENCODING_NONE:
124     return content
125   elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126     return zlib.decompress(base64.b64decode(content))
127   else:
128     raise AssertionError("Unknown data encoding")
129
130
131 def _CleanDirectory(path, exclude=None):
132   """Removes all regular files in a directory.
133
134   @type path: str
135   @param path: the directory to clean
136   @type exclude: list
137   @param exclude: list of files to be excluded, defaults
138       to the empty list
139
140   """
141   if not os.path.isdir(path):
142     return
143   if exclude is None:
144     exclude = []
145   else:
146     # Normalize excluded paths
147     exclude = [os.path.normpath(i) for i in exclude]
148
149   for rel_name in utils.ListVisibleFiles(path):
150     full_name = os.path.normpath(os.path.join(path, rel_name))
151     if full_name in exclude:
152       continue
153     if os.path.isfile(full_name) and not os.path.islink(full_name):
154       utils.RemoveFile(full_name)
155
156
157 def _BuildUploadFileList():
158   """Build the list of allowed upload files.
159
160   This is abstracted so that it's built only once at module import time.
161
162   """
163   allowed_files = set([
164     constants.CLUSTER_CONF_FILE,
165     constants.ETC_HOSTS,
166     constants.SSH_KNOWN_HOSTS_FILE,
167     constants.VNC_PASSWORD_FILE,
168     constants.RAPI_CERT_FILE,
169     constants.RAPI_USERS_FILE,
170     constants.HMAC_CLUSTER_KEY,
171     ])
172
173   for hv_name in constants.HYPER_TYPES:
174     hv_class = hypervisor.GetHypervisorClass(hv_name)
175     allowed_files.update(hv_class.GetAncillaryFiles())
176
177   return frozenset(allowed_files)
178
179
180 _ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181
182
183 def JobQueuePurge():
184   """Removes job queue files and archived jobs.
185
186   @rtype: tuple
187   @return: True, None
188
189   """
190   _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192
193
194 def GetMasterInfo():
195   """Returns master information.
196
197   This is an utility function to compute master information, either
198   for consumption here or from the node daemon.
199
200   @rtype: tuple
201   @return: master_netdev, master_ip, master_name
202   @raise RPCFail: in case of errors
203
204   """
205   try:
206     cfg = _GetConfig()
207     master_netdev = cfg.GetMasterNetdev()
208     master_ip = cfg.GetMasterIP()
209     master_node = cfg.GetMasterNode()
210   except errors.ConfigurationError, err:
211     _Fail("Cluster configuration incomplete: %s", err, exc=True)
212   return (master_netdev, master_ip, master_node)
213
214
215 def StartMaster(start_daemons, no_voting):
216   """Activate local node as master node.
217
218   The function will always try activate the IP address of the master
219   (unless someone else has it). It will also start the master daemons,
220   based on the start_daemons parameter.
221
222   @type start_daemons: boolean
223   @param start_daemons: whether to also start the master
224       daemons (ganeti-masterd and ganeti-rapi)
225   @type no_voting: boolean
226   @param no_voting: whether to start ganeti-masterd without a node vote
227       (if start_daemons is True), but still non-interactively
228   @rtype: None
229
230   """
231   # GetMasterInfo will raise an exception if not able to return data
232   master_netdev, master_ip, _ = GetMasterInfo()
233
234   err_msgs = []
235   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236     if utils.OwnIpAddress(master_ip):
237       # we already have the ip:
238       logging.debug("Master IP already configured, doing nothing")
239     else:
240       msg = "Someone else has the master ip, not activating"
241       logging.error(msg)
242       err_msgs.append(msg)
243   else:
244     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245                            "dev", master_netdev, "label",
246                            "%s:0" % master_netdev])
247     if result.failed:
248       msg = "Can't activate master IP: %s" % result.output
249       logging.error(msg)
250       err_msgs.append(msg)
251
252     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253                            "-s", master_ip, master_ip])
254     # we'll ignore the exit code of arping
255
256   # and now start the master and rapi daemons
257   if start_daemons:
258     daemons_params = {
259         'ganeti-masterd': [],
260         'ganeti-rapi': [],
261         }
262     if no_voting:
263       daemons_params['ganeti-masterd'].append('--no-voting')
264       daemons_params['ganeti-masterd'].append('--yes-do-it')
265     for daemon in daemons_params:
266       cmd = [daemon]
267       cmd.extend(daemons_params[daemon])
268       result = utils.RunCmd(cmd)
269       if result.failed:
270         msg = "Can't start daemon %s: %s" % (daemon, result.output)
271         logging.error(msg)
272         err_msgs.append(msg)
273
274   if err_msgs:
275     _Fail("; ".join(err_msgs))
276
277
278 def StopMaster(stop_daemons):
279   """Deactivate this node as master.
280
281   The function will always try to deactivate the IP address of the
282   master. It will also stop the master daemons depending on the
283   stop_daemons parameter.
284
285   @type stop_daemons: boolean
286   @param stop_daemons: whether to also stop the master daemons
287       (ganeti-masterd and ganeti-rapi)
288   @rtype: None
289
290   """
291   # TODO: log and report back to the caller the error failures; we
292   # need to decide in which case we fail the RPC for this
293
294   # GetMasterInfo will raise an exception if not able to return data
295   master_netdev, master_ip, _ = GetMasterInfo()
296
297   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298                          "dev", master_netdev])
299   if result.failed:
300     logging.error("Can't remove the master IP, error: %s", result.output)
301     # but otherwise ignore the failure
302
303   if stop_daemons:
304     # stop/kill the rapi and the master daemon
305     for daemon in constants.RAPI, constants.MASTERD:
306       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307
308
309 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310   """Joins this node to the cluster.
311
312   This does the following:
313       - updates the hostkeys of the machine (rsa and dsa)
314       - adds the ssh private key to the user
315       - adds the ssh public key to the users' authorized_keys file
316
317   @type dsa: str
318   @param dsa: the DSA private key to write
319   @type dsapub: str
320   @param dsapub: the DSA public key to write
321   @type rsa: str
322   @param rsa: the RSA private key to write
323   @type rsapub: str
324   @param rsapub: the RSA public key to write
325   @type sshkey: str
326   @param sshkey: the SSH private key to write
327   @type sshpub: str
328   @param sshpub: the SSH public key to write
329   @rtype: boolean
330   @return: the success of the operation
331
332   """
333   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337   for name, content, mode in sshd_keys:
338     utils.WriteFile(name, data=content, mode=mode)
339
340   try:
341     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342                                                     mkdir=True)
343   except errors.OpExecError, err:
344     _Fail("Error while processing user ssh files: %s", err, exc=True)
345
346   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347     utils.WriteFile(name, data=content, mode=0600)
348
349   utils.AddAuthorizedKey(auth_keys, sshpub)
350
351   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352
353
354 def LeaveCluster():
355   """Cleans up and remove the current node.
356
357   This function cleans up and prepares the current node to be removed
358   from the cluster.
359
360   If processing is successful, then it raises an
361   L{errors.QuitGanetiException} which is used as a special case to
362   shutdown the node daemon.
363
364   """
365   _CleanDirectory(constants.DATA_DIR)
366   JobQueuePurge()
367
368   try:
369     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370
371     utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372
373     utils.RemoveFile(priv_key)
374     utils.RemoveFile(pub_key)
375   except errors.OpExecError:
376     logging.exception("Error while processing ssh files")
377
378   # Raise a custom exception (handled in ganeti-noded)
379   raise errors.QuitGanetiException(True, 'Shutdown scheduled')
380
381
382 def GetNodeInfo(vgname, hypervisor_type):
383   """Gives back a hash with different information about the node.
384
385   @type vgname: C{string}
386   @param vgname: the name of the volume group to ask for disk space information
387   @type hypervisor_type: C{str}
388   @param hypervisor_type: the name of the hypervisor to ask for
389       memory information
390   @rtype: C{dict}
391   @return: dictionary with the following keys:
392       - vg_size is the size of the configured volume group in MiB
393       - vg_free is the free size of the volume group in MiB
394       - memory_dom0 is the memory allocated for domain0 in MiB
395       - memory_free is the currently available (free) ram in MiB
396       - memory_total is the total number of ram in MiB
397
398   """
399   outputarray = {}
400   vginfo = _GetVGInfo(vgname)
401   outputarray['vg_size'] = vginfo['vg_size']
402   outputarray['vg_free'] = vginfo['vg_free']
403
404   hyper = hypervisor.GetHypervisor(hypervisor_type)
405   hyp_info = hyper.GetNodeInfo()
406   if hyp_info is not None:
407     outputarray.update(hyp_info)
408
409   outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
410
411   return outputarray
412
413
414 def VerifyNode(what, cluster_name):
415   """Verify the status of the local node.
416
417   Based on the input L{what} parameter, various checks are done on the
418   local node.
419
420   If the I{filelist} key is present, this list of
421   files is checksummed and the file/checksum pairs are returned.
422
423   If the I{nodelist} key is present, we check that we have
424   connectivity via ssh with the target nodes (and check the hostname
425   report).
426
427   If the I{node-net-test} key is present, we check that we have
428   connectivity to the given nodes via both primary IP and, if
429   applicable, secondary IPs.
430
431   @type what: C{dict}
432   @param what: a dictionary of things to check:
433       - filelist: list of files for which to compute checksums
434       - nodelist: list of nodes we should check ssh communication with
435       - node-net-test: list of nodes we should check node daemon port
436         connectivity with
437       - hypervisor: list with hypervisors to run the verify for
438   @rtype: dict
439   @return: a dictionary with the same keys as the input dict, and
440       values representing the result of the checks
441
442   """
443   result = {}
444
445   if constants.NV_HYPERVISOR in what:
446     result[constants.NV_HYPERVISOR] = tmp = {}
447     for hv_name in what[constants.NV_HYPERVISOR]:
448       tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
449
450   if constants.NV_FILELIST in what:
451     result[constants.NV_FILELIST] = utils.FingerprintFiles(
452       what[constants.NV_FILELIST])
453
454   if constants.NV_NODELIST in what:
455     result[constants.NV_NODELIST] = tmp = {}
456     random.shuffle(what[constants.NV_NODELIST])
457     for node in what[constants.NV_NODELIST]:
458       success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
459       if not success:
460         tmp[node] = message
461
462   if constants.NV_NODENETTEST in what:
463     result[constants.NV_NODENETTEST] = tmp = {}
464     my_name = utils.HostInfo().name
465     my_pip = my_sip = None
466     for name, pip, sip in what[constants.NV_NODENETTEST]:
467       if name == my_name:
468         my_pip = pip
469         my_sip = sip
470         break
471     if not my_pip:
472       tmp[my_name] = ("Can't find my own primary/secondary IP"
473                       " in the node list")
474     else:
475       port = utils.GetDaemonPort(constants.NODED)
476       for name, pip, sip in what[constants.NV_NODENETTEST]:
477         fail = []
478         if not utils.TcpPing(pip, port, source=my_pip):
479           fail.append("primary")
480         if sip != pip:
481           if not utils.TcpPing(sip, port, source=my_sip):
482             fail.append("secondary")
483         if fail:
484           tmp[name] = ("failure using the %s interface(s)" %
485                        " and ".join(fail))
486
487   if constants.NV_LVLIST in what:
488     result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
489
490   if constants.NV_INSTANCELIST in what:
491     result[constants.NV_INSTANCELIST] = GetInstanceList(
492       what[constants.NV_INSTANCELIST])
493
494   if constants.NV_VGLIST in what:
495     result[constants.NV_VGLIST] = utils.ListVolumeGroups()
496
497   if constants.NV_VERSION in what:
498     result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
499                                     constants.RELEASE_VERSION)
500
501   if constants.NV_HVINFO in what:
502     hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
503     result[constants.NV_HVINFO] = hyper.GetNodeInfo()
504
505   if constants.NV_DRBDLIST in what:
506     try:
507       used_minors = bdev.DRBD8.GetUsedDevs().keys()
508     except errors.BlockDeviceError, err:
509       logging.warning("Can't get used minors list", exc_info=True)
510       used_minors = str(err)
511     result[constants.NV_DRBDLIST] = used_minors
512
513   return result
514
515
516 def GetVolumeList(vg_name):
517   """Compute list of logical volumes and their size.
518
519   @type vg_name: str
520   @param vg_name: the volume group whose LVs we should list
521   @rtype: dict
522   @return:
523       dictionary of all partions (key) with value being a tuple of
524       their size (in MiB), inactive and online status::
525
526         {'test1': ('20.06', True, True)}
527
528       in case of errors, a string is returned with the error
529       details.
530
531   """
532   lvs = {}
533   sep = '|'
534   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
535                          "--separator=%s" % sep,
536                          "-olv_name,lv_size,lv_attr", vg_name])
537   if result.failed:
538     _Fail("Failed to list logical volumes, lvs output: %s", result.output)
539
540   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
541   for line in result.stdout.splitlines():
542     line = line.strip()
543     match = valid_line_re.match(line)
544     if not match:
545       logging.error("Invalid line returned from lvs output: '%s'", line)
546       continue
547     name, size, attr = match.groups()
548     inactive = attr[4] == '-'
549     online = attr[5] == 'o'
550     lvs[name] = (size, inactive, online)
551
552   return lvs
553
554
555 def ListVolumeGroups():
556   """List the volume groups and their size.
557
558   @rtype: dict
559   @return: dictionary with keys volume name and values the
560       size of the volume
561
562   """
563   return utils.ListVolumeGroups()
564
565
566 def NodeVolumes():
567   """List all volumes on this node.
568
569   @rtype: list
570   @return:
571     A list of dictionaries, each having four keys:
572       - name: the logical volume name,
573       - size: the size of the logical volume
574       - dev: the physical device on which the LV lives
575       - vg: the volume group to which it belongs
576
577     In case of errors, we return an empty list and log the
578     error.
579
580     Note that since a logical volume can live on multiple physical
581     volumes, the resulting list might include a logical volume
582     multiple times.
583
584   """
585   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
586                          "--separator=|",
587                          "--options=lv_name,lv_size,devices,vg_name"])
588   if result.failed:
589     _Fail("Failed to list logical volumes, lvs output: %s",
590           result.output)
591
592   def parse_dev(dev):
593     if '(' in dev:
594       return dev.split('(')[0]
595     else:
596       return dev
597
598   def map_line(line):
599     return {
600       'name': line[0].strip(),
601       'size': line[1].strip(),
602       'dev': parse_dev(line[2].strip()),
603       'vg': line[3].strip(),
604     }
605
606   return [map_line(line.split('|')) for line in result.stdout.splitlines()
607           if line.count('|') >= 3]
608
609
610 def BridgesExist(bridges_list):
611   """Check if a list of bridges exist on the current node.
612
613   @rtype: boolean
614   @return: C{True} if all of them exist, C{False} otherwise
615
616   """
617   missing = []
618   for bridge in bridges_list:
619     if not utils.BridgeExists(bridge):
620       missing.append(bridge)
621
622   if missing:
623     _Fail("Missing bridges %s", ", ".join(missing))
624
625
626 def GetInstanceList(hypervisor_list):
627   """Provides a list of instances.
628
629   @type hypervisor_list: list
630   @param hypervisor_list: the list of hypervisors to query information
631
632   @rtype: list
633   @return: a list of all running instances on the current node
634     - instance1.example.com
635     - instance2.example.com
636
637   """
638   results = []
639   for hname in hypervisor_list:
640     try:
641       names = hypervisor.GetHypervisor(hname).ListInstances()
642       results.extend(names)
643     except errors.HypervisorError, err:
644       _Fail("Error enumerating instances (hypervisor %s): %s",
645             hname, err, exc=True)
646
647   return results
648
649
650 def GetInstanceInfo(instance, hname):
651   """Gives back the information about an instance as a dictionary.
652
653   @type instance: string
654   @param instance: the instance name
655   @type hname: string
656   @param hname: the hypervisor type of the instance
657
658   @rtype: dict
659   @return: dictionary with the following keys:
660       - memory: memory size of instance (int)
661       - state: xen state of instance (string)
662       - time: cpu time of instance (float)
663
664   """
665   output = {}
666
667   iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
668   if iinfo is not None:
669     output['memory'] = iinfo[2]
670     output['state'] = iinfo[4]
671     output['time'] = iinfo[5]
672
673   return output
674
675
676 def GetInstanceMigratable(instance):
677   """Gives whether an instance can be migrated.
678
679   @type instance: L{objects.Instance}
680   @param instance: object representing the instance to be checked.
681
682   @rtype: tuple
683   @return: tuple of (result, description) where:
684       - result: whether the instance can be migrated or not
685       - description: a description of the issue, if relevant
686
687   """
688   hyper = hypervisor.GetHypervisor(instance.hypervisor)
689   iname = instance.name
690   if iname not in hyper.ListInstances():
691     _Fail("Instance %s is not running", iname)
692
693   for idx in range(len(instance.disks)):
694     link_name = _GetBlockDevSymlinkPath(iname, idx)
695     if not os.path.islink(link_name):
696       _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
697
698
699 def GetAllInstancesInfo(hypervisor_list):
700   """Gather data about all instances.
701
702   This is the equivalent of L{GetInstanceInfo}, except that it
703   computes data for all instances at once, thus being faster if one
704   needs data about more than one instance.
705
706   @type hypervisor_list: list
707   @param hypervisor_list: list of hypervisors to query for instance data
708
709   @rtype: dict
710   @return: dictionary of instance: data, with data having the following keys:
711       - memory: memory size of instance (int)
712       - state: xen state of instance (string)
713       - time: cpu time of instance (float)
714       - vcpus: the number of vcpus
715
716   """
717   output = {}
718
719   for hname in hypervisor_list:
720     iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
721     if iinfo:
722       for name, _, memory, vcpus, state, times in iinfo:
723         value = {
724           'memory': memory,
725           'vcpus': vcpus,
726           'state': state,
727           'time': times,
728           }
729         if name in output:
730           # we only check static parameters, like memory and vcpus,
731           # and not state and time which can change between the
732           # invocations of the different hypervisors
733           for key in 'memory', 'vcpus':
734             if value[key] != output[name][key]:
735               _Fail("Instance %s is running twice"
736                     " with different parameters", name)
737         output[name] = value
738
739   return output
740
741
742 def InstanceOsAdd(instance, reinstall):
743   """Add an OS to an instance.
744
745   @type instance: L{objects.Instance}
746   @param instance: Instance whose OS is to be installed
747   @type reinstall: boolean
748   @param reinstall: whether this is an instance reinstall
749   @rtype: None
750
751   """
752   inst_os = OSFromDisk(instance.os)
753
754   create_env = OSEnvironment(instance, inst_os)
755   if reinstall:
756     create_env['INSTANCE_REINSTALL'] = "1"
757
758   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
759                                      instance.name, int(time.time()))
760
761   result = utils.RunCmd([inst_os.create_script], env=create_env,
762                         cwd=inst_os.path, output=logfile,)
763   if result.failed:
764     logging.error("os create command '%s' returned error: %s, logfile: %s,"
765                   " output: %s", result.cmd, result.fail_reason, logfile,
766                   result.output)
767     lines = [utils.SafeEncode(val)
768              for val in utils.TailFile(logfile, lines=20)]
769     _Fail("OS create script failed (%s), last lines in the"
770           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
771
772
773 def RunRenameInstance(instance, old_name):
774   """Run the OS rename script for an instance.
775
776   @type instance: L{objects.Instance}
777   @param instance: Instance whose OS is to be installed
778   @type old_name: string
779   @param old_name: previous instance name
780   @rtype: boolean
781   @return: the success of the operation
782
783   """
784   inst_os = OSFromDisk(instance.os)
785
786   rename_env = OSEnvironment(instance, inst_os)
787   rename_env['OLD_INSTANCE_NAME'] = old_name
788
789   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
790                                            old_name,
791                                            instance.name, int(time.time()))
792
793   result = utils.RunCmd([inst_os.rename_script], env=rename_env,
794                         cwd=inst_os.path, output=logfile)
795
796   if result.failed:
797     logging.error("os create command '%s' returned error: %s output: %s",
798                   result.cmd, result.fail_reason, result.output)
799     lines = [utils.SafeEncode(val)
800              for val in utils.TailFile(logfile, lines=20)]
801     _Fail("OS rename script failed (%s), last lines in the"
802           " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
803
804
805 def _GetVGInfo(vg_name):
806   """Get information about the volume group.
807
808   @type vg_name: str
809   @param vg_name: the volume group which we query
810   @rtype: dict
811   @return:
812     A dictionary with the following keys:
813       - C{vg_size} is the total size of the volume group in MiB
814       - C{vg_free} is the free size of the volume group in MiB
815       - C{pv_count} are the number of physical disks in that VG
816
817     If an error occurs during gathering of data, we return the same dict
818     with keys all set to None.
819
820   """
821   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
822
823   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
824                          "--nosuffix", "--units=m", "--separator=:", vg_name])
825
826   if retval.failed:
827     logging.error("volume group %s not present", vg_name)
828     return retdic
829   valarr = retval.stdout.strip().rstrip(':').split(':')
830   if len(valarr) == 3:
831     try:
832       retdic = {
833         "vg_size": int(round(float(valarr[0]), 0)),
834         "vg_free": int(round(float(valarr[1]), 0)),
835         "pv_count": int(valarr[2]),
836         }
837     except ValueError, err:
838       logging.exception("Fail to parse vgs output: %s", err)
839   else:
840     logging.error("vgs output has the wrong number of fields (expected"
841                   " three): %s", str(valarr))
842   return retdic
843
844
845 def _GetBlockDevSymlinkPath(instance_name, idx):
846   return os.path.join(constants.DISK_LINKS_DIR,
847                       "%s:%d" % (instance_name, idx))
848
849
850 def _SymlinkBlockDev(instance_name, device_path, idx):
851   """Set up symlinks to a instance's block device.
852
853   This is an auxiliary function run when an instance is start (on the primary
854   node) or when an instance is migrated (on the target node).
855
856
857   @param instance_name: the name of the target instance
858   @param device_path: path of the physical block device, on the node
859   @param idx: the disk index
860   @return: absolute path to the disk's symlink
861
862   """
863   link_name = _GetBlockDevSymlinkPath(instance_name, idx)
864   try:
865     os.symlink(device_path, link_name)
866   except OSError, err:
867     if err.errno == errno.EEXIST:
868       if (not os.path.islink(link_name) or
869           os.readlink(link_name) != device_path):
870         os.remove(link_name)
871         os.symlink(device_path, link_name)
872     else:
873       raise
874
875   return link_name
876
877
878 def _RemoveBlockDevLinks(instance_name, disks):
879   """Remove the block device symlinks belonging to the given instance.
880
881   """
882   for idx, _ in enumerate(disks):
883     link_name = _GetBlockDevSymlinkPath(instance_name, idx)
884     if os.path.islink(link_name):
885       try:
886         os.remove(link_name)
887       except OSError:
888         logging.exception("Can't remove symlink '%s'", link_name)
889
890
891 def _GatherAndLinkBlockDevs(instance):
892   """Set up an instance's block device(s).
893
894   This is run on the primary node at instance startup. The block
895   devices must be already assembled.
896
897   @type instance: L{objects.Instance}
898   @param instance: the instance whose disks we shoul assemble
899   @rtype: list
900   @return: list of (disk_object, device_path)
901
902   """
903   block_devices = []
904   for idx, disk in enumerate(instance.disks):
905     device = _RecursiveFindBD(disk)
906     if device is None:
907       raise errors.BlockDeviceError("Block device '%s' is not set up." %
908                                     str(disk))
909     device.Open()
910     try:
911       link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
912     except OSError, e:
913       raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
914                                     e.strerror)
915
916     block_devices.append((disk, link_name))
917
918   return block_devices
919
920
921 def StartInstance(instance):
922   """Start an instance.
923
924   @type instance: L{objects.Instance}
925   @param instance: the instance object
926   @rtype: None
927
928   """
929   running_instances = GetInstanceList([instance.hypervisor])
930
931   if instance.name in running_instances:
932     logging.info("Instance %s already running, not starting", instance.name)
933     return
934
935   try:
936     block_devices = _GatherAndLinkBlockDevs(instance)
937     hyper = hypervisor.GetHypervisor(instance.hypervisor)
938     hyper.StartInstance(instance, block_devices)
939   except errors.BlockDeviceError, err:
940     _Fail("Block device error: %s", err, exc=True)
941   except errors.HypervisorError, err:
942     _RemoveBlockDevLinks(instance.name, instance.disks)
943     _Fail("Hypervisor error: %s", err, exc=True)
944
945
946 def InstanceShutdown(instance):
947   """Shut an instance down.
948
949   @note: this functions uses polling with a hardcoded timeout.
950
951   @type instance: L{objects.Instance}
952   @param instance: the instance object
953   @rtype: None
954
955   """
956   hv_name = instance.hypervisor
957   running_instances = GetInstanceList([hv_name])
958   iname = instance.name
959
960   if iname not in running_instances:
961     logging.info("Instance %s not running, doing nothing", iname)
962     return
963
964   hyper = hypervisor.GetHypervisor(hv_name)
965   try:
966     hyper.StopInstance(instance)
967   except errors.HypervisorError, err:
968     _Fail("Failed to stop instance %s: %s", iname, err)
969
970   # test every 10secs for 2min
971
972   time.sleep(1)
973   for _ in range(11):
974     if instance.name not in GetInstanceList([hv_name]):
975       break
976     time.sleep(10)
977   else:
978     # the shutdown did not succeed
979     logging.error("Shutdown of '%s' unsuccessful, using destroy", iname)
980
981     try:
982       hyper.StopInstance(instance, force=True)
983     except errors.HypervisorError, err:
984       _Fail("Failed to force stop instance %s: %s", iname, err)
985
986     time.sleep(1)
987     if instance.name in GetInstanceList([hv_name]):
988       _Fail("Could not shutdown instance %s even by destroy", iname)
989
990   _RemoveBlockDevLinks(iname, instance.disks)
991
992
993 def InstanceReboot(instance, reboot_type):
994   """Reboot an instance.
995
996   @type instance: L{objects.Instance}
997   @param instance: the instance object to reboot
998   @type reboot_type: str
999   @param reboot_type: the type of reboot, one the following
1000     constants:
1001       - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1002         instance OS, do not recreate the VM
1003       - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1004         restart the VM (at the hypervisor level)
1005       - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1006         not accepted here, since that mode is handled differently, in
1007         cmdlib, and translates into full stop and start of the
1008         instance (instead of a call_instance_reboot RPC)
1009   @rtype: None
1010
1011   """
1012   running_instances = GetInstanceList([instance.hypervisor])
1013
1014   if instance.name not in running_instances:
1015     _Fail("Cannot reboot instance %s that is not running", instance.name)
1016
1017   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1018   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1019     try:
1020       hyper.RebootInstance(instance)
1021     except errors.HypervisorError, err:
1022       _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1023   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1024     try:
1025       InstanceShutdown(instance)
1026       return StartInstance(instance)
1027     except errors.HypervisorError, err:
1028       _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1029   else:
1030     _Fail("Invalid reboot_type received: %s", reboot_type)
1031
1032
1033 def MigrationInfo(instance):
1034   """Gather information about an instance to be migrated.
1035
1036   @type instance: L{objects.Instance}
1037   @param instance: the instance definition
1038
1039   """
1040   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1041   try:
1042     info = hyper.MigrationInfo(instance)
1043   except errors.HypervisorError, err:
1044     _Fail("Failed to fetch migration information: %s", err, exc=True)
1045   return info
1046
1047
1048 def AcceptInstance(instance, info, target):
1049   """Prepare the node to accept an instance.
1050
1051   @type instance: L{objects.Instance}
1052   @param instance: the instance definition
1053   @type info: string/data (opaque)
1054   @param info: migration information, from the source node
1055   @type target: string
1056   @param target: target host (usually ip), on this node
1057
1058   """
1059   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1060   try:
1061     hyper.AcceptInstance(instance, info, target)
1062   except errors.HypervisorError, err:
1063     _Fail("Failed to accept instance: %s", err, exc=True)
1064
1065
1066 def FinalizeMigration(instance, info, success):
1067   """Finalize any preparation to accept an instance.
1068
1069   @type instance: L{objects.Instance}
1070   @param instance: the instance definition
1071   @type info: string/data (opaque)
1072   @param info: migration information, from the source node
1073   @type success: boolean
1074   @param success: whether the migration was a success or a failure
1075
1076   """
1077   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1078   try:
1079     hyper.FinalizeMigration(instance, info, success)
1080   except errors.HypervisorError, err:
1081     _Fail("Failed to finalize migration: %s", err, exc=True)
1082
1083
1084 def MigrateInstance(instance, target, live):
1085   """Migrates an instance to another node.
1086
1087   @type instance: L{objects.Instance}
1088   @param instance: the instance definition
1089   @type target: string
1090   @param target: the target node name
1091   @type live: boolean
1092   @param live: whether the migration should be done live or not (the
1093       interpretation of this parameter is left to the hypervisor)
1094   @rtype: tuple
1095   @return: a tuple of (success, msg) where:
1096       - succes is a boolean denoting the success/failure of the operation
1097       - msg is a string with details in case of failure
1098
1099   """
1100   hyper = hypervisor.GetHypervisor(instance.hypervisor)
1101
1102   try:
1103     hyper.MigrateInstance(instance.name, target, live)
1104   except errors.HypervisorError, err:
1105     _Fail("Failed to migrate instance: %s", err, exc=True)
1106
1107
1108 def BlockdevCreate(disk, size, owner, on_primary, info):
1109   """Creates a block device for an instance.
1110
1111   @type disk: L{objects.Disk}
1112   @param disk: the object describing the disk we should create
1113   @type size: int
1114   @param size: the size of the physical underlying device, in MiB
1115   @type owner: str
1116   @param owner: the name of the instance for which disk is created,
1117       used for device cache data
1118   @type on_primary: boolean
1119   @param on_primary:  indicates if it is the primary node or not
1120   @type info: string
1121   @param info: string that will be sent to the physical device
1122       creation, used for example to set (LVM) tags on LVs
1123
1124   @return: the new unique_id of the device (this can sometime be
1125       computed only after creation), or None. On secondary nodes,
1126       it's not required to return anything.
1127
1128   """
1129   clist = []
1130   if disk.children:
1131     for child in disk.children:
1132       try:
1133         crdev = _RecursiveAssembleBD(child, owner, on_primary)
1134       except errors.BlockDeviceError, err:
1135         _Fail("Can't assemble device %s: %s", child, err)
1136       if on_primary or disk.AssembleOnSecondary():
1137         # we need the children open in case the device itself has to
1138         # be assembled
1139         try:
1140           crdev.Open()
1141         except errors.BlockDeviceError, err:
1142           _Fail("Can't make child '%s' read-write: %s", child, err)
1143       clist.append(crdev)
1144
1145   try:
1146     device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1147   except errors.BlockDeviceError, err:
1148     _Fail("Can't create block device: %s", err)
1149
1150   if on_primary or disk.AssembleOnSecondary():
1151     try:
1152       device.Assemble()
1153     except errors.BlockDeviceError, err:
1154       _Fail("Can't assemble device after creation, unusual event: %s", err)
1155     device.SetSyncSpeed(constants.SYNC_SPEED)
1156     if on_primary or disk.OpenOnSecondary():
1157       try:
1158         device.Open(force=True)
1159       except errors.BlockDeviceError, err:
1160         _Fail("Can't make device r/w after creation, unusual event: %s", err)
1161     DevCacheManager.UpdateCache(device.dev_path, owner,
1162                                 on_primary, disk.iv_name)
1163
1164   device.SetInfo(info)
1165
1166   return device.unique_id
1167
1168
1169 def BlockdevRemove(disk):
1170   """Remove a block device.
1171
1172   @note: This is intended to be called recursively.
1173
1174   @type disk: L{objects.Disk}
1175   @param disk: the disk object we should remove
1176   @rtype: boolean
1177   @return: the success of the operation
1178
1179   """
1180   msgs = []
1181   try:
1182     rdev = _RecursiveFindBD(disk)
1183   except errors.BlockDeviceError, err:
1184     # probably can't attach
1185     logging.info("Can't attach to device %s in remove", disk)
1186     rdev = None
1187   if rdev is not None:
1188     r_path = rdev.dev_path
1189     try:
1190       rdev.Remove()
1191     except errors.BlockDeviceError, err:
1192       msgs.append(str(err))
1193     if not msgs:
1194       DevCacheManager.RemoveCache(r_path)
1195
1196   if disk.children:
1197     for child in disk.children:
1198       try:
1199         BlockdevRemove(child)
1200       except RPCFail, err:
1201         msgs.append(str(err))
1202
1203   if msgs:
1204     _Fail("; ".join(msgs))
1205
1206
1207 def _RecursiveAssembleBD(disk, owner, as_primary):
1208   """Activate a block device for an instance.
1209
1210   This is run on the primary and secondary nodes for an instance.
1211
1212   @note: this function is called recursively.
1213
1214   @type disk: L{objects.Disk}
1215   @param disk: the disk we try to assemble
1216   @type owner: str
1217   @param owner: the name of the instance which owns the disk
1218   @type as_primary: boolean
1219   @param as_primary: if we should make the block device
1220       read/write
1221
1222   @return: the assembled device or None (in case no device
1223       was assembled)
1224   @raise errors.BlockDeviceError: in case there is an error
1225       during the activation of the children or the device
1226       itself
1227
1228   """
1229   children = []
1230   if disk.children:
1231     mcn = disk.ChildrenNeeded()
1232     if mcn == -1:
1233       mcn = 0 # max number of Nones allowed
1234     else:
1235       mcn = len(disk.children) - mcn # max number of Nones
1236     for chld_disk in disk.children:
1237       try:
1238         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1239       except errors.BlockDeviceError, err:
1240         if children.count(None) >= mcn:
1241           raise
1242         cdev = None
1243         logging.error("Error in child activation (but continuing): %s",
1244                       str(err))
1245       children.append(cdev)
1246
1247   if as_primary or disk.AssembleOnSecondary():
1248     r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1249     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1250     result = r_dev
1251     if as_primary or disk.OpenOnSecondary():
1252       r_dev.Open()
1253     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1254                                 as_primary, disk.iv_name)
1255
1256   else:
1257     result = True
1258   return result
1259
1260
1261 def BlockdevAssemble(disk, owner, as_primary):
1262   """Activate a block device for an instance.
1263
1264   This is a wrapper over _RecursiveAssembleBD.
1265
1266   @rtype: str or boolean
1267   @return: a C{/dev/...} path for primary nodes, and
1268       C{True} for secondary nodes
1269
1270   """
1271   try:
1272     result = _RecursiveAssembleBD(disk, owner, as_primary)
1273     if isinstance(result, bdev.BlockDev):
1274       result = result.dev_path
1275   except errors.BlockDeviceError, err:
1276     _Fail("Error while assembling disk: %s", err, exc=True)
1277
1278   return result
1279
1280
1281 def BlockdevShutdown(disk):
1282   """Shut down a block device.
1283
1284   First, if the device is assembled (Attach() is successful), then
1285   the device is shutdown. Then the children of the device are
1286   shutdown.
1287
1288   This function is called recursively. Note that we don't cache the
1289   children or such, as oppossed to assemble, shutdown of different
1290   devices doesn't require that the upper device was active.
1291
1292   @type disk: L{objects.Disk}
1293   @param disk: the description of the disk we should
1294       shutdown
1295   @rtype: None
1296
1297   """
1298   msgs = []
1299   r_dev = _RecursiveFindBD(disk)
1300   if r_dev is not None:
1301     r_path = r_dev.dev_path
1302     try:
1303       r_dev.Shutdown()
1304       DevCacheManager.RemoveCache(r_path)
1305     except errors.BlockDeviceError, err:
1306       msgs.append(str(err))
1307
1308   if disk.children:
1309     for child in disk.children:
1310       try:
1311         BlockdevShutdown(child)
1312       except RPCFail, err:
1313         msgs.append(str(err))
1314
1315   if msgs:
1316     _Fail("; ".join(msgs))
1317
1318
1319 def BlockdevAddchildren(parent_cdev, new_cdevs):
1320   """Extend a mirrored block device.
1321
1322   @type parent_cdev: L{objects.Disk}
1323   @param parent_cdev: the disk to which we should add children
1324   @type new_cdevs: list of L{objects.Disk}
1325   @param new_cdevs: the list of children which we should add
1326   @rtype: None
1327
1328   """
1329   parent_bdev = _RecursiveFindBD(parent_cdev)
1330   if parent_bdev is None:
1331     _Fail("Can't find parent device '%s' in add children", parent_cdev)
1332   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1333   if new_bdevs.count(None) > 0:
1334     _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1335   parent_bdev.AddChildren(new_bdevs)
1336
1337
1338 def BlockdevRemovechildren(parent_cdev, new_cdevs):
1339   """Shrink a mirrored block device.
1340
1341   @type parent_cdev: L{objects.Disk}
1342   @param parent_cdev: the disk from which we should remove children
1343   @type new_cdevs: list of L{objects.Disk}
1344   @param new_cdevs: the list of children which we should remove
1345   @rtype: None
1346
1347   """
1348   parent_bdev = _RecursiveFindBD(parent_cdev)
1349   if parent_bdev is None:
1350     _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1351   devs = []
1352   for disk in new_cdevs:
1353     rpath = disk.StaticDevPath()
1354     if rpath is None:
1355       bd = _RecursiveFindBD(disk)
1356       if bd is None:
1357         _Fail("Can't find device %s while removing children", disk)
1358       else:
1359         devs.append(bd.dev_path)
1360     else:
1361       devs.append(rpath)
1362   parent_bdev.RemoveChildren(devs)
1363
1364
1365 def BlockdevGetmirrorstatus(disks):
1366   """Get the mirroring status of a list of devices.
1367
1368   @type disks: list of L{objects.Disk}
1369   @param disks: the list of disks which we should query
1370   @rtype: disk
1371   @return:
1372       a list of (mirror_done, estimated_time) tuples, which
1373       are the result of L{bdev.BlockDev.CombinedSyncStatus}
1374   @raise errors.BlockDeviceError: if any of the disks cannot be
1375       found
1376
1377   """
1378   stats = []
1379   for dsk in disks:
1380     rbd = _RecursiveFindBD(dsk)
1381     if rbd is None:
1382       _Fail("Can't find device %s", dsk)
1383
1384     stats.append(rbd.CombinedSyncStatus())
1385
1386   return stats
1387
1388
1389 def _RecursiveFindBD(disk):
1390   """Check if a device is activated.
1391
1392   If so, return information about the real device.
1393
1394   @type disk: L{objects.Disk}
1395   @param disk: the disk object we need to find
1396
1397   @return: None if the device can't be found,
1398       otherwise the device instance
1399
1400   """
1401   children = []
1402   if disk.children:
1403     for chdisk in disk.children:
1404       children.append(_RecursiveFindBD(chdisk))
1405
1406   return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1407
1408
1409 def BlockdevFind(disk):
1410   """Check if a device is activated.
1411
1412   If it is, return information about the real device.
1413
1414   @type disk: L{objects.Disk}
1415   @param disk: the disk to find
1416   @rtype: None or objects.BlockDevStatus
1417   @return: None if the disk cannot be found, otherwise a the current
1418            information
1419
1420   """
1421   try:
1422     rbd = _RecursiveFindBD(disk)
1423   except errors.BlockDeviceError, err:
1424     _Fail("Failed to find device: %s", err, exc=True)
1425
1426   if rbd is None:
1427     return None
1428
1429   return rbd.GetSyncStatus()
1430
1431
1432 def BlockdevGetsize(disks):
1433   """Computes the size of the given disks.
1434
1435   If a disk is not found, returns None instead.
1436
1437   @type disks: list of L{objects.Disk}
1438   @param disks: the list of disk to compute the size for
1439   @rtype: list
1440   @return: list with elements None if the disk cannot be found,
1441       otherwise the size
1442
1443   """
1444   result = []
1445   for cf in disks:
1446     try:
1447       rbd = _RecursiveFindBD(cf)
1448     except errors.BlockDeviceError, err:
1449       result.append(None)
1450       continue
1451     if rbd is None:
1452       result.append(None)
1453     else:
1454       result.append(rbd.GetActualSize())
1455   return result
1456
1457
1458 def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1459   """Export a block device to a remote node.
1460
1461   @type disk: L{objects.Disk}
1462   @param disk: the description of the disk to export
1463   @type dest_node: str
1464   @param dest_node: the destination node to export to
1465   @type dest_path: str
1466   @param dest_path: the destination path on the target node
1467   @type cluster_name: str
1468   @param cluster_name: the cluster name, needed for SSH hostalias
1469   @rtype: None
1470
1471   """
1472   real_disk = _RecursiveFindBD(disk)
1473   if real_disk is None:
1474     _Fail("Block device '%s' is not set up", disk)
1475
1476   real_disk.Open()
1477
1478   # the block size on the read dd is 1MiB to match our units
1479   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1480                                "dd if=%s bs=1048576 count=%s",
1481                                real_disk.dev_path, str(disk.size))
1482
1483   # we set here a smaller block size as, due to ssh buffering, more
1484   # than 64-128k will mostly ignored; we use nocreat to fail if the
1485   # device is not already there or we pass a wrong path; we use
1486   # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1487   # to not buffer too much memory; this means that at best, we flush
1488   # every 64k, which will not be very fast
1489   destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1490                                 " oflag=dsync", dest_path)
1491
1492   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1493                                                    constants.GANETI_RUNAS,
1494                                                    destcmd)
1495
1496   # all commands have been checked, so we're safe to combine them
1497   command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1498
1499   result = utils.RunCmd(["bash", "-c", command])
1500
1501   if result.failed:
1502     _Fail("Disk copy command '%s' returned error: %s"
1503           " output: %s", command, result.fail_reason, result.output)
1504
1505
1506 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1507   """Write a file to the filesystem.
1508
1509   This allows the master to overwrite(!) a file. It will only perform
1510   the operation if the file belongs to a list of configuration files.
1511
1512   @type file_name: str
1513   @param file_name: the target file name
1514   @type data: str
1515   @param data: the new contents of the file
1516   @type mode: int
1517   @param mode: the mode to give the file (can be None)
1518   @type uid: int
1519   @param uid: the owner of the file (can be -1 for default)
1520   @type gid: int
1521   @param gid: the group of the file (can be -1 for default)
1522   @type atime: float
1523   @param atime: the atime to set on the file (can be None)
1524   @type mtime: float
1525   @param mtime: the mtime to set on the file (can be None)
1526   @rtype: None
1527
1528   """
1529   if not os.path.isabs(file_name):
1530     _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1531
1532   if file_name not in _ALLOWED_UPLOAD_FILES:
1533     _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1534           file_name)
1535
1536   raw_data = _Decompress(data)
1537
1538   utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1539                   atime=atime, mtime=mtime)
1540
1541
1542 def WriteSsconfFiles(values):
1543   """Update all ssconf files.
1544
1545   Wrapper around the SimpleStore.WriteFiles.
1546
1547   """
1548   ssconf.SimpleStore().WriteFiles(values)
1549
1550
1551 def _ErrnoOrStr(err):
1552   """Format an EnvironmentError exception.
1553
1554   If the L{err} argument has an errno attribute, it will be looked up
1555   and converted into a textual C{E...} description. Otherwise the
1556   string representation of the error will be returned.
1557
1558   @type err: L{EnvironmentError}
1559   @param err: the exception to format
1560
1561   """
1562   if hasattr(err, 'errno'):
1563     detail = errno.errorcode[err.errno]
1564   else:
1565     detail = str(err)
1566   return detail
1567
1568
1569 def _OSOndiskAPIVersion(name, os_dir):
1570   """Compute and return the API version of a given OS.
1571
1572   This function will try to read the API version of the OS given by
1573   the 'name' parameter and residing in the 'os_dir' directory.
1574
1575   @type name: str
1576   @param name: the OS name we should look for
1577   @type os_dir: str
1578   @param os_dir: the directory inwhich we should look for the OS
1579   @rtype: tuple
1580   @return: tuple (status, data) with status denoting the validity and
1581       data holding either the vaid versions or an error message
1582
1583   """
1584   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1585
1586   try:
1587     st = os.stat(api_file)
1588   except EnvironmentError, err:
1589     return False, ("Required file 'ganeti_api_version' file not"
1590                    " found under path %s: %s" % (os_dir, _ErrnoOrStr(err)))
1591
1592   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1593     return False, ("File 'ganeti_api_version' file at %s is not"
1594                    " a regular file" % os_dir)
1595
1596   try:
1597     api_versions = utils.ReadFile(api_file).splitlines()
1598   except EnvironmentError, err:
1599     return False, ("Error while reading the API version file at %s: %s" %
1600                    (api_file, _ErrnoOrStr(err)))
1601
1602   try:
1603     api_versions = [int(version.strip()) for version in api_versions]
1604   except (TypeError, ValueError), err:
1605     return False, ("API version(s) can't be converted to integer: %s" %
1606                    str(err))
1607
1608   return True, api_versions
1609
1610
1611 def DiagnoseOS(top_dirs=None):
1612   """Compute the validity for all OSes.
1613
1614   @type top_dirs: list
1615   @param top_dirs: the list of directories in which to
1616       search (if not given defaults to
1617       L{constants.OS_SEARCH_PATH})
1618   @rtype: list of L{objects.OS}
1619   @return: a list of tuples (name, path, status, diagnose)
1620       for all (potential) OSes under all search paths, where:
1621           - name is the (potential) OS name
1622           - path is the full path to the OS
1623           - status True/False is the validity of the OS
1624           - diagnose is the error message for an invalid OS, otherwise empty
1625
1626   """
1627   if top_dirs is None:
1628     top_dirs = constants.OS_SEARCH_PATH
1629
1630   result = []
1631   for dir_name in top_dirs:
1632     if os.path.isdir(dir_name):
1633       try:
1634         f_names = utils.ListVisibleFiles(dir_name)
1635       except EnvironmentError, err:
1636         logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1637         break
1638       for name in f_names:
1639         os_path = os.path.sep.join([dir_name, name])
1640         status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1641         if status:
1642           diagnose = ""
1643         else:
1644           diagnose = os_inst
1645         result.append((name, os_path, status, diagnose))
1646
1647   return result
1648
1649
1650 def _TryOSFromDisk(name, base_dir=None):
1651   """Create an OS instance from disk.
1652
1653   This function will return an OS instance if the given name is a
1654   valid OS name.
1655
1656   @type base_dir: string
1657   @keyword base_dir: Base directory containing OS installations.
1658                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1659   @rtype: tuple
1660   @return: success and either the OS instance if we find a valid one,
1661       or error message
1662
1663   """
1664   if base_dir is None:
1665     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1666     if os_dir is None:
1667       return False, "Directory for OS %s not found in search path" % name
1668   else:
1669     os_dir = os.path.sep.join([base_dir, name])
1670
1671   status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1672   if not status:
1673     # push the error up
1674     return status, api_versions
1675
1676   if not constants.OS_API_VERSIONS.intersection(api_versions):
1677     return False, ("API version mismatch for path '%s': found %s, want %s." %
1678                    (os_dir, api_versions, constants.OS_API_VERSIONS))
1679
1680   # OS Scripts dictionary, we will populate it with the actual script names
1681   os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1682
1683   for script in os_scripts:
1684     os_scripts[script] = os.path.sep.join([os_dir, script])
1685
1686     try:
1687       st = os.stat(os_scripts[script])
1688     except EnvironmentError, err:
1689       return False, ("Script '%s' under path '%s' is missing (%s)" %
1690                      (script, os_dir, _ErrnoOrStr(err)))
1691
1692     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1693       return False, ("Script '%s' under path '%s' is not executable" %
1694                      (script, os_dir))
1695
1696     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1697       return False, ("Script '%s' under path '%s' is not a regular file" %
1698                      (script, os_dir))
1699
1700   os_obj = objects.OS(name=name, path=os_dir,
1701                       create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1702                       export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1703                       import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1704                       rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1705                       api_versions=api_versions)
1706   return True, os_obj
1707
1708
1709 def OSFromDisk(name, base_dir=None):
1710   """Create an OS instance from disk.
1711
1712   This function will return an OS instance if the given name is a
1713   valid OS name. Otherwise, it will raise an appropriate
1714   L{RPCFail} exception, detailing why this is not a valid OS.
1715
1716   This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1717   an exception but returns true/false status data.
1718
1719   @type base_dir: string
1720   @keyword base_dir: Base directory containing OS installations.
1721                      Defaults to a search in all the OS_SEARCH_PATH dirs.
1722   @rtype: L{objects.OS}
1723   @return: the OS instance if we find a valid one
1724   @raise RPCFail: if we don't find a valid OS
1725
1726   """
1727   status, payload = _TryOSFromDisk(name, base_dir)
1728
1729   if not status:
1730     _Fail(payload)
1731
1732   return payload
1733
1734
1735 def OSEnvironment(instance, os, debug=0):
1736   """Calculate the environment for an os script.
1737
1738   @type instance: L{objects.Instance}
1739   @param instance: target instance for the os script run
1740   @type os: L{objects.OS}
1741   @param os: operating system for which the environment is being built
1742   @type debug: integer
1743   @param debug: debug level (0 or 1, for OS Api 10)
1744   @rtype: dict
1745   @return: dict of environment variables
1746   @raise errors.BlockDeviceError: if the block device
1747       cannot be found
1748
1749   """
1750   result = {}
1751   api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1752   result['OS_API_VERSION'] = '%d' % api_version
1753   result['INSTANCE_NAME'] = instance.name
1754   result['INSTANCE_OS'] = instance.os
1755   result['HYPERVISOR'] = instance.hypervisor
1756   result['DISK_COUNT'] = '%d' % len(instance.disks)
1757   result['NIC_COUNT'] = '%d' % len(instance.nics)
1758   result['DEBUG_LEVEL'] = '%d' % debug
1759   for idx, disk in enumerate(instance.disks):
1760     real_disk = _RecursiveFindBD(disk)
1761     if real_disk is None:
1762       raise errors.BlockDeviceError("Block device '%s' is not set up" %
1763                                     str(disk))
1764     real_disk.Open()
1765     result['DISK_%d_PATH' % idx] = real_disk.dev_path
1766     result['DISK_%d_ACCESS' % idx] = disk.mode
1767     if constants.HV_DISK_TYPE in instance.hvparams:
1768       result['DISK_%d_FRONTEND_TYPE' % idx] = \
1769         instance.hvparams[constants.HV_DISK_TYPE]
1770     if disk.dev_type in constants.LDS_BLOCK:
1771       result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1772     elif disk.dev_type == constants.LD_FILE:
1773       result['DISK_%d_BACKEND_TYPE' % idx] = \
1774         'file:%s' % disk.physical_id[0]
1775   for idx, nic in enumerate(instance.nics):
1776     result['NIC_%d_MAC' % idx] = nic.mac
1777     if nic.ip:
1778       result['NIC_%d_IP' % idx] = nic.ip
1779     result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1780     if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1781       result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1782     if nic.nicparams[constants.NIC_LINK]:
1783       result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1784     if constants.HV_NIC_TYPE in instance.hvparams:
1785       result['NIC_%d_FRONTEND_TYPE' % idx] = \
1786         instance.hvparams[constants.HV_NIC_TYPE]
1787
1788   for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1789     for key, value in source.items():
1790       result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1791
1792   return result
1793
1794 def BlockdevGrow(disk, amount):
1795   """Grow a stack of block devices.
1796
1797   This function is called recursively, with the childrens being the
1798   first ones to resize.
1799
1800   @type disk: L{objects.Disk}
1801   @param disk: the disk to be grown
1802   @rtype: (status, result)
1803   @return: a tuple with the status of the operation
1804       (True/False), and the errors message if status
1805       is False
1806
1807   """
1808   r_dev = _RecursiveFindBD(disk)
1809   if r_dev is None:
1810     _Fail("Cannot find block device %s", disk)
1811
1812   try:
1813     r_dev.Grow(amount)
1814   except errors.BlockDeviceError, err:
1815     _Fail("Failed to grow block device: %s", err, exc=True)
1816
1817
1818 def BlockdevSnapshot(disk):
1819   """Create a snapshot copy of a block device.
1820
1821   This function is called recursively, and the snapshot is actually created
1822   just for the leaf lvm backend device.
1823
1824   @type disk: L{objects.Disk}
1825   @param disk: the disk to be snapshotted
1826   @rtype: string
1827   @return: snapshot disk path
1828
1829   """
1830   if disk.children:
1831     if len(disk.children) == 1:
1832       # only one child, let's recurse on it
1833       return BlockdevSnapshot(disk.children[0])
1834     else:
1835       # more than one child, choose one that matches
1836       for child in disk.children:
1837         if child.size == disk.size:
1838           # return implies breaking the loop
1839           return BlockdevSnapshot(child)
1840   elif disk.dev_type == constants.LD_LV:
1841     r_dev = _RecursiveFindBD(disk)
1842     if r_dev is not None:
1843       # let's stay on the safe side and ask for the full size, for now
1844       return r_dev.Snapshot(disk.size)
1845     else:
1846       _Fail("Cannot find block device %s", disk)
1847   else:
1848     _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1849           disk.unique_id, disk.dev_type)
1850
1851
1852 def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1853   """Export a block device snapshot to a remote node.
1854
1855   @type disk: L{objects.Disk}
1856   @param disk: the description of the disk to export
1857   @type dest_node: str
1858   @param dest_node: the destination node to export to
1859   @type instance: L{objects.Instance}
1860   @param instance: the instance object to whom the disk belongs
1861   @type cluster_name: str
1862   @param cluster_name: the cluster name, needed for SSH hostalias
1863   @type idx: int
1864   @param idx: the index of the disk in the instance's disk list,
1865       used to export to the OS scripts environment
1866   @rtype: None
1867
1868   """
1869   inst_os = OSFromDisk(instance.os)
1870   export_env = OSEnvironment(instance, inst_os)
1871
1872   export_script = inst_os.export_script
1873
1874   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1875                                      instance.name, int(time.time()))
1876   if not os.path.exists(constants.LOG_OS_DIR):
1877     os.mkdir(constants.LOG_OS_DIR, 0750)
1878   real_disk = _RecursiveFindBD(disk)
1879   if real_disk is None:
1880     _Fail("Block device '%s' is not set up", disk)
1881
1882   real_disk.Open()
1883
1884   export_env['EXPORT_DEVICE'] = real_disk.dev_path
1885   export_env['EXPORT_INDEX'] = str(idx)
1886
1887   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1888   destfile = disk.physical_id[1]
1889
1890   # the target command is built out of three individual commands,
1891   # which are joined by pipes; we check each individual command for
1892   # valid parameters
1893   expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1894                                inst_os.path, export_script, logfile)
1895
1896   comprcmd = "gzip"
1897
1898   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1899                                 destdir, destdir, destfile)
1900   remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1901                                                    constants.GANETI_RUNAS,
1902                                                    destcmd)
1903
1904   # all commands have been checked, so we're safe to combine them
1905   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1906
1907   result = utils.RunCmd(["bash", "-c", command], env=export_env)
1908
1909   if result.failed:
1910     _Fail("OS snapshot export command '%s' returned error: %s"
1911           " output: %s", command, result.fail_reason, result.output)
1912
1913
1914 def FinalizeExport(instance, snap_disks):
1915   """Write out the export configuration information.
1916
1917   @type instance: L{objects.Instance}
1918   @param instance: the instance which we export, used for
1919       saving configuration
1920   @type snap_disks: list of L{objects.Disk}
1921   @param snap_disks: list of snapshot block devices, which
1922       will be used to get the actual name of the dump file
1923
1924   @rtype: None
1925
1926   """
1927   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1928   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1929
1930   config = objects.SerializableConfigParser()
1931
1932   config.add_section(constants.INISECT_EXP)
1933   config.set(constants.INISECT_EXP, 'version', '0')
1934   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1935   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1936   config.set(constants.INISECT_EXP, 'os', instance.os)
1937   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1938
1939   config.add_section(constants.INISECT_INS)
1940   config.set(constants.INISECT_INS, 'name', instance.name)
1941   config.set(constants.INISECT_INS, 'memory', '%d' %
1942              instance.beparams[constants.BE_MEMORY])
1943   config.set(constants.INISECT_INS, 'vcpus', '%d' %
1944              instance.beparams[constants.BE_VCPUS])
1945   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1946
1947   nic_total = 0
1948   for nic_count, nic in enumerate(instance.nics):
1949     nic_total += 1
1950     config.set(constants.INISECT_INS, 'nic%d_mac' %
1951                nic_count, '%s' % nic.mac)
1952     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1953     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1954                '%s' % nic.bridge)
1955   # TODO: redundant: on load can read nics until it doesn't exist
1956   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
1957
1958   disk_total = 0
1959   for disk_count, disk in enumerate(snap_disks):
1960     if disk:
1961       disk_total += 1
1962       config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1963                  ('%s' % disk.iv_name))
1964       config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1965                  ('%s' % disk.physical_id[1]))
1966       config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1967                  ('%d' % disk.size))
1968
1969   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
1970
1971   utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
1972                   data=config.Dumps())
1973   shutil.rmtree(finaldestdir, True)
1974   shutil.move(destdir, finaldestdir)
1975
1976
1977 def ExportInfo(dest):
1978   """Get export configuration information.
1979
1980   @type dest: str
1981   @param dest: directory containing the export
1982
1983   @rtype: L{objects.SerializableConfigParser}
1984   @return: a serializable config file containing the
1985       export info
1986
1987   """
1988   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1989
1990   config = objects.SerializableConfigParser()
1991   config.read(cff)
1992
1993   if (not config.has_section(constants.INISECT_EXP) or
1994       not config.has_section(constants.INISECT_INS)):
1995     _Fail("Export info file doesn't have the required fields")
1996
1997   return config.Dumps()
1998
1999
2000 def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2001   """Import an os image into an instance.
2002
2003   @type instance: L{objects.Instance}
2004   @param instance: instance to import the disks into
2005   @type src_node: string
2006   @param src_node: source node for the disk images
2007   @type src_images: list of string
2008   @param src_images: absolute paths of the disk images
2009   @rtype: list of boolean
2010   @return: each boolean represent the success of importing the n-th disk
2011
2012   """
2013   inst_os = OSFromDisk(instance.os)
2014   import_env = OSEnvironment(instance, inst_os)
2015   import_script = inst_os.import_script
2016
2017   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2018                                         instance.name, int(time.time()))
2019   if not os.path.exists(constants.LOG_OS_DIR):
2020     os.mkdir(constants.LOG_OS_DIR, 0750)
2021
2022   comprcmd = "gunzip"
2023   impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2024                                import_script, logfile)
2025
2026   final_result = []
2027   for idx, image in enumerate(src_images):
2028     if image:
2029       destcmd = utils.BuildShellCmd('cat %s', image)
2030       remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2031                                                        constants.GANETI_RUNAS,
2032                                                        destcmd)
2033       command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2034       import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2035       import_env['IMPORT_INDEX'] = str(idx)
2036       result = utils.RunCmd(command, env=import_env)
2037       if result.failed:
2038         logging.error("Disk import command '%s' returned error: %s"
2039                       " output: %s", command, result.fail_reason,
2040                       result.output)
2041         final_result.append("error importing disk %d: %s, %s" %
2042                             (idx, result.fail_reason, result.output[-100]))
2043
2044   if final_result:
2045     _Fail("; ".join(final_result), log=False)
2046
2047
2048 def ListExports():
2049   """Return a list of exports currently available on this machine.
2050
2051   @rtype: list
2052   @return: list of the exports
2053
2054   """
2055   if os.path.isdir(constants.EXPORT_DIR):
2056     return utils.ListVisibleFiles(constants.EXPORT_DIR)
2057   else:
2058     _Fail("No exports directory")
2059
2060
2061 def RemoveExport(export):
2062   """Remove an existing export from the node.
2063
2064   @type export: str
2065   @param export: the name of the export to remove
2066   @rtype: None
2067
2068   """
2069   target = os.path.join(constants.EXPORT_DIR, export)
2070
2071   try:
2072     shutil.rmtree(target)
2073   except EnvironmentError, err:
2074     _Fail("Error while removing the export: %s", err, exc=True)
2075
2076
2077 def BlockdevRename(devlist):
2078   """Rename a list of block devices.
2079
2080   @type devlist: list of tuples
2081   @param devlist: list of tuples of the form  (disk,
2082       new_logical_id, new_physical_id); disk is an
2083       L{objects.Disk} object describing the current disk,
2084       and new logical_id/physical_id is the name we
2085       rename it to
2086   @rtype: boolean
2087   @return: True if all renames succeeded, False otherwise
2088
2089   """
2090   msgs = []
2091   result = True
2092   for disk, unique_id in devlist:
2093     dev = _RecursiveFindBD(disk)
2094     if dev is None:
2095       msgs.append("Can't find device %s in rename" % str(disk))
2096       result = False
2097       continue
2098     try:
2099       old_rpath = dev.dev_path
2100       dev.Rename(unique_id)
2101       new_rpath = dev.dev_path
2102       if old_rpath != new_rpath:
2103         DevCacheManager.RemoveCache(old_rpath)
2104         # FIXME: we should add the new cache information here, like:
2105         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2106         # but we don't have the owner here - maybe parse from existing
2107         # cache? for now, we only lose lvm data when we rename, which
2108         # is less critical than DRBD or MD
2109     except errors.BlockDeviceError, err:
2110       msgs.append("Can't rename device '%s' to '%s': %s" %
2111                   (dev, unique_id, err))
2112       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2113       result = False
2114   if not result:
2115     _Fail("; ".join(msgs))
2116
2117
2118 def _TransformFileStorageDir(file_storage_dir):
2119   """Checks whether given file_storage_dir is valid.
2120
2121   Checks wheter the given file_storage_dir is within the cluster-wide
2122   default file_storage_dir stored in SimpleStore. Only paths under that
2123   directory are allowed.
2124
2125   @type file_storage_dir: str
2126   @param file_storage_dir: the path to check
2127
2128   @return: the normalized path if valid, None otherwise
2129
2130   """
2131   cfg = _GetConfig()
2132   file_storage_dir = os.path.normpath(file_storage_dir)
2133   base_file_storage_dir = cfg.GetFileStorageDir()
2134   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2135       base_file_storage_dir):
2136     _Fail("File storage directory '%s' is not under base file"
2137           " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2138   return file_storage_dir
2139
2140
2141 def CreateFileStorageDir(file_storage_dir):
2142   """Create file storage directory.
2143
2144   @type file_storage_dir: str
2145   @param file_storage_dir: directory to create
2146
2147   @rtype: tuple
2148   @return: tuple with first element a boolean indicating wheter dir
2149       creation was successful or not
2150
2151   """
2152   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2153   if os.path.exists(file_storage_dir):
2154     if not os.path.isdir(file_storage_dir):
2155       _Fail("Specified storage dir '%s' is not a directory",
2156             file_storage_dir)
2157   else:
2158     try:
2159       os.makedirs(file_storage_dir, 0750)
2160     except OSError, err:
2161       _Fail("Cannot create file storage directory '%s': %s",
2162             file_storage_dir, err, exc=True)
2163
2164
2165 def RemoveFileStorageDir(file_storage_dir):
2166   """Remove file storage directory.
2167
2168   Remove it only if it's empty. If not log an error and return.
2169
2170   @type file_storage_dir: str
2171   @param file_storage_dir: the directory we should cleanup
2172   @rtype: tuple (success,)
2173   @return: tuple of one element, C{success}, denoting
2174       whether the operation was successful
2175
2176   """
2177   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2178   if os.path.exists(file_storage_dir):
2179     if not os.path.isdir(file_storage_dir):
2180       _Fail("Specified Storage directory '%s' is not a directory",
2181             file_storage_dir)
2182     # deletes dir only if empty, otherwise we want to fail the rpc call
2183     try:
2184       os.rmdir(file_storage_dir)
2185     except OSError, err:
2186       _Fail("Cannot remove file storage directory '%s': %s",
2187             file_storage_dir, err)
2188
2189
2190 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2191   """Rename the file storage directory.
2192
2193   @type old_file_storage_dir: str
2194   @param old_file_storage_dir: the current path
2195   @type new_file_storage_dir: str
2196   @param new_file_storage_dir: the name we should rename to
2197   @rtype: tuple (success,)
2198   @return: tuple of one element, C{success}, denoting
2199       whether the operation was successful
2200
2201   """
2202   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2203   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2204   if not os.path.exists(new_file_storage_dir):
2205     if os.path.isdir(old_file_storage_dir):
2206       try:
2207         os.rename(old_file_storage_dir, new_file_storage_dir)
2208       except OSError, err:
2209         _Fail("Cannot rename '%s' to '%s': %s",
2210               old_file_storage_dir, new_file_storage_dir, err)
2211     else:
2212       _Fail("Specified storage dir '%s' is not a directory",
2213             old_file_storage_dir)
2214   else:
2215     if os.path.exists(old_file_storage_dir):
2216       _Fail("Cannot rename '%s' to '%s': both locations exist",
2217             old_file_storage_dir, new_file_storage_dir)
2218
2219
2220 def _EnsureJobQueueFile(file_name):
2221   """Checks whether the given filename is in the queue directory.
2222
2223   @type file_name: str
2224   @param file_name: the file name we should check
2225   @rtype: None
2226   @raises RPCFail: if the file is not valid
2227
2228   """
2229   queue_dir = os.path.normpath(constants.QUEUE_DIR)
2230   result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2231
2232   if not result:
2233     _Fail("Passed job queue file '%s' does not belong to"
2234           " the queue directory '%s'", file_name, queue_dir)
2235
2236
2237 def JobQueueUpdate(file_name, content):
2238   """Updates a file in the queue directory.
2239
2240   This is just a wrapper over L{utils.WriteFile}, with proper
2241   checking.
2242
2243   @type file_name: str
2244   @param file_name: the job file name
2245   @type content: str
2246   @param content: the new job contents
2247   @rtype: boolean
2248   @return: the success of the operation
2249
2250   """
2251   _EnsureJobQueueFile(file_name)
2252
2253   # Write and replace the file atomically
2254   utils.WriteFile(file_name, data=_Decompress(content))
2255
2256
2257 def JobQueueRename(old, new):
2258   """Renames a job queue file.
2259
2260   This is just a wrapper over os.rename with proper checking.
2261
2262   @type old: str
2263   @param old: the old (actual) file name
2264   @type new: str
2265   @param new: the desired file name
2266   @rtype: tuple
2267   @return: the success of the operation and payload
2268
2269   """
2270   _EnsureJobQueueFile(old)
2271   _EnsureJobQueueFile(new)
2272
2273   utils.RenameFile(old, new, mkdir=True)
2274
2275
2276 def JobQueueSetDrainFlag(drain_flag):
2277   """Set the drain flag for the queue.
2278
2279   This will set or unset the queue drain flag.
2280
2281   @type drain_flag: boolean
2282   @param drain_flag: if True, will set the drain flag, otherwise reset it.
2283   @rtype: truple
2284   @return: always True, None
2285   @warning: the function always returns True
2286
2287   """
2288   if drain_flag:
2289     utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2290   else:
2291     utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2292
2293
2294 def BlockdevClose(instance_name, disks):
2295   """Closes the given block devices.
2296
2297   This means they will be switched to secondary mode (in case of
2298   DRBD).
2299
2300   @param instance_name: if the argument is not empty, the symlinks
2301       of this instance will be removed
2302   @type disks: list of L{objects.Disk}
2303   @param disks: the list of disks to be closed
2304   @rtype: tuple (success, message)
2305   @return: a tuple of success and message, where success
2306       indicates the succes of the operation, and message
2307       which will contain the error details in case we
2308       failed
2309
2310   """
2311   bdevs = []
2312   for cf in disks:
2313     rd = _RecursiveFindBD(cf)
2314     if rd is None:
2315       _Fail("Can't find device %s", cf)
2316     bdevs.append(rd)
2317
2318   msg = []
2319   for rd in bdevs:
2320     try:
2321       rd.Close()
2322     except errors.BlockDeviceError, err:
2323       msg.append(str(err))
2324   if msg:
2325     _Fail("Can't make devices secondary: %s", ",".join(msg))
2326   else:
2327     if instance_name:
2328       _RemoveBlockDevLinks(instance_name, disks)
2329
2330
2331 def ValidateHVParams(hvname, hvparams):
2332   """Validates the given hypervisor parameters.
2333
2334   @type hvname: string
2335   @param hvname: the hypervisor name
2336   @type hvparams: dict
2337   @param hvparams: the hypervisor parameters to be validated
2338   @rtype: None
2339
2340   """
2341   try:
2342     hv_type = hypervisor.GetHypervisor(hvname)
2343     hv_type.ValidateParameters(hvparams)
2344   except errors.HypervisorError, err:
2345     _Fail(str(err), log=False)
2346
2347
2348 def DemoteFromMC():
2349   """Demotes the current node from master candidate role.
2350
2351   """
2352   # try to ensure we're not the master by mistake
2353   master, myself = ssconf.GetMasterAndMyself()
2354   if master == myself:
2355     _Fail("ssconf status shows I'm the master node, will not demote")
2356   pid_file = utils.DaemonPidFileName(constants.MASTERD)
2357   if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2358     _Fail("The master daemon is running, will not demote")
2359   try:
2360     if os.path.isfile(constants.CLUSTER_CONF_FILE):
2361       utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2362   except EnvironmentError, err:
2363     if err.errno != errno.ENOENT:
2364       _Fail("Error while backing up cluster file: %s", err, exc=True)
2365   utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2366
2367
2368 def _FindDisks(nodes_ip, disks):
2369   """Sets the physical ID on disks and returns the block devices.
2370
2371   """
2372   # set the correct physical ID
2373   my_name = utils.HostInfo().name
2374   for cf in disks:
2375     cf.SetPhysicalID(my_name, nodes_ip)
2376
2377   bdevs = []
2378
2379   for cf in disks:
2380     rd = _RecursiveFindBD(cf)
2381     if rd is None:
2382       _Fail("Can't find device %s", cf)
2383     bdevs.append(rd)
2384   return bdevs
2385
2386
2387 def DrbdDisconnectNet(nodes_ip, disks):
2388   """Disconnects the network on a list of drbd devices.
2389
2390   """
2391   bdevs = _FindDisks(nodes_ip, disks)
2392
2393   # disconnect disks
2394   for rd in bdevs:
2395     try:
2396       rd.DisconnectNet()
2397     except errors.BlockDeviceError, err:
2398       _Fail("Can't change network configuration to standalone mode: %s",
2399             err, exc=True)
2400
2401
2402 def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2403   """Attaches the network on a list of drbd devices.
2404
2405   """
2406   bdevs = _FindDisks(nodes_ip, disks)
2407
2408   if multimaster:
2409     for idx, rd in enumerate(bdevs):
2410       try:
2411         _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2412       except EnvironmentError, err:
2413         _Fail("Can't create symlink: %s", err)
2414   # reconnect disks, switch to new master configuration and if
2415   # needed primary mode
2416   for rd in bdevs:
2417     try:
2418       rd.AttachNet(multimaster)
2419     except errors.BlockDeviceError, err:
2420       _Fail("Can't change network configuration: %s", err)
2421   # wait until the disks are connected; we need to retry the re-attach
2422   # if the device becomes standalone, as this might happen if the one
2423   # node disconnects and reconnects in a different mode before the
2424   # other node reconnects; in this case, one or both of the nodes will
2425   # decide it has wrong configuration and switch to standalone
2426   RECONNECT_TIMEOUT = 2 * 60
2427   sleep_time = 0.100 # start with 100 miliseconds
2428   timeout_limit = time.time() + RECONNECT_TIMEOUT
2429   while time.time() < timeout_limit:
2430     all_connected = True
2431     for rd in bdevs:
2432       stats = rd.GetProcStatus()
2433       if not (stats.is_connected or stats.is_in_resync):
2434         all_connected = False
2435       if stats.is_standalone:
2436         # peer had different config info and this node became
2437         # standalone, even though this should not happen with the
2438         # new staged way of changing disk configs
2439         try:
2440           rd.AttachNet(multimaster)
2441         except errors.BlockDeviceError, err:
2442           _Fail("Can't change network configuration: %s", err)
2443     if all_connected:
2444       break
2445     time.sleep(sleep_time)
2446     sleep_time = min(5, sleep_time * 1.5)
2447   if not all_connected:
2448     _Fail("Timeout in disk reconnecting")
2449   if multimaster:
2450     # change to primary mode
2451     for rd in bdevs:
2452       try:
2453         rd.Open()
2454       except errors.BlockDeviceError, err:
2455         _Fail("Can't change to primary mode: %s", err)
2456
2457
2458 def DrbdWaitSync(nodes_ip, disks):
2459   """Wait until DRBDs have synchronized.
2460
2461   """
2462   bdevs = _FindDisks(nodes_ip, disks)
2463
2464   min_resync = 100
2465   alldone = True
2466   for rd in bdevs:
2467     stats = rd.GetProcStatus()
2468     if not (stats.is_connected or stats.is_in_resync):
2469       _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2470     alldone = alldone and (not stats.is_in_resync)
2471     if stats.sync_percent is not None:
2472       min_resync = min(min_resync, stats.sync_percent)
2473
2474   return (alldone, min_resync)
2475
2476
2477 def PowercycleNode(hypervisor_type):
2478   """Hard-powercycle the node.
2479
2480   Because we need to return first, and schedule the powercycle in the
2481   background, we won't be able to report failures nicely.
2482
2483   """
2484   hyper = hypervisor.GetHypervisor(hypervisor_type)
2485   try:
2486     pid = os.fork()
2487   except OSError:
2488     # if we can't fork, we'll pretend that we're in the child process
2489     pid = 0
2490   if pid > 0:
2491     return "Reboot scheduled in 5 seconds"
2492   time.sleep(5)
2493   hyper.PowercycleNode()
2494
2495
2496 class HooksRunner(object):
2497   """Hook runner.
2498
2499   This class is instantiated on the node side (ganeti-noded) and not
2500   on the master side.
2501
2502   """
2503   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2504
2505   def __init__(self, hooks_base_dir=None):
2506     """Constructor for hooks runner.
2507
2508     @type hooks_base_dir: str or None
2509     @param hooks_base_dir: if not None, this overrides the
2510         L{constants.HOOKS_BASE_DIR} (useful for unittests)
2511
2512     """
2513     if hooks_base_dir is None:
2514       hooks_base_dir = constants.HOOKS_BASE_DIR
2515     self._BASE_DIR = hooks_base_dir
2516
2517   @staticmethod
2518   def ExecHook(script, env):
2519     """Exec one hook script.
2520
2521     @type script: str
2522     @param script: the full path to the script
2523     @type env: dict
2524     @param env: the environment with which to exec the script
2525     @rtype: tuple (success, message)
2526     @return: a tuple of success and message, where success
2527         indicates the succes of the operation, and message
2528         which will contain the error details in case we
2529         failed
2530
2531     """
2532     # exec the process using subprocess and log the output
2533     fdstdin = None
2534     try:
2535       fdstdin = open("/dev/null", "r")
2536       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2537                                stderr=subprocess.STDOUT, close_fds=True,
2538                                shell=False, cwd="/", env=env)
2539       output = ""
2540       try:
2541         output = child.stdout.read(4096)
2542         child.stdout.close()
2543       except EnvironmentError, err:
2544         output += "Hook script error: %s" % str(err)
2545
2546       while True:
2547         try:
2548           result = child.wait()
2549           break
2550         except EnvironmentError, err:
2551           if err.errno == errno.EINTR:
2552             continue
2553           raise
2554     finally:
2555       # try not to leak fds
2556       for fd in (fdstdin, ):
2557         if fd is not None:
2558           try:
2559             fd.close()
2560           except EnvironmentError, err:
2561             # just log the error
2562             #logging.exception("Error while closing fd %s", fd)
2563             pass
2564
2565     return result == 0, utils.SafeEncode(output.strip())
2566
2567   def RunHooks(self, hpath, phase, env):
2568     """Run the scripts in the hooks directory.
2569
2570     @type hpath: str
2571     @param hpath: the path to the hooks directory which
2572         holds the scripts
2573     @type phase: str
2574     @param phase: either L{constants.HOOKS_PHASE_PRE} or
2575         L{constants.HOOKS_PHASE_POST}
2576     @type env: dict
2577     @param env: dictionary with the environment for the hook
2578     @rtype: list
2579     @return: list of 3-element tuples:
2580       - script path
2581       - script result, either L{constants.HKR_SUCCESS} or
2582         L{constants.HKR_FAIL}
2583       - output of the script
2584
2585     @raise errors.ProgrammerError: for invalid input
2586         parameters
2587
2588     """
2589     if phase == constants.HOOKS_PHASE_PRE:
2590       suffix = "pre"
2591     elif phase == constants.HOOKS_PHASE_POST:
2592       suffix = "post"
2593     else:
2594       _Fail("Unknown hooks phase '%s'", phase)
2595
2596     rr = []
2597
2598     subdir = "%s-%s.d" % (hpath, suffix)
2599     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2600     try:
2601       dir_contents = utils.ListVisibleFiles(dir_name)
2602     except OSError:
2603       # FIXME: must log output in case of failures
2604       return rr
2605
2606     # we use the standard python sort order,
2607     # so 00name is the recommended naming scheme
2608     dir_contents.sort()
2609     for relname in dir_contents:
2610       fname = os.path.join(dir_name, relname)
2611       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2612           self.RE_MASK.match(relname) is not None):
2613         rrval = constants.HKR_SKIP
2614         output = ""
2615       else:
2616         result, output = self.ExecHook(fname, env)
2617         if not result:
2618           rrval = constants.HKR_FAIL
2619         else:
2620           rrval = constants.HKR_SUCCESS
2621       rr.append(("%s/%s" % (subdir, relname), rrval, output))
2622
2623     return rr
2624
2625
2626 class IAllocatorRunner(object):
2627   """IAllocator runner.
2628
2629   This class is instantiated on the node side (ganeti-noded) and not on
2630   the master side.
2631
2632   """
2633   def Run(self, name, idata):
2634     """Run an iallocator script.
2635
2636     @type name: str
2637     @param name: the iallocator script name
2638     @type idata: str
2639     @param idata: the allocator input data
2640
2641     @rtype: tuple
2642     @return: two element tuple of:
2643        - status
2644        - either error message or stdout of allocator (for success)
2645
2646     """
2647     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2648                                   os.path.isfile)
2649     if alloc_script is None:
2650       _Fail("iallocator module '%s' not found in the search path", name)
2651
2652     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2653     try:
2654       os.write(fd, idata)
2655       os.close(fd)
2656       result = utils.RunCmd([alloc_script, fin_name])
2657       if result.failed:
2658         _Fail("iallocator module '%s' failed: %s, output '%s'",
2659               name, result.fail_reason, result.output)
2660     finally:
2661       os.unlink(fin_name)
2662
2663     return result.stdout
2664
2665
2666 class DevCacheManager(object):
2667   """Simple class for managing a cache of block device information.
2668
2669   """
2670   _DEV_PREFIX = "/dev/"
2671   _ROOT_DIR = constants.BDEV_CACHE_DIR
2672
2673   @classmethod
2674   def _ConvertPath(cls, dev_path):
2675     """Converts a /dev/name path to the cache file name.
2676
2677     This replaces slashes with underscores and strips the /dev
2678     prefix. It then returns the full path to the cache file.
2679
2680     @type dev_path: str
2681     @param dev_path: the C{/dev/} path name
2682     @rtype: str
2683     @return: the converted path name
2684
2685     """
2686     if dev_path.startswith(cls._DEV_PREFIX):
2687       dev_path = dev_path[len(cls._DEV_PREFIX):]
2688     dev_path = dev_path.replace("/", "_")
2689     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2690     return fpath
2691
2692   @classmethod
2693   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2694     """Updates the cache information for a given device.
2695
2696     @type dev_path: str
2697     @param dev_path: the pathname of the device
2698     @type owner: str
2699     @param owner: the owner (instance name) of the device
2700     @type on_primary: bool
2701     @param on_primary: whether this is the primary
2702         node nor not
2703     @type iv_name: str
2704     @param iv_name: the instance-visible name of the
2705         device, as in objects.Disk.iv_name
2706
2707     @rtype: None
2708
2709     """
2710     if dev_path is None:
2711       logging.error("DevCacheManager.UpdateCache got a None dev_path")
2712       return
2713     fpath = cls._ConvertPath(dev_path)
2714     if on_primary:
2715       state = "primary"
2716     else:
2717       state = "secondary"
2718     if iv_name is None:
2719       iv_name = "not_visible"
2720     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2721     try:
2722       utils.WriteFile(fpath, data=fdata)
2723     except EnvironmentError, err:
2724       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2725
2726   @classmethod
2727   def RemoveCache(cls, dev_path):
2728     """Remove data for a dev_path.
2729
2730     This is just a wrapper over L{utils.RemoveFile} with a converted
2731     path name and logging.
2732
2733     @type dev_path: str
2734     @param dev_path: the pathname of the device
2735
2736     @rtype: None
2737
2738     """
2739     if dev_path is None:
2740       logging.error("DevCacheManager.RemoveCache got a None dev_path")
2741       return
2742     fpath = cls._ConvertPath(dev_path)
2743     try:
2744       utils.RemoveFile(fpath)
2745     except EnvironmentError, err:
2746       logging.exception("Can't update bdev cache for %s: %s", dev_path, err)