Fix cli.PollJob
[ganeti-local] / lib / backend.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Functions used by the node daemon"""
23
24
25 import os
26 import os.path
27 import shutil
28 import time
29 import stat
30 import errno
31 import re
32 import subprocess
33 import random
34 import logging
35 import tempfile
36
37 from ganeti import errors
38 from ganeti import utils
39 from ganeti import ssh
40 from ganeti import hypervisor
41 from ganeti import constants
42 from ganeti import bdev
43 from ganeti import objects
44 from ganeti import ssconf
45
46
47 def _GetSshRunner():
48   return ssh.SshRunner()
49
50
51 def _GetMasterInfo():
52   """Return the master ip and netdev.
53
54   """
55   try:
56     ss = ssconf.SimpleStore()
57     master_netdev = ss.GetMasterNetdev()
58     master_ip = ss.GetMasterIP()
59   except errors.ConfigurationError, err:
60     logging.exception("Cluster configuration incomplete")
61     return (None, None)
62   return (master_netdev, master_ip)
63
64
65 def StartMaster(start_daemons):
66   """Activate local node as master node.
67
68   The function will always try activate the IP address of the master
69   (if someone else has it, then it won't). Then, if the start_daemons
70   parameter is True, it will also start the master daemons
71   (ganet-masterd and ganeti-rapi).
72
73   """
74   ok = True
75   master_netdev, master_ip = _GetMasterInfo()
76   if not master_netdev:
77     return False
78
79   if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
80     if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
81                      source=constants.LOCALHOST_IP_ADDRESS):
82       # we already have the ip:
83       logging.debug("Already started")
84     else:
85       logging.error("Someone else has the master ip, not activating")
86       ok = False
87   else:
88     result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
89                            "dev", master_netdev, "label",
90                            "%s:0" % master_netdev])
91     if result.failed:
92       logging.error("Can't activate master IP: %s", result.output)
93       ok = False
94
95     result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
96                            "-s", master_ip, master_ip])
97     # we'll ignore the exit code of arping
98
99   # and now start the master and rapi daemons
100   if start_daemons:
101     for daemon in 'ganeti-masterd', 'ganeti-rapi':
102       result = utils.RunCmd([daemon])
103       if result.failed:
104         logging.error("Can't start daemon %s: %s", daemon, result.output)
105         ok = False
106   return ok
107
108
109 def StopMaster(stop_daemons):
110   """Deactivate this node as master.
111
112   The function will always try to deactivate the IP address of the
113   master. Then, if the stop_daemons parameter is True, it will also
114   stop the master daemons (ganet-masterd and ganeti-rapi).
115
116   """
117   master_netdev, master_ip = _GetMasterInfo()
118   if not master_netdev:
119     return False
120
121   result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
122                          "dev", master_netdev])
123   if result.failed:
124     logging.error("Can't remove the master IP, error: %s", result.output)
125     # but otherwise ignore the failure
126
127   if stop_daemons:
128     # stop/kill the rapi and the master daemon
129     for daemon in constants.RAPI_PID, constants.MASTERD_PID:
130       utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
131
132   return True
133
134
135 def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
136   """Joins this node to the cluster.
137
138   This does the following:
139       - updates the hostkeys of the machine (rsa and dsa)
140       - adds the ssh private key to the user
141       - adds the ssh public key to the users' authorized_keys file
142
143   """
144   sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
145                 (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
146                 (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
147                 (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
148   for name, content, mode in sshd_keys:
149     utils.WriteFile(name, data=content, mode=mode)
150
151   try:
152     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
153                                                     mkdir=True)
154   except errors.OpExecError, err:
155     logging.exception("Error while processing user ssh files")
156     return False
157
158   for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
159     utils.WriteFile(name, data=content, mode=0600)
160
161   utils.AddAuthorizedKey(auth_keys, sshpub)
162
163   utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
164
165   return True
166
167
168 def LeaveCluster():
169   """Cleans up the current node and prepares it to be removed from the cluster.
170
171   """
172   def _CleanDirectory(path):
173     if not os.path.isdir(path):
174       return
175     for rel_name in utils.ListVisibleFiles(path):
176       full_name = os.path.join(path, rel_name)
177       if os.path.isfile(full_name) and not os.path.islink(full_name):
178         utils.RemoveFile(full_name)
179
180   _CleanDirectory(constants.DATA_DIR)
181
182   # Remove job queue files and archived jobs
183   _CleanDirectory(constants.QUEUE_DIR)
184   _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
185
186   try:
187     priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
188   except errors.OpExecError:
189     logging.exception("Error while processing ssh files")
190     return
191
192   f = open(pub_key, 'r')
193   try:
194     utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
195   finally:
196     f.close()
197
198   utils.RemoveFile(priv_key)
199   utils.RemoveFile(pub_key)
200
201   # Return a reassuring string to the caller, and quit
202   raise errors.QuitGanetiException(False, 'Shutdown scheduled')
203
204
205 def GetNodeInfo(vgname):
206   """Gives back a hash with different informations about the node.
207
208   Returns:
209     { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
210       'memory_free' : xxx, 'memory_total' : xxx }
211     where
212     vg_size is the size of the configured volume group in MiB
213     vg_free is the free size of the volume group in MiB
214     memory_dom0 is the memory allocated for domain0 in MiB
215     memory_free is the currently available (free) ram in MiB
216     memory_total is the total number of ram in MiB
217
218   """
219   outputarray = {}
220   vginfo = _GetVGInfo(vgname)
221   outputarray['vg_size'] = vginfo['vg_size']
222   outputarray['vg_free'] = vginfo['vg_free']
223
224   hyper = hypervisor.GetHypervisor()
225   hyp_info = hyper.GetNodeInfo()
226   if hyp_info is not None:
227     outputarray.update(hyp_info)
228
229   f = open("/proc/sys/kernel/random/boot_id", 'r')
230   try:
231     outputarray["bootid"] = f.read(128).rstrip("\n")
232   finally:
233     f.close()
234
235   return outputarray
236
237
238 def VerifyNode(what):
239   """Verify the status of the local node.
240
241   Args:
242     what - a dictionary of things to check:
243       'filelist' : list of files for which to compute checksums
244       'nodelist' : list of nodes we should check communication with
245       'hypervisor': run the hypervisor-specific verify
246
247   Requested files on local node are checksummed and the result returned.
248
249   The nodelist is traversed, with the following checks being made
250   for each node:
251   - known_hosts key correct
252   - correct resolving of node name (target node returns its own hostname
253     by ssh-execution of 'hostname', result compared against name in list.
254
255   """
256   result = {}
257
258   if 'hypervisor' in what:
259     result['hypervisor'] = hypervisor.GetHypervisor().Verify()
260
261   if 'filelist' in what:
262     result['filelist'] = utils.FingerprintFiles(what['filelist'])
263
264   if 'nodelist' in what:
265     result['nodelist'] = {}
266     random.shuffle(what['nodelist'])
267     for node in what['nodelist']:
268       success, message = _GetSshRunner().VerifyNodeHostname(node)
269       if not success:
270         result['nodelist'][node] = message
271   if 'node-net-test' in what:
272     result['node-net-test'] = {}
273     my_name = utils.HostInfo().name
274     my_pip = my_sip = None
275     for name, pip, sip in what['node-net-test']:
276       if name == my_name:
277         my_pip = pip
278         my_sip = sip
279         break
280     if not my_pip:
281       result['node-net-test'][my_name] = ("Can't find my own"
282                                           " primary/secondary IP"
283                                           " in the node list")
284     else:
285       port = ssconf.SimpleStore().GetNodeDaemonPort()
286       for name, pip, sip in what['node-net-test']:
287         fail = []
288         if not utils.TcpPing(pip, port, source=my_pip):
289           fail.append("primary")
290         if sip != pip:
291           if not utils.TcpPing(sip, port, source=my_sip):
292             fail.append("secondary")
293         if fail:
294           result['node-net-test'][name] = ("failure using the %s"
295                                            " interface(s)" %
296                                            " and ".join(fail))
297
298   return result
299
300
301 def GetVolumeList(vg_name):
302   """Compute list of logical volumes and their size.
303
304   Returns:
305     dictionary of all partions (key) with their size (in MiB), inactive
306     and online status:
307     {'test1': ('20.06', True, True)}
308
309   """
310   lvs = {}
311   sep = '|'
312   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
313                          "--separator=%s" % sep,
314                          "-olv_name,lv_size,lv_attr", vg_name])
315   if result.failed:
316     logging.error("Failed to list logical volumes, lvs output: %s",
317                   result.output)
318     return result.output
319
320   valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
321   for line in result.stdout.splitlines():
322     line = line.strip()
323     match = valid_line_re.match(line)
324     if not match:
325       logging.error("Invalid line returned from lvs output: '%s'", line)
326       continue
327     name, size, attr = match.groups()
328     inactive = attr[4] == '-'
329     online = attr[5] == 'o'
330     lvs[name] = (size, inactive, online)
331
332   return lvs
333
334
335 def ListVolumeGroups():
336   """List the volume groups and their size.
337
338   Returns:
339     Dictionary with keys volume name and values the size of the volume
340
341   """
342   return utils.ListVolumeGroups()
343
344
345 def NodeVolumes():
346   """List all volumes on this node.
347
348   """
349   result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
350                          "--separator=|",
351                          "--options=lv_name,lv_size,devices,vg_name"])
352   if result.failed:
353     logging.error("Failed to list logical volumes, lvs output: %s",
354                   result.output)
355     return {}
356
357   def parse_dev(dev):
358     if '(' in dev:
359       return dev.split('(')[0]
360     else:
361       return dev
362
363   def map_line(line):
364     return {
365       'name': line[0].strip(),
366       'size': line[1].strip(),
367       'dev': parse_dev(line[2].strip()),
368       'vg': line[3].strip(),
369     }
370
371   return [map_line(line.split('|')) for line in result.stdout.splitlines()
372           if line.count('|') >= 3]
373
374
375 def BridgesExist(bridges_list):
376   """Check if a list of bridges exist on the current node.
377
378   Returns:
379     True if all of them exist, false otherwise
380
381   """
382   for bridge in bridges_list:
383     if not utils.BridgeExists(bridge):
384       return False
385
386   return True
387
388
389 def GetInstanceList():
390   """Provides a list of instances.
391
392   Returns:
393     A list of all running instances on the current node
394     - instance1.example.com
395     - instance2.example.com
396
397   """
398   try:
399     names = hypervisor.GetHypervisor().ListInstances()
400   except errors.HypervisorError, err:
401     logging.exception("Error enumerating instances")
402     raise
403
404   return names
405
406
407 def GetInstanceInfo(instance):
408   """Gives back the informations about an instance as a dictionary.
409
410   Args:
411     instance: name of the instance (ex. instance1.example.com)
412
413   Returns:
414     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
415     where
416     memory: memory size of instance (int)
417     state: xen state of instance (string)
418     time: cpu time of instance (float)
419
420   """
421   output = {}
422
423   iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
424   if iinfo is not None:
425     output['memory'] = iinfo[2]
426     output['state'] = iinfo[4]
427     output['time'] = iinfo[5]
428
429   return output
430
431
432 def GetAllInstancesInfo():
433   """Gather data about all instances.
434
435   This is the equivalent of `GetInstanceInfo()`, except that it
436   computes data for all instances at once, thus being faster if one
437   needs data about more than one instance.
438
439   Returns: a dictionary of dictionaries, keys being the instance name,
440     and with values:
441     { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
442     where
443     memory: memory size of instance (int)
444     state: xen state of instance (string)
445     time: cpu time of instance (float)
446     vcpus: the number of cpus
447
448   """
449   output = {}
450
451   iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
452   if iinfo:
453     for name, inst_id, memory, vcpus, state, times in iinfo:
454       output[name] = {
455         'memory': memory,
456         'vcpus': vcpus,
457         'state': state,
458         'time': times,
459         }
460
461   return output
462
463
464 def AddOSToInstance(instance, os_disk, swap_disk):
465   """Add an OS to an instance.
466
467   Args:
468     instance: the instance object
469     os_disk: the instance-visible name of the os device
470     swap_disk: the instance-visible name of the swap device
471
472   """
473   inst_os = OSFromDisk(instance.os)
474
475   create_script = inst_os.create_script
476
477   os_device = instance.FindDisk(os_disk)
478   if os_device is None:
479     logging.error("Can't find this device-visible name '%s'", os_disk)
480     return False
481
482   swap_device = instance.FindDisk(swap_disk)
483   if swap_device is None:
484     logging.error("Can't find this device-visible name '%s'", swap_disk)
485     return False
486
487   real_os_dev = _RecursiveFindBD(os_device)
488   if real_os_dev is None:
489     raise errors.BlockDeviceError("Block device '%s' is not set up" %
490                                   str(os_device))
491   real_os_dev.Open()
492
493   real_swap_dev = _RecursiveFindBD(swap_device)
494   if real_swap_dev is None:
495     raise errors.BlockDeviceError("Block device '%s' is not set up" %
496                                   str(swap_device))
497   real_swap_dev.Open()
498
499   logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
500                                      instance.name, int(time.time()))
501   if not os.path.exists(constants.LOG_OS_DIR):
502     os.mkdir(constants.LOG_OS_DIR, 0750)
503
504   command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
505                                 inst_os.path, create_script, instance.name,
506                                 real_os_dev.dev_path, real_swap_dev.dev_path,
507                                 logfile)
508
509   result = utils.RunCmd(command)
510   if result.failed:
511     logging.error("os create command '%s' returned error: %s, logfile: %s,"
512                   " output: %s", command, result.fail_reason, logfile,
513                   result.output)
514     return False
515
516   return True
517
518
519 def RunRenameInstance(instance, old_name, os_disk, swap_disk):
520   """Run the OS rename script for an instance.
521
522   Args:
523     instance: the instance object
524     old_name: the old name of the instance
525     os_disk: the instance-visible name of the os device
526     swap_disk: the instance-visible name of the swap device
527
528   """
529   inst_os = OSFromDisk(instance.os)
530
531   script = inst_os.rename_script
532
533   os_device = instance.FindDisk(os_disk)
534   if os_device is None:
535     logging.error("Can't find this device-visible name '%s'", os_disk)
536     return False
537
538   swap_device = instance.FindDisk(swap_disk)
539   if swap_device is None:
540     logging.error("Can't find this device-visible name '%s'", swap_disk)
541     return False
542
543   real_os_dev = _RecursiveFindBD(os_device)
544   if real_os_dev is None:
545     raise errors.BlockDeviceError("Block device '%s' is not set up" %
546                                   str(os_device))
547   real_os_dev.Open()
548
549   real_swap_dev = _RecursiveFindBD(swap_device)
550   if real_swap_dev is None:
551     raise errors.BlockDeviceError("Block device '%s' is not set up" %
552                                   str(swap_device))
553   real_swap_dev.Open()
554
555   logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
556                                            old_name,
557                                            instance.name, int(time.time()))
558   if not os.path.exists(constants.LOG_OS_DIR):
559     os.mkdir(constants.LOG_OS_DIR, 0750)
560
561   command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
562                                 inst_os.path, script, old_name, instance.name,
563                                 real_os_dev.dev_path, real_swap_dev.dev_path,
564                                 logfile)
565
566   result = utils.RunCmd(command)
567
568   if result.failed:
569     logging.error("os create command '%s' returned error: %s output: %s",
570                   command, result.fail_reason, result.output)
571     return False
572
573   return True
574
575
576 def _GetVGInfo(vg_name):
577   """Get informations about the volume group.
578
579   Args:
580     vg_name: the volume group
581
582   Returns:
583     { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
584     where
585     vg_size is the total size of the volume group in MiB
586     vg_free is the free size of the volume group in MiB
587     pv_count are the number of physical disks in that vg
588
589   If an error occurs during gathering of data, we return the same dict
590   with keys all set to None.
591
592   """
593   retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
594
595   retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
596                          "--nosuffix", "--units=m", "--separator=:", vg_name])
597
598   if retval.failed:
599     logging.error("volume group %s not present", vg_name)
600     return retdic
601   valarr = retval.stdout.strip().rstrip(':').split(':')
602   if len(valarr) == 3:
603     try:
604       retdic = {
605         "vg_size": int(round(float(valarr[0]), 0)),
606         "vg_free": int(round(float(valarr[1]), 0)),
607         "pv_count": int(valarr[2]),
608         }
609     except ValueError, err:
610       logging.exception("Fail to parse vgs output")
611   else:
612     logging.error("vgs output has the wrong number of fields (expected"
613                   " three): %s", str(valarr))
614   return retdic
615
616
617 def _GatherBlockDevs(instance):
618   """Set up an instance's block device(s).
619
620   This is run on the primary node at instance startup. The block
621   devices must be already assembled.
622
623   """
624   block_devices = []
625   for disk in instance.disks:
626     device = _RecursiveFindBD(disk)
627     if device is None:
628       raise errors.BlockDeviceError("Block device '%s' is not set up." %
629                                     str(disk))
630     device.Open()
631     block_devices.append((disk, device))
632   return block_devices
633
634
635 def StartInstance(instance, extra_args):
636   """Start an instance.
637
638   Args:
639     instance - name of instance to start.
640
641   """
642   running_instances = GetInstanceList()
643
644   if instance.name in running_instances:
645     return True
646
647   block_devices = _GatherBlockDevs(instance)
648   hyper = hypervisor.GetHypervisor()
649
650   try:
651     hyper.StartInstance(instance, block_devices, extra_args)
652   except errors.HypervisorError, err:
653     logging.exception("Failed to start instance")
654     return False
655
656   return True
657
658
659 def ShutdownInstance(instance):
660   """Shut an instance down.
661
662   Args:
663     instance - name of instance to shutdown.
664
665   """
666   running_instances = GetInstanceList()
667
668   if instance.name not in running_instances:
669     return True
670
671   hyper = hypervisor.GetHypervisor()
672   try:
673     hyper.StopInstance(instance)
674   except errors.HypervisorError, err:
675     logging.error("Failed to stop instance")
676     return False
677
678   # test every 10secs for 2min
679   shutdown_ok = False
680
681   time.sleep(1)
682   for dummy in range(11):
683     if instance.name not in GetInstanceList():
684       break
685     time.sleep(10)
686   else:
687     # the shutdown did not succeed
688     logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
689
690     try:
691       hyper.StopInstance(instance, force=True)
692     except errors.HypervisorError, err:
693       logging.exception("Failed to stop instance")
694       return False
695
696     time.sleep(1)
697     if instance.name in GetInstanceList():
698       logging.error("could not shutdown instance '%s' even by destroy",
699                     instance.name)
700       return False
701
702   return True
703
704
705 def RebootInstance(instance, reboot_type, extra_args):
706   """Reboot an instance.
707
708   Args:
709     instance    - name of instance to reboot
710     reboot_type - how to reboot [soft,hard,full]
711
712   """
713   running_instances = GetInstanceList()
714
715   if instance.name not in running_instances:
716     logging.error("Cannot reboot instance that is not running")
717     return False
718
719   hyper = hypervisor.GetHypervisor()
720   if reboot_type == constants.INSTANCE_REBOOT_SOFT:
721     try:
722       hyper.RebootInstance(instance)
723     except errors.HypervisorError, err:
724       logging.exception("Failed to soft reboot instance")
725       return False
726   elif reboot_type == constants.INSTANCE_REBOOT_HARD:
727     try:
728       ShutdownInstance(instance)
729       StartInstance(instance, extra_args)
730     except errors.HypervisorError, err:
731       logging.exception("Failed to hard reboot instance")
732       return False
733   else:
734     raise errors.ParameterError("reboot_type invalid")
735
736
737   return True
738
739
740 def MigrateInstance(instance, target, live):
741   """Migrates an instance to another node.
742
743   """
744   hyper = hypervisor.GetHypervisor()
745
746   try:
747     hyper.MigrateInstance(instance, target, live)
748   except errors.HypervisorError, err:
749     msg = "Failed to migrate instance: %s" % str(err)
750     logging.error(msg)
751     return (False, msg)
752   return (True, "Migration successfull")
753
754
755 def CreateBlockDevice(disk, size, owner, on_primary, info):
756   """Creates a block device for an instance.
757
758   Args:
759    disk: a ganeti.objects.Disk object
760    size: the size of the physical underlying device
761    owner: a string with the name of the instance
762    on_primary: a boolean indicating if it is the primary node or not
763    info: string that will be sent to the physical device creation
764
765   Returns:
766     the new unique_id of the device (this can sometime be
767     computed only after creation), or None. On secondary nodes,
768     it's not required to return anything.
769
770   """
771   clist = []
772   if disk.children:
773     for child in disk.children:
774       crdev = _RecursiveAssembleBD(child, owner, on_primary)
775       if on_primary or disk.AssembleOnSecondary():
776         # we need the children open in case the device itself has to
777         # be assembled
778         crdev.Open()
779       clist.append(crdev)
780   try:
781     device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
782     if device is not None:
783       logging.info("removing existing device %s", disk)
784       device.Remove()
785   except errors.BlockDeviceError, err:
786     pass
787
788   device = bdev.Create(disk.dev_type, disk.physical_id,
789                        clist, size)
790   if device is None:
791     raise ValueError("Can't create child device for %s, %s" %
792                      (disk, size))
793   if on_primary or disk.AssembleOnSecondary():
794     if not device.Assemble():
795       errorstring = "Can't assemble device after creation"
796       logging.error(errorstring)
797       raise errors.BlockDeviceError("%s, very unusual event - check the node"
798                                     " daemon logs" % errorstring)
799     device.SetSyncSpeed(constants.SYNC_SPEED)
800     if on_primary or disk.OpenOnSecondary():
801       device.Open(force=True)
802     DevCacheManager.UpdateCache(device.dev_path, owner,
803                                 on_primary, disk.iv_name)
804
805   device.SetInfo(info)
806
807   physical_id = device.unique_id
808   return physical_id
809
810
811 def RemoveBlockDevice(disk):
812   """Remove a block device.
813
814   This is intended to be called recursively.
815
816   """
817   try:
818     # since we are removing the device, allow a partial match
819     # this allows removal of broken mirrors
820     rdev = _RecursiveFindBD(disk, allow_partial=True)
821   except errors.BlockDeviceError, err:
822     # probably can't attach
823     logging.info("Can't attach to device %s in remove", disk)
824     rdev = None
825   if rdev is not None:
826     r_path = rdev.dev_path
827     result = rdev.Remove()
828     if result:
829       DevCacheManager.RemoveCache(r_path)
830   else:
831     result = True
832   if disk.children:
833     for child in disk.children:
834       result = result and RemoveBlockDevice(child)
835   return result
836
837
838 def _RecursiveAssembleBD(disk, owner, as_primary):
839   """Activate a block device for an instance.
840
841   This is run on the primary and secondary nodes for an instance.
842
843   This function is called recursively.
844
845   Args:
846     disk: a objects.Disk object
847     as_primary: if we should make the block device read/write
848
849   Returns:
850     the assembled device or None (in case no device was assembled)
851
852   If the assembly is not successful, an exception is raised.
853
854   """
855   children = []
856   if disk.children:
857     mcn = disk.ChildrenNeeded()
858     if mcn == -1:
859       mcn = 0 # max number of Nones allowed
860     else:
861       mcn = len(disk.children) - mcn # max number of Nones
862     for chld_disk in disk.children:
863       try:
864         cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
865       except errors.BlockDeviceError, err:
866         if children.count(None) >= mcn:
867           raise
868         cdev = None
869         logging.debug("Error in child activation: %s", str(err))
870       children.append(cdev)
871
872   if as_primary or disk.AssembleOnSecondary():
873     r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
874     r_dev.SetSyncSpeed(constants.SYNC_SPEED)
875     result = r_dev
876     if as_primary or disk.OpenOnSecondary():
877       r_dev.Open()
878     DevCacheManager.UpdateCache(r_dev.dev_path, owner,
879                                 as_primary, disk.iv_name)
880
881   else:
882     result = True
883   return result
884
885
886 def AssembleBlockDevice(disk, owner, as_primary):
887   """Activate a block device for an instance.
888
889   This is a wrapper over _RecursiveAssembleBD.
890
891   Returns:
892     a /dev path for primary nodes
893     True for secondary nodes
894
895   """
896   result = _RecursiveAssembleBD(disk, owner, as_primary)
897   if isinstance(result, bdev.BlockDev):
898     result = result.dev_path
899   return result
900
901
902 def ShutdownBlockDevice(disk):
903   """Shut down a block device.
904
905   First, if the device is assembled (can `Attach()`), then the device
906   is shutdown. Then the children of the device are shutdown.
907
908   This function is called recursively. Note that we don't cache the
909   children or such, as oppossed to assemble, shutdown of different
910   devices doesn't require that the upper device was active.
911
912   """
913   r_dev = _RecursiveFindBD(disk)
914   if r_dev is not None:
915     r_path = r_dev.dev_path
916     result = r_dev.Shutdown()
917     if result:
918       DevCacheManager.RemoveCache(r_path)
919   else:
920     result = True
921   if disk.children:
922     for child in disk.children:
923       result = result and ShutdownBlockDevice(child)
924   return result
925
926
927 def MirrorAddChildren(parent_cdev, new_cdevs):
928   """Extend a mirrored block device.
929
930   """
931   parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
932   if parent_bdev is None:
933     logging.error("Can't find parent device")
934     return False
935   new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
936   if new_bdevs.count(None) > 0:
937     logging.error("Can't find new device(s) to add: %s:%s",
938                   new_bdevs, new_cdevs)
939     return False
940   parent_bdev.AddChildren(new_bdevs)
941   return True
942
943
944 def MirrorRemoveChildren(parent_cdev, new_cdevs):
945   """Shrink a mirrored block device.
946
947   """
948   parent_bdev = _RecursiveFindBD(parent_cdev)
949   if parent_bdev is None:
950     logging.error("Can't find parent in remove children: %s", parent_cdev)
951     return False
952   devs = []
953   for disk in new_cdevs:
954     rpath = disk.StaticDevPath()
955     if rpath is None:
956       bd = _RecursiveFindBD(disk)
957       if bd is None:
958         logging.error("Can't find dynamic device %s while removing children",
959                       disk)
960         return False
961       else:
962         devs.append(bd.dev_path)
963     else:
964       devs.append(rpath)
965   parent_bdev.RemoveChildren(devs)
966   return True
967
968
969 def GetMirrorStatus(disks):
970   """Get the mirroring status of a list of devices.
971
972   Args:
973     disks: list of `objects.Disk`
974
975   Returns:
976     list of (mirror_done, estimated_time) tuples, which
977     are the result of bdev.BlockDevice.CombinedSyncStatus()
978
979   """
980   stats = []
981   for dsk in disks:
982     rbd = _RecursiveFindBD(dsk)
983     if rbd is None:
984       raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
985     stats.append(rbd.CombinedSyncStatus())
986   return stats
987
988
989 def _RecursiveFindBD(disk, allow_partial=False):
990   """Check if a device is activated.
991
992   If so, return informations about the real device.
993
994   Args:
995     disk: the objects.Disk instance
996     allow_partial: don't abort the find if a child of the
997                    device can't be found; this is intended to be
998                    used when repairing mirrors
999
1000   Returns:
1001     None if the device can't be found
1002     otherwise the device instance
1003
1004   """
1005   children = []
1006   if disk.children:
1007     for chdisk in disk.children:
1008       children.append(_RecursiveFindBD(chdisk))
1009
1010   return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1011
1012
1013 def FindBlockDevice(disk):
1014   """Check if a device is activated.
1015
1016   If so, return informations about the real device.
1017
1018   Args:
1019     disk: the objects.Disk instance
1020   Returns:
1021     None if the device can't be found
1022     (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1023
1024   """
1025   rbd = _RecursiveFindBD(disk)
1026   if rbd is None:
1027     return rbd
1028   return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1029
1030 def _IsJobQueueFile(file_name):
1031   queue_dir = os.path.normpath(constants.QUEUE_DIR)
1032   return os.path.commonprefix([queue_dir, file_name]) == queue_dir
1033
1034 def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1035   """Write a file to the filesystem.
1036
1037   This allows the master to overwrite(!) a file. It will only perform
1038   the operation if the file belongs to a list of configuration files.
1039
1040   """
1041   if not os.path.isabs(file_name):
1042     logging.error("Filename passed to UploadFile is not absolute: '%s'",
1043                   file_name)
1044     return False
1045
1046   allowed_files = [
1047     constants.CLUSTER_CONF_FILE,
1048     constants.ETC_HOSTS,
1049     constants.SSH_KNOWN_HOSTS_FILE,
1050     constants.VNC_PASSWORD_FILE,
1051     ]
1052   allowed_files.extend(ssconf.SimpleStore().GetFileList())
1053
1054   if not (file_name in allowed_files or _IsJobQueueFile(file_name)):
1055     logging.error("Filename passed to UploadFile not in allowed"
1056                  " upload targets: '%s'", file_name)
1057     return False
1058
1059   utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1060                   atime=atime, mtime=mtime)
1061   return True
1062
1063
1064 def _ErrnoOrStr(err):
1065   """Format an EnvironmentError exception.
1066
1067   If the `err` argument has an errno attribute, it will be looked up
1068   and converted into a textual EXXXX description. Otherwise the string
1069   representation of the error will be returned.
1070
1071   """
1072   if hasattr(err, 'errno'):
1073     detail = errno.errorcode[err.errno]
1074   else:
1075     detail = str(err)
1076   return detail
1077
1078
1079 def _OSOndiskVersion(name, os_dir):
1080   """Compute and return the API version of a given OS.
1081
1082   This function will try to read the API version of the os given by
1083   the 'name' parameter and residing in the 'os_dir' directory.
1084
1085   Return value will be either an integer denoting the version or None in the
1086   case when this is not a valid OS name.
1087
1088   """
1089   api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1090
1091   try:
1092     st = os.stat(api_file)
1093   except EnvironmentError, err:
1094     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1095                            " found (%s)" % _ErrnoOrStr(err))
1096
1097   if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1098     raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1099                            " a regular file")
1100
1101   try:
1102     f = open(api_file)
1103     try:
1104       api_version = f.read(256)
1105     finally:
1106       f.close()
1107   except EnvironmentError, err:
1108     raise errors.InvalidOS(name, os_dir, "error while reading the"
1109                            " API version (%s)" % _ErrnoOrStr(err))
1110
1111   api_version = api_version.strip()
1112   try:
1113     api_version = int(api_version)
1114   except (TypeError, ValueError), err:
1115     raise errors.InvalidOS(name, os_dir,
1116                            "API version is not integer (%s)" % str(err))
1117
1118   return api_version
1119
1120
1121 def DiagnoseOS(top_dirs=None):
1122   """Compute the validity for all OSes.
1123
1124   Returns an OS object for each name in all the given top directories
1125   (if not given defaults to constants.OS_SEARCH_PATH)
1126
1127   Returns:
1128     list of OS objects
1129
1130   """
1131   if top_dirs is None:
1132     top_dirs = constants.OS_SEARCH_PATH
1133
1134   result = []
1135   for dir_name in top_dirs:
1136     if os.path.isdir(dir_name):
1137       try:
1138         f_names = utils.ListVisibleFiles(dir_name)
1139       except EnvironmentError, err:
1140         logging.exception("Can't list the OS directory %s", dir_name)
1141         break
1142       for name in f_names:
1143         try:
1144           os_inst = OSFromDisk(name, base_dir=dir_name)
1145           result.append(os_inst)
1146         except errors.InvalidOS, err:
1147           result.append(objects.OS.FromInvalidOS(err))
1148
1149   return result
1150
1151
1152 def OSFromDisk(name, base_dir=None):
1153   """Create an OS instance from disk.
1154
1155   This function will return an OS instance if the given name is a
1156   valid OS name. Otherwise, it will raise an appropriate
1157   `errors.InvalidOS` exception, detailing why this is not a valid
1158   OS.
1159
1160   Args:
1161     os_dir: Directory containing the OS scripts. Defaults to a search
1162             in all the OS_SEARCH_PATH directories.
1163
1164   """
1165
1166   if base_dir is None:
1167     os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1168     if os_dir is None:
1169       raise errors.InvalidOS(name, None, "OS dir not found in search path")
1170   else:
1171     os_dir = os.path.sep.join([base_dir, name])
1172
1173   api_version = _OSOndiskVersion(name, os_dir)
1174
1175   if api_version != constants.OS_API_VERSION:
1176     raise errors.InvalidOS(name, os_dir, "API version mismatch"
1177                            " (found %s want %s)"
1178                            % (api_version, constants.OS_API_VERSION))
1179
1180   # OS Scripts dictionary, we will populate it with the actual script names
1181   os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1182
1183   for script in os_scripts:
1184     os_scripts[script] = os.path.sep.join([os_dir, script])
1185
1186     try:
1187       st = os.stat(os_scripts[script])
1188     except EnvironmentError, err:
1189       raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1190                              (script, _ErrnoOrStr(err)))
1191
1192     if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1193       raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1194                              script)
1195
1196     if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1197       raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1198                              script)
1199
1200
1201   return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1202                     create_script=os_scripts['create'],
1203                     export_script=os_scripts['export'],
1204                     import_script=os_scripts['import'],
1205                     rename_script=os_scripts['rename'],
1206                     api_version=api_version)
1207
1208
1209 def GrowBlockDevice(disk, amount):
1210   """Grow a stack of block devices.
1211
1212   This function is called recursively, with the childrens being the
1213   first one resize.
1214
1215   Args:
1216     disk: the disk to be grown
1217
1218   Returns: a tuple of (status, result), with:
1219     status: the result (true/false) of the operation
1220     result: the error message if the operation failed, otherwise not used
1221
1222   """
1223   r_dev = _RecursiveFindBD(disk)
1224   if r_dev is None:
1225     return False, "Cannot find block device %s" % (disk,)
1226
1227   try:
1228     r_dev.Grow(amount)
1229   except errors.BlockDeviceError, err:
1230     return False, str(err)
1231
1232   return True, None
1233
1234
1235 def SnapshotBlockDevice(disk):
1236   """Create a snapshot copy of a block device.
1237
1238   This function is called recursively, and the snapshot is actually created
1239   just for the leaf lvm backend device.
1240
1241   Args:
1242     disk: the disk to be snapshotted
1243
1244   Returns:
1245     a config entry for the actual lvm device snapshotted.
1246
1247   """
1248   if disk.children:
1249     if len(disk.children) == 1:
1250       # only one child, let's recurse on it
1251       return SnapshotBlockDevice(disk.children[0])
1252     else:
1253       # more than one child, choose one that matches
1254       for child in disk.children:
1255         if child.size == disk.size:
1256           # return implies breaking the loop
1257           return SnapshotBlockDevice(child)
1258   elif disk.dev_type == constants.LD_LV:
1259     r_dev = _RecursiveFindBD(disk)
1260     if r_dev is not None:
1261       # let's stay on the safe side and ask for the full size, for now
1262       return r_dev.Snapshot(disk.size)
1263     else:
1264       return None
1265   else:
1266     raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1267                                  " '%s' of type '%s'" %
1268                                  (disk.unique_id, disk.dev_type))
1269
1270
1271 def ExportSnapshot(disk, dest_node, instance):
1272   """Export a block device snapshot to a remote node.
1273
1274   Args:
1275     disk: the snapshot block device
1276     dest_node: the node to send the image to
1277     instance: instance being exported
1278
1279   Returns:
1280     True if successful, False otherwise.
1281
1282   """
1283   inst_os = OSFromDisk(instance.os)
1284   export_script = inst_os.export_script
1285
1286   logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1287                                      instance.name, int(time.time()))
1288   if not os.path.exists(constants.LOG_OS_DIR):
1289     os.mkdir(constants.LOG_OS_DIR, 0750)
1290
1291   real_os_dev = _RecursiveFindBD(disk)
1292   if real_os_dev is None:
1293     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1294                                   str(disk))
1295   real_os_dev.Open()
1296
1297   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1298   destfile = disk.physical_id[1]
1299
1300   # the target command is built out of three individual commands,
1301   # which are joined by pipes; we check each individual command for
1302   # valid parameters
1303
1304   expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1305                                export_script, instance.name,
1306                                real_os_dev.dev_path, logfile)
1307
1308   comprcmd = "gzip"
1309
1310   destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1311                                 destdir, destdir, destfile)
1312   remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1313                                        destcmd)
1314
1315   # all commands have been checked, so we're safe to combine them
1316   command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1317
1318   result = utils.RunCmd(command)
1319
1320   if result.failed:
1321     logging.error("os snapshot export command '%s' returned error: %s"
1322                   " output: %s", command, result.fail_reason, result.output)
1323     return False
1324
1325   return True
1326
1327
1328 def FinalizeExport(instance, snap_disks):
1329   """Write out the export configuration information.
1330
1331   Args:
1332     instance: instance configuration
1333     snap_disks: snapshot block devices
1334
1335   Returns:
1336     False in case of error, True otherwise.
1337
1338   """
1339   destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1340   finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1341
1342   config = objects.SerializableConfigParser()
1343
1344   config.add_section(constants.INISECT_EXP)
1345   config.set(constants.INISECT_EXP, 'version', '0')
1346   config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1347   config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1348   config.set(constants.INISECT_EXP, 'os', instance.os)
1349   config.set(constants.INISECT_EXP, 'compression', 'gzip')
1350
1351   config.add_section(constants.INISECT_INS)
1352   config.set(constants.INISECT_INS, 'name', instance.name)
1353   config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1354   config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1355   config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1356
1357   nic_count = 0
1358   for nic_count, nic in enumerate(instance.nics):
1359     config.set(constants.INISECT_INS, 'nic%d_mac' %
1360                nic_count, '%s' % nic.mac)
1361     config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1362     config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1363                '%s' % nic.bridge)
1364   # TODO: redundant: on load can read nics until it doesn't exist
1365   config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1366
1367   disk_count = 0
1368   for disk_count, disk in enumerate(snap_disks):
1369     config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1370                ('%s' % disk.iv_name))
1371     config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1372                ('%s' % disk.physical_id[1]))
1373     config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1374                ('%d' % disk.size))
1375   config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1376
1377   cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1378   cfo = open(cff, 'w')
1379   try:
1380     config.write(cfo)
1381   finally:
1382     cfo.close()
1383
1384   shutil.rmtree(finaldestdir, True)
1385   shutil.move(destdir, finaldestdir)
1386
1387   return True
1388
1389
1390 def ExportInfo(dest):
1391   """Get export configuration information.
1392
1393   Args:
1394     dest: directory containing the export
1395
1396   Returns:
1397     A serializable config file containing the export info.
1398
1399   """
1400   cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1401
1402   config = objects.SerializableConfigParser()
1403   config.read(cff)
1404
1405   if (not config.has_section(constants.INISECT_EXP) or
1406       not config.has_section(constants.INISECT_INS)):
1407     return None
1408
1409   return config
1410
1411
1412 def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1413   """Import an os image into an instance.
1414
1415   Args:
1416     instance: the instance object
1417     os_disk: the instance-visible name of the os device
1418     swap_disk: the instance-visible name of the swap device
1419     src_node: node holding the source image
1420     src_image: path to the source image on src_node
1421
1422   Returns:
1423     False in case of error, True otherwise.
1424
1425   """
1426   inst_os = OSFromDisk(instance.os)
1427   import_script = inst_os.import_script
1428
1429   os_device = instance.FindDisk(os_disk)
1430   if os_device is None:
1431     logging.error("Can't find this device-visible name '%s'", os_disk)
1432     return False
1433
1434   swap_device = instance.FindDisk(swap_disk)
1435   if swap_device is None:
1436     logging.error("Can't find this device-visible name '%s'", swap_disk)
1437     return False
1438
1439   real_os_dev = _RecursiveFindBD(os_device)
1440   if real_os_dev is None:
1441     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1442                                   str(os_device))
1443   real_os_dev.Open()
1444
1445   real_swap_dev = _RecursiveFindBD(swap_device)
1446   if real_swap_dev is None:
1447     raise errors.BlockDeviceError("Block device '%s' is not set up" %
1448                                   str(swap_device))
1449   real_swap_dev.Open()
1450
1451   logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1452                                         instance.name, int(time.time()))
1453   if not os.path.exists(constants.LOG_OS_DIR):
1454     os.mkdir(constants.LOG_OS_DIR, 0750)
1455
1456   destcmd = utils.BuildShellCmd('cat %s', src_image)
1457   remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
1458                                        destcmd)
1459
1460   comprcmd = "gunzip"
1461   impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1462                                inst_os.path, import_script, instance.name,
1463                                real_os_dev.dev_path, real_swap_dev.dev_path,
1464                                logfile)
1465
1466   command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1467
1468   result = utils.RunCmd(command)
1469
1470   if result.failed:
1471     logging.error("os import command '%s' returned error: %s"
1472                   " output: %s", command, result.fail_reason, result.output)
1473     return False
1474
1475   return True
1476
1477
1478 def ListExports():
1479   """Return a list of exports currently available on this machine.
1480
1481   """
1482   if os.path.isdir(constants.EXPORT_DIR):
1483     return utils.ListVisibleFiles(constants.EXPORT_DIR)
1484   else:
1485     return []
1486
1487
1488 def RemoveExport(export):
1489   """Remove an existing export from the node.
1490
1491   Args:
1492     export: the name of the export to remove
1493
1494   Returns:
1495     False in case of error, True otherwise.
1496
1497   """
1498   target = os.path.join(constants.EXPORT_DIR, export)
1499
1500   shutil.rmtree(target)
1501   # TODO: catch some of the relevant exceptions and provide a pretty
1502   # error message if rmtree fails.
1503
1504   return True
1505
1506
1507 def RenameBlockDevices(devlist):
1508   """Rename a list of block devices.
1509
1510   The devlist argument is a list of tuples (disk, new_logical,
1511   new_physical). The return value will be a combined boolean result
1512   (True only if all renames succeeded).
1513
1514   """
1515   result = True
1516   for disk, unique_id in devlist:
1517     dev = _RecursiveFindBD(disk)
1518     if dev is None:
1519       result = False
1520       continue
1521     try:
1522       old_rpath = dev.dev_path
1523       dev.Rename(unique_id)
1524       new_rpath = dev.dev_path
1525       if old_rpath != new_rpath:
1526         DevCacheManager.RemoveCache(old_rpath)
1527         # FIXME: we should add the new cache information here, like:
1528         # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1529         # but we don't have the owner here - maybe parse from existing
1530         # cache? for now, we only lose lvm data when we rename, which
1531         # is less critical than DRBD or MD
1532     except errors.BlockDeviceError, err:
1533       logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1534       result = False
1535   return result
1536
1537
1538 def _TransformFileStorageDir(file_storage_dir):
1539   """Checks whether given file_storage_dir is valid.
1540
1541   Checks wheter the given file_storage_dir is within the cluster-wide
1542   default file_storage_dir stored in SimpleStore. Only paths under that
1543   directory are allowed.
1544
1545   Args:
1546     file_storage_dir: string with path
1547
1548   Returns:
1549     normalized file_storage_dir (string) if valid, None otherwise
1550
1551   """
1552   file_storage_dir = os.path.normpath(file_storage_dir)
1553   base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
1554   if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1555       base_file_storage_dir):
1556     logging.error("file storage directory '%s' is not under base file"
1557                   " storage directory '%s'",
1558                   file_storage_dir, base_file_storage_dir)
1559     return None
1560   return file_storage_dir
1561
1562
1563 def CreateFileStorageDir(file_storage_dir):
1564   """Create file storage directory.
1565
1566   Args:
1567     file_storage_dir: string containing the path
1568
1569   Returns:
1570     tuple with first element a boolean indicating wheter dir
1571     creation was successful or not
1572
1573   """
1574   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1575   result = True,
1576   if not file_storage_dir:
1577     result = False,
1578   else:
1579     if os.path.exists(file_storage_dir):
1580       if not os.path.isdir(file_storage_dir):
1581         logging.error("'%s' is not a directory", file_storage_dir)
1582         result = False,
1583     else:
1584       try:
1585         os.makedirs(file_storage_dir, 0750)
1586       except OSError, err:
1587         logging.error("Cannot create file storage directory '%s': %s",
1588                       file_storage_dir, err)
1589         result = False,
1590   return result
1591
1592
1593 def RemoveFileStorageDir(file_storage_dir):
1594   """Remove file storage directory.
1595
1596   Remove it only if it's empty. If not log an error and return.
1597
1598   Args:
1599     file_storage_dir: string containing the path
1600
1601   Returns:
1602     tuple with first element a boolean indicating wheter dir
1603     removal was successful or not
1604
1605   """
1606   file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1607   result = True,
1608   if not file_storage_dir:
1609     result = False,
1610   else:
1611     if os.path.exists(file_storage_dir):
1612       if not os.path.isdir(file_storage_dir):
1613         logging.error("'%s' is not a directory", file_storage_dir)
1614         result = False,
1615       # deletes dir only if empty, otherwise we want to return False
1616       try:
1617         os.rmdir(file_storage_dir)
1618       except OSError, err:
1619         logging.exception("Cannot remove file storage directory '%s'",
1620                           file_storage_dir)
1621         result = False,
1622   return result
1623
1624
1625 def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1626   """Rename the file storage directory.
1627
1628   Args:
1629     old_file_storage_dir: string containing the old path
1630     new_file_storage_dir: string containing the new path
1631
1632   Returns:
1633     tuple with first element a boolean indicating wheter dir
1634     rename was successful or not
1635
1636   """
1637   old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1638   new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1639   result = True,
1640   if not old_file_storage_dir or not new_file_storage_dir:
1641     result = False,
1642   else:
1643     if not os.path.exists(new_file_storage_dir):
1644       if os.path.isdir(old_file_storage_dir):
1645         try:
1646           os.rename(old_file_storage_dir, new_file_storage_dir)
1647         except OSError, err:
1648           logging.exception("Cannot rename '%s' to '%s'",
1649                             old_file_storage_dir, new_file_storage_dir)
1650           result =  False,
1651       else:
1652         logging.error("'%s' is not a directory", old_file_storage_dir)
1653         result = False,
1654     else:
1655       if os.path.exists(old_file_storage_dir):
1656         logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1657                       old_file_storage_dir, new_file_storage_dir)
1658         result = False,
1659   return result
1660
1661
1662 def CloseBlockDevices(disks):
1663   """Closes the given block devices.
1664
1665   This means they will be switched to secondary mode (in case of DRBD).
1666
1667   """
1668   bdevs = []
1669   for cf in disks:
1670     rd = _RecursiveFindBD(cf)
1671     if rd is None:
1672       return (False, "Can't find device %s" % cf)
1673     bdevs.append(rd)
1674
1675   msg = []
1676   for rd in bdevs:
1677     try:
1678       rd.Close()
1679     except errors.BlockDeviceError, err:
1680       msg.append(str(err))
1681   if msg:
1682     return (False, "Can't make devices secondary: %s" % ",".join(msg))
1683   else:
1684     return (True, "All devices secondary")
1685
1686
1687 class HooksRunner(object):
1688   """Hook runner.
1689
1690   This class is instantiated on the node side (ganeti-noded) and not on
1691   the master side.
1692
1693   """
1694   RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1695
1696   def __init__(self, hooks_base_dir=None):
1697     """Constructor for hooks runner.
1698
1699     Args:
1700       - hooks_base_dir: if not None, this overrides the
1701         constants.HOOKS_BASE_DIR (useful for unittests)
1702
1703     """
1704     if hooks_base_dir is None:
1705       hooks_base_dir = constants.HOOKS_BASE_DIR
1706     self._BASE_DIR = hooks_base_dir
1707
1708   @staticmethod
1709   def ExecHook(script, env):
1710     """Exec one hook script.
1711
1712     Args:
1713      - script: the full path to the script
1714      - env: the environment with which to exec the script
1715
1716     """
1717     # exec the process using subprocess and log the output
1718     fdstdin = None
1719     try:
1720       fdstdin = open("/dev/null", "r")
1721       child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1722                                stderr=subprocess.STDOUT, close_fds=True,
1723                                shell=False, cwd="/", env=env)
1724       output = ""
1725       try:
1726         output = child.stdout.read(4096)
1727         child.stdout.close()
1728       except EnvironmentError, err:
1729         output += "Hook script error: %s" % str(err)
1730
1731       while True:
1732         try:
1733           result = child.wait()
1734           break
1735         except EnvironmentError, err:
1736           if err.errno == errno.EINTR:
1737             continue
1738           raise
1739     finally:
1740       # try not to leak fds
1741       for fd in (fdstdin, ):
1742         if fd is not None:
1743           try:
1744             fd.close()
1745           except EnvironmentError, err:
1746             # just log the error
1747             #logging.exception("Error while closing fd %s", fd)
1748             pass
1749
1750     return result == 0, output
1751
1752   def RunHooks(self, hpath, phase, env):
1753     """Run the scripts in the hooks directory.
1754
1755     This method will not be usually overriden by child opcodes.
1756
1757     """
1758     if phase == constants.HOOKS_PHASE_PRE:
1759       suffix = "pre"
1760     elif phase == constants.HOOKS_PHASE_POST:
1761       suffix = "post"
1762     else:
1763       raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1764     rr = []
1765
1766     subdir = "%s-%s.d" % (hpath, suffix)
1767     dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1768     try:
1769       dir_contents = utils.ListVisibleFiles(dir_name)
1770     except OSError, err:
1771       # must log
1772       return rr
1773
1774     # we use the standard python sort order,
1775     # so 00name is the recommended naming scheme
1776     dir_contents.sort()
1777     for relname in dir_contents:
1778       fname = os.path.join(dir_name, relname)
1779       if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1780           self.RE_MASK.match(relname) is not None):
1781         rrval = constants.HKR_SKIP
1782         output = ""
1783       else:
1784         result, output = self.ExecHook(fname, env)
1785         if not result:
1786           rrval = constants.HKR_FAIL
1787         else:
1788           rrval = constants.HKR_SUCCESS
1789       rr.append(("%s/%s" % (subdir, relname), rrval, output))
1790
1791     return rr
1792
1793
1794 class IAllocatorRunner(object):
1795   """IAllocator runner.
1796
1797   This class is instantiated on the node side (ganeti-noded) and not on
1798   the master side.
1799
1800   """
1801   def Run(self, name, idata):
1802     """Run an iallocator script.
1803
1804     Return value: tuple of:
1805        - run status (one of the IARUN_ constants)
1806        - stdout
1807        - stderr
1808        - fail reason (as from utils.RunResult)
1809
1810     """
1811     alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
1812                                   os.path.isfile)
1813     if alloc_script is None:
1814       return (constants.IARUN_NOTFOUND, None, None, None)
1815
1816     fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
1817     try:
1818       os.write(fd, idata)
1819       os.close(fd)
1820       result = utils.RunCmd([alloc_script, fin_name])
1821       if result.failed:
1822         return (constants.IARUN_FAILURE, result.stdout, result.stderr,
1823                 result.fail_reason)
1824     finally:
1825       os.unlink(fin_name)
1826
1827     return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1828
1829
1830 class DevCacheManager(object):
1831   """Simple class for managing a cache of block device information.
1832
1833   """
1834   _DEV_PREFIX = "/dev/"
1835   _ROOT_DIR = constants.BDEV_CACHE_DIR
1836
1837   @classmethod
1838   def _ConvertPath(cls, dev_path):
1839     """Converts a /dev/name path to the cache file name.
1840
1841     This replaces slashes with underscores and strips the /dev
1842     prefix. It then returns the full path to the cache file
1843
1844     """
1845     if dev_path.startswith(cls._DEV_PREFIX):
1846       dev_path = dev_path[len(cls._DEV_PREFIX):]
1847     dev_path = dev_path.replace("/", "_")
1848     fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
1849     return fpath
1850
1851   @classmethod
1852   def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
1853     """Updates the cache information for a given device.
1854
1855     """
1856     if dev_path is None:
1857       logging.error("DevCacheManager.UpdateCache got a None dev_path")
1858       return
1859     fpath = cls._ConvertPath(dev_path)
1860     if on_primary:
1861       state = "primary"
1862     else:
1863       state = "secondary"
1864     if iv_name is None:
1865       iv_name = "not_visible"
1866     fdata = "%s %s %s\n" % (str(owner), state, iv_name)
1867     try:
1868       utils.WriteFile(fpath, data=fdata)
1869     except EnvironmentError, err:
1870       logging.exception("Can't update bdev cache for %s", dev_path)
1871
1872   @classmethod
1873   def RemoveCache(cls, dev_path):
1874     """Remove data for a dev_path.
1875
1876     """
1877     if dev_path is None:
1878       logging.error("DevCacheManager.RemoveCache got a None dev_path")
1879       return
1880     fpath = cls._ConvertPath(dev_path)
1881     try:
1882       utils.RemoveFile(fpath)
1883     except EnvironmentError, err:
1884       logging.exception("Can't update bdev cache for %s", dev_path)