#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon"""


import os
import os.path
import shutil
import time
import stat
import errno
import re
import subprocess
import random
import logging
import tempfile

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf


def _GetSshRunner():
  return ssh.SshRunner()


def _CleanDirectory(path):
  if not os.path.isdir(path):
    return
  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.join(path, rel_name)
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _GetMasterInfo():
  """Return the master netdev and ip.

  """
  try:
    ss = ssconf.SimpleStore()
    master_netdev = ss.GetMasterNetdev()
    master_ip = ss.GetMasterIP()
  except errors.ConfigurationError, err:
    logging.exception("Cluster configuration incomplete")
    return (None, None)
  return (master_netdev, master_ip)


def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  """
  ok = True
  master_netdev, master_ip = _GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  """
  master_netdev, master_ip = _GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True


def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the user's authorized_keys file

  """
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")
    return False

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True


def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  _CleanDirectory(constants.DATA_DIR)

  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')


def GetNodeInfo(vgname):
  """Gives back a dict with different information about the node.

  Returns:
    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_dom0': xxx,
      'memory_free' : xxx, 'memory_total' : xxx }
    where
    vg_size is the size of the configured volume group in MiB
    vg_free is the free size of the volume group in MiB
    memory_dom0 is the memory allocated for domain0 in MiB
    memory_free is the currently available (free) ram in MiB
    memory_total is the total amount of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor()
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray
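# Example for GetNodeInfo (hypothetical values): on a Xen node with a
# 100 GiB volume group, GetNodeInfo("xenvg") might return something like
#   {'vg_size': 102400, 'vg_free': 51200, 'memory_dom0': 512,
#    'memory_free': 3072, 'memory_total': 4096, 'bootid': '...'}
# with all sizes in MiB; vg_size/vg_free are None if the VG is missing.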


def VerifyNode(what):
  """Verify the status of the local node.

  Args:
    what - a dictionary of things to check:
      'filelist' : list of files for which to compute checksums
      'nodelist' : list of nodes we should check communication with
      'hypervisor': run the hypervisor-specific verify

  Requested files on the local node are checksummed and the result returned.

  The nodelist is traversed, with the following checks being made
  for each node:
  - known_hosts key correct
  - correct resolving of node name (target node returns its own hostname
    by ssh-execution of 'hostname', result compared against name in list)

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = hypervisor.GetHypervisor().Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner().VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = ssconf.SimpleStore().GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result


def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  Returns:
    dictionary of all partitions (key) with their size (in MiB), and
    their inactive and online status:
    {'test1': ('20.06', True, True)}

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
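# Example for GetVolumeList (hypothetical lvs output line):
#   " lv0|20.06|-wi-ao" is parsed into lvs['lv0'] = ('20.06', False, True);
# attr[4] ('a' or '-') is the active flag, attr[5] ('o' or '-') the open
# flag.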


def ListVolumeGroups():
  """List the volume groups and their size.

  Returns:
    Dictionary with keys the volume group names and values their size

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
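# Example for NodeVolumes (hypothetical lvs output line):
#   "  lv0|20.06|/dev/sda1(0)|xenvg" maps to
#   {'name': 'lv0', 'size': '20.06', 'dev': '/dev/sda1', 'vg': 'xenvg'};
# parse_dev() strips the "(extent)" suffix lvs appends to the devices
# column.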


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  Returns:
    True if all of them exist, False otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True


def GetInstanceList():
  """Provides a list of instances.

  Returns:
    A list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  try:
    names = hypervisor.GetHypervisor().ListInstances()
  except errors.HypervisorError, err:
    logging.exception("Error enumerating instances")
    raise

  return names


def GetInstanceInfo(instance):
  """Gives back the information about an instance as a dictionary.

  Args:
    instance: name of the instance (ex. instance1.example.com)

  Returns:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetAllInstancesInfo():
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  Returns: a dictionary of dictionaries, keys being the instance name,
    and with values:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, 'vcpus' : 2, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)
    vcpus: the number of cpus

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
  if iinfo:
    for name, inst_id, memory, vcpus, state, times in iinfo:
      output[name] = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }

  return output


def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True
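# The command AddOSToInstance builds has the shape (illustrative):
#   cd <os_dir> && <create_script> -i <instance> -b <os_dev> -s <swap_dev> &><logfile>
# i.e. the OS create script receives the instance name and the paths of
# the os and swap block devices, with all output captured in the logfile.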


def RunRenameInstance(instance, old_name, os_disk, swap_disk):
  """Run the OS rename script for an instance.

  Args:
    instance: the instance object
    old_name: the old name of the instance
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
                                inst_os.path, script, old_name, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  command, result.fail_reason, result.output)
    return False

  return True


def _GetVGInfo(vg_name):
  """Get information about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count is the number of physical volumes in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Failed to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
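# Example for _GetVGInfo (hypothetical vgs output): "  102400.00:51200.00:2"
# is parsed into {'vg_size': 102400, 'vg_free': 51200, 'pv_count': 2};
# sizes are rounded to whole MiB.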


def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    block_devices.append((disk, device))
  return block_devices


def StartInstance(instance, extra_args):
  """Start an instance.

  Args:
    instance - the instance object to start

  """
  running_instances = GetInstanceList()

  if instance.name in running_instances:
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    return False

  return True


def ShutdownInstance(instance):
  """Shut an instance down.

  Args:
    instance - the instance object to shut down

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor()
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10secs for 2min
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList():
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy",
                  instance.name)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    time.sleep(1)
    if instance.name in GetInstanceList():
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True
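# Note on ShutdownInstance: the "else" branch of its polling loop runs
# only when the loop finishes without "break", i.e. the instance is
# still running after roughly two minutes (1 + 11*10 seconds) of
# polling; only then do we fall back to a forced destroy.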


def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance    - name of instance to reboot
    reboot_type - how to reboot [soft,hard,full]

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor()
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  """
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    msg = "Failed to migrate instance: %s" % str(err)
    logging.error(msg)
    return (False, msg)
  return (True, "Migration successful")


def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Args:
   disk: a ganeti.objects.Disk object
   size: the size of the physical underlying device
   owner: a string with the name of the instance
   on_primary: a boolean indicating if it is the primary node or not
   info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometimes be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id


def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and RemoveBlockDevice(child)
  return result


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: an objects.Disk object
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
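# Example for _RecursiveAssembleBD (illustrative): for a mirrored device
# with two children where ChildrenNeeded() returns 1, mcn becomes
# 2 - 1 = 1, i.e. one child may fail to assemble (a degraded mirror)
# before the error is re-raised to the caller.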


def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result


def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down lower devices
  doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result


def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True


def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True


def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats


def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)


def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  allowed_files.extend(ssconf.SimpleStore().GetFileList())

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the `err` argument has an errno attribute, it will be looked up
  and converted into a textual EXXXX description. Otherwise the string
  representation of the error will be returned.

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail
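# Example for _ErrnoOrStr: an IOError with errno 2 is formatted as
# 'ENOENT', while an exception without an errno attribute is simply
# str()-ed.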


def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Return value will be the integer API version; if the OS is not
  valid, an `errors.InvalidOS` exception is raised instead.

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_version = f.read(256)
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_version = api_version.strip()
  try:
    api_version = int(api_version)
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_version


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  Returns:
    list of OS objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          result.append(objects.OS.FromInvalidOS(err))

  return result


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    name: the OS name
    base_dir: directory containing the OS scripts. Defaults to a search
              in all the OS_SEARCH_PATH directories.

  """

  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_version = _OSOndiskVersion(name, os_dir)

  if api_version != constants.OS_API_VERSION:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_version, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_version=api_version)
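# To summarize OSFromDisk's checks (illustrative): a valid OS directory
# must contain a "ganeti_api_version" file matching
# constants.OS_API_VERSION plus executable, regular "create", "export",
# "import" and "rename" scripts; any violation raises errors.InvalidOS
# with the specific reason.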


def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being
  resized first.

  Args:
    disk: the disk to be grown

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    return False, str(err)

  return True, None


def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))

    
1266

    
1267
def ExportSnapshot(disk, dest_node, instance):
1268
  """Export a block device snapshot to a remote node.
1269

1270
  Args:
1271
    disk: the snapshot block device
1272
    dest_node: the node to send the image to
1273
    instance: instance being exported
1274

1275
  Returns:
1276
    True if successful, False otherwise.
1277

1278
  """
1279
  inst_os = OSFromDisk(instance.os)
1280
  export_script = inst_os.export_script
1281

    
1282
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1283
                                     instance.name, int(time.time()))
1284
  if not os.path.exists(constants.LOG_OS_DIR):
1285
    os.mkdir(constants.LOG_OS_DIR, 0750)
1286

    
1287
  real_os_dev = _RecursiveFindBD(disk)
1288
  if real_os_dev is None:
1289
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1290
                                  str(disk))
1291
  real_os_dev.Open()
1292

    
1293
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1294
  destfile = disk.physical_id[1]
1295

    
1296
  # the target command is built out of three individual commands,
1297
  # which are joined by pipes; we check each individual command for
1298
  # valid parameters
1299

    
1300
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1301
                               export_script, instance.name,
1302
                               real_os_dev.dev_path, logfile)
1303

    
1304
  comprcmd = "gzip"
1305

    
1306
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1307
                                destdir, destdir, destfile)
1308
  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1309
                                       destcmd)
1310

    
1311
  # all commands have been checked, so we're safe to combine them
1312
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1313

    
1314
  result = utils.RunCmd(command)
1315

    
1316
  if result.failed:
1317
    logging.error("os snapshot export command '%s' returned error: %s"
1318
                  " output: %s", command, result.fail_reason, result.output)
1319
    return False
1320

    
1321
  return True
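# The pipeline ExportSnapshot assembles has this shape (illustrative):
#   (cd <os_dir>; export -i <inst> -b <dev> 2><logfile>) | gzip | \
#     ssh <dest_node> 'mkdir -p <destdir> && cat > <destdir>/<destfile>'
# i.e. the export script streams the disk image to stdout, which is
# compressed locally and written into the export directory on dest_node.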


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_count)

  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True


def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config


def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
                                       destcmd)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
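# ImportOSIntoInstance's pipeline is the mirror image of
# ExportSnapshot's (illustrative):
#   ssh <src_node> 'cat <src_image>' | gunzip | \
#     (cd <os_dir>; import -i <inst> -b <os_dev> -s <swap_dev> &><logfile>)
# the compressed image is streamed from the source node and fed to the
# OS import script, which writes it onto the instance's block devices.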


def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_unique_id). The
  return value will be a combined boolean result (True only if all
  renames succeeded).

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
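# Caveat for _TransformFileStorageDir: os.path.commonprefix() compares
# strings character by character, not path components, so with a base of
# "/srv/ganeti/fs" a sibling like "/srv/ganeti/fs2" would also pass this
# check.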


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result
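# Note on CreateFileStorageDir (and the two functions below): the
# trailing commas are deliberate - "result = True," makes result a
# one-element tuple, matching the documented return type.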


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result
1656

    
1657

    
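
# Outcome matrix for the rename above ("old"/"new" refer to the two
# arguments after validation):
#
#   new missing, old is a dir   -> os.rename, (True,) on success
#   new missing, old not a dir  -> error logged, (False,)
#   new exists,  old exists     -> conflict logged, (False,)
#   new exists,  old missing    -> treated as already renamed, (True,)
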
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  if os.path.commonprefix([queue_dir, file_name]) != queue_dir:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True

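
# Sketch of the containment check, assuming constants.QUEUE_DIR is
# "/var/lib/ganeti/queue" (the actual value comes from constants.py):
#
#   JobQueueUpdate("/var/lib/ganeti/queue/job-1", "...")  -> True
#   JobQueueUpdate("/etc/passwd", "...")                  -> False, logged
#
# The same character-wise os.path.commonprefix caveat as in
# _TransformFileStorageDir applies here.
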
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  """
  _CleanDirectory(constants.QUEUE_DIR)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)

def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")

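
# The function uses a two-phase approach: first resolve every disk to a
# block device, failing fast if any is missing, then close them all while
# collecting per-device errors. Callers get a (success, message) pair;
# a hypothetical caller sketch:
#
#   status, msg = CloseBlockDevices(instance.disks)
#   if not status:
#       logging.error("Close failed: %s", msg)
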
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # errors while closing the fd are ignored; there is nothing
            # useful to do about them at this point
            pass

    return result == 0, output

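
  # ExecHook notes: only the first 4096 bytes of the combined stdout and
  # stderr are captured, the script runs with cwd="/" and a /dev/null
  # stdin, and child.wait() is retried on EINTR so an interrupting signal
  # does not lose the exit status. The return value is the pair
  # (exit_code == 0, output).
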
  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not usually be overridden by child opcodes.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # FIXME: this error should be logged
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr

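
# Hook layout sketch: directory names follow the "<hooks-path>-<phase>.d"
# pattern built above. The base directory and hook names below are
# examples only:
#
#   /etc/ganeti/hooks/instance-start-pre.d/
#       00-check-storage   executable, name matches RE_MASK -> run
#       50-local-setup     executable, name matches RE_MASK -> run
#       99-notify.sh       "." not allowed by RE_MASK       -> HKR_SKIP
#       README             not executable                   -> HKR_SKIP
#
# Each entry in the returned list is ("subdir/relname", HKR_* status,
# captured output).
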
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)

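
# Protocol sketch: the allocator input (idata) is written to a temporary
# file, and the script found on the search path is invoked with that file
# name as its only argument. A hypothetical caller (the allocator name is
# illustrative):
#
#   status, stdout, stderr, reason = IAllocatorRunner().Run("myalloc", idata)
#   if status != constants.IARUN_SUCCESS:
#       ...inspect stderr and reason...
#
# The temporary file is always unlinked, whether the script succeeds,
# fails, or the RunCmd call itself raises.
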
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

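
  # _ConvertPath examples ("..." stands for _ROOT_DIR, whose real value
  # comes from constants.BDEV_CACHE_DIR):
  #
  #   "/dev/drbd0"     -> ".../bdev_drbd0"
  #   "/dev/md/md0"    -> ".../bdev_md_md0"
  #   "relative/path"  -> ".../bdev_relative_path" (no /dev prefix to strip)
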
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

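
  # Cache file format: one line of "<owner> <primary|secondary> <iv_name>".
  # For example (instance name and device are illustrative only), calling
  #
  #   DevCacheManager.UpdateCache("/dev/drbd0", "inst1.example.com",
  #                               True, "sda")
  #
  # writes "inst1.example.com primary sda\n" to the bdev_drbd0 cache file.
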
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s", dev_path)