#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon"""


import os
import os.path
import shutil
import time
import stat
import errno
import re
import subprocess
import random
import logging
import tempfile

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf


def _GetSshRunner():
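  """Return a new SshRunner instance.

  """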
  return ssh.SshRunner()


def _CleanDirectory(path):
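  """Remove all regular files from a directory.

  Subdirectories, symlinks and hidden files are left untouched.

  """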
  if not os.path.isdir(path):
    return
  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.join(path, rel_name)
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _GetMasterInfo():
  """Return the master IP and netdev.

  """
  try:
    ss = ssconf.SimpleStore()
    master_netdev = ss.GetMasterNetdev()
    master_ip = ss.GetMasterIP()
  except errors.ConfigurationError, err:
    logging.exception("Cluster configuration incomplete")
    return (None, None)
  return (master_netdev, master_ip)


def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the
  master (unless someone else already holds it). Then, if the
  start_daemons parameter is True, it will also start the master
  daemons (ganeti-masterd and ganeti-rapi).

  """
  ok = True
  master_netdev, master_ip = _GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

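    # send a gratuitous ARP so that the neighbours learn (or refresh)
    # the new location of the master IP address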
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
105
                           "-s", master_ip, master_ip])
106
    # we'll ignore the exit code of arping
107

    
108
  # and now start the master and rapi daemons
109
  if start_daemons:
110
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
111
      result = utils.RunCmd([daemon])
112
      if result.failed:
113
        logging.error("Can't start daemon %s: %s", daemon, result.output)
114
        ok = False
115
  return ok
116

    
117

    
118
def StopMaster(stop_daemons):
119
  """Deactivate this node as master.
120

121
  The function will always try to deactivate the IP address of the
122
  master. Then, if the stop_daemons parameter is True, it will also
123
  stop the master daemons (ganet-masterd and ganeti-rapi).
124

125
  """
126
  master_netdev, master_ip = _GetMasterInfo()
127
  if not master_netdev:
128
    return False
129

    
130
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
131
                         "dev", master_netdev])
132
  if result.failed:
133
    logging.error("Can't remove the master IP, error: %s", result.output)
134
    # but otherwise ignore the failure
135

    
136
  if stop_daemons:
137
    # stop/kill the rapi and the master daemon
138
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
139
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
140

    
141
  return True
142

    
143

    
144
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
145
  """Joins this node to the cluster.
146

147
  This does the following:
148
      - updates the hostkeys of the machine (rsa and dsa)
149
      - adds the ssh private key to the user
150
      - adds the ssh public key to the users' authorized_keys file
151

152
  """
153
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
154
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
155
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
156
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
157
  for name, content, mode in sshd_keys:
158
    utils.WriteFile(name, data=content, mode=mode)
159

    
160
  try:
161
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
162
                                                    mkdir=True)
163
  except errors.OpExecError, err:
164
    logging.exception("Error while processing user ssh files")
165
    return False
166

    
167
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
168
    utils.WriteFile(name, data=content, mode=0600)
169

    
170
  utils.AddAuthorizedKey(auth_keys, sshpub)
171

    
172
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
173

    
174
  return True
175

    
176

    
177
def LeaveCluster():
178
  """Cleans up the current node and prepares it to be removed from the cluster.
179

180
  """
181
  _CleanDirectory(constants.DATA_DIR)
182

    
183
  JobQueuePurge()
184

    
185
  try:
186
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
187
  except errors.OpExecError:
188
    logging.exception("Error while processing ssh files")
189
    return
190

    
191
  f = open(pub_key, 'r')
192
  try:
193
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
194
  finally:
195
    f.close()
196

    
197
  utils.RemoveFile(priv_key)
198
  utils.RemoveFile(pub_key)
199

    
200
  # Return a reassuring string to the caller, and quit
201
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
202

    
203

    
204
def GetNodeInfo(vgname):
205
  """Gives back a hash with different informations about the node.
206

207
  Returns:
208
    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
209
      'memory_free' : xxx, 'memory_total' : xxx }
210
    where
211
    vg_size is the size of the configured volume group in MiB
212
    vg_free is the free size of the volume group in MiB
213
    memory_dom0 is the memory allocated for domain0 in MiB
214
    memory_free is the currently available (free) ram in MiB
215
    memory_total is the total number of ram in MiB
216

217
  """
218
  outputarray = {}
219
  vginfo = _GetVGInfo(vgname)
220
  outputarray['vg_size'] = vginfo['vg_size']
221
  outputarray['vg_free'] = vginfo['vg_free']
222

    
223
  hyper = hypervisor.GetHypervisor()
224
  hyp_info = hyper.GetNodeInfo()
225
  if hyp_info is not None:
226
    outputarray.update(hyp_info)
227

    
228
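  # the boot ID is regenerated on every boot, so it can be used to
  # detect node restarts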
  f = open("/proc/sys/kernel/random/boot_id", 'r')
229
  try:
230
    outputarray["bootid"] = f.read(128).rstrip("\n")
231
  finally:
232
    f.close()
233

    
234
  return outputarray
235

    
236

    
237
def VerifyNode(what):
238
  """Verify the status of the local node.
239

240
  Args:
241
    what - a dictionary of things to check:
242
      'filelist' : list of files for which to compute checksums
243
      'nodelist' : list of nodes we should check communication with
244
      'hypervisor': run the hypervisor-specific verify
245

246
  Requested files on local node are checksummed and the result returned.
247

248
  The nodelist is traversed, with the following checks being made
249
  for each node:
250
  - known_hosts key correct
251
  - correct resolving of node name (target node returns its own hostname
252
    by ssh-execution of 'hostname', result compared against name in list.
253

254
  """
255
  result = {}
256

    
257
  if 'hypervisor' in what:
258
    result['hypervisor'] = hypervisor.GetHypervisor().Verify()
259

    
260
  if 'filelist' in what:
261
    result['filelist'] = utils.FingerprintFiles(what['filelist'])
262

    
263
  if 'nodelist' in what:
264
    result['nodelist'] = {}
265
    random.shuffle(what['nodelist'])
266
    for node in what['nodelist']:
267
      success, message = _GetSshRunner().VerifyNodeHostname(node)
268
      if not success:
269
        result['nodelist'][node] = message
270
  if 'node-net-test' in what:
271
    result['node-net-test'] = {}
272
    my_name = utils.HostInfo().name
273
    my_pip = my_sip = None
274
    for name, pip, sip in what['node-net-test']:
275
      if name == my_name:
276
        my_pip = pip
277
        my_sip = sip
278
        break
279
    if not my_pip:
280
      result['node-net-test'][my_name] = ("Can't find my own"
281
                                          " primary/secondary IP"
282
                                          " in the node list")
283
    else:
284
      port = ssconf.SimpleStore().GetNodeDaemonPort()
285
      for name, pip, sip in what['node-net-test']:
286
        fail = []
287
        if not utils.TcpPing(pip, port, source=my_pip):
288
          fail.append("primary")
289
        if sip != pip:
290
          if not utils.TcpPing(sip, port, source=my_sip):
291
            fail.append("secondary")
292
        if fail:
293
          result['node-net-test'][name] = ("failure using the %s"
294
                                           " interface(s)" %
295
                                           " and ".join(fail))
296

    
297
  return result
298

    
299

    
300
def GetVolumeList(vg_name):
301
  """Compute list of logical volumes and their size.
302

303
  Returns:
304
    dictionary of all partions (key) with their size (in MiB), inactive
305
    and online status:
306
    {'test1': ('20.06', True, True)}
307

308
  """
309
  lvs = {}
310
  sep = '|'
311
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
312
                         "--separator=%s" % sep,
313
                         "-olv_name,lv_size,lv_attr", vg_name])
314
  if result.failed:
315
    logging.error("Failed to list logical volumes, lvs output: %s",
316
                  result.output)
317
    return result.output
318

    
319
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
320
  for line in result.stdout.splitlines():
321
    line = line.strip()
322
    match = valid_line_re.match(line)
323
    if not match:
324
      logging.error("Invalid line returned from lvs output: '%s'", line)
325
      continue
326
    name, size, attr = match.groups()
327
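    # lv_attr: the fifth character shows the volume state ('-' meaning
    # inactive) and the sixth is 'o' when the device is open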
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  Returns:
    Dictionary with the volume group names as keys and their size as
    values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
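    # lvs prints each device as e.g. "/dev/sda1(0)"; strip the
    # "(extent)" suffix and keep only the device path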
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]


def BridgesExist(bridges_list):
  """Check whether all the given bridges exist on the current node.

  Returns:
    True if all of them exist, False otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True


def GetInstanceList():
  """Provides a list of instances.

  Returns:
    A list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  try:
    names = hypervisor.GetHypervisor().ListInstances()
  except errors.HypervisorError, err:
    logging.exception("Error enumerating instances")
    raise

  return names


def GetInstanceInfo(instance):
  """Gives back information about an instance as a dictionary.

  Args:
    instance: name of the instance (ex. instance1.example.com)

  Returns:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetAllInstancesInfo():
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  Returns: a dictionary of dictionaries, keys being the instance name,
    and with values:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, 'vcpus' : 1, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)
    vcpus: the number of cpus

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
  if iinfo:
    for name, inst_id, memory, vcpus, state, times in iinfo:
      output[name] = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }

  return output


def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True


def RunRenameInstance(instance, old_name, os_disk, swap_disk):
  """Run the OS rename script for an instance.

  Args:
    instance: the instance object
    old_name: the old name of the instance
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
                                inst_os.path, script, old_name, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  command, result.fail_reason, result.output)
    return False

  return True


def _GetVGInfo(vg_name):
  """Get information about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count is the number of physical disks in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
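  # guard against a trailing field separator in the vgs output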
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Failed to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic


def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    block_devices.append((disk, device))
  return block_devices


def StartInstance(instance, extra_args):
  """Start an instance.

  Args:
    instance - the instance object to start

  """
  running_instances = GetInstanceList()

  if instance.name in running_instances:
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    return False

  return True


def ShutdownInstance(instance):
  """Shut an instance down.

  Args:
    instance - the instance object to shut down

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor()
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10 seconds for about two minutes
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList():
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy",
                  instance.name)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    time.sleep(1)
    if instance.name in GetInstanceList():
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True


def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance    - the instance object to reboot
    reboot_type - how to reboot [soft,hard,full]

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor()
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
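  # note: constants.INSTANCE_REBOOT_FULL is not handled at this level
  # and is rejected below like any other unknown reboot type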
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  """
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    msg = "Failed to migrate instance: %s" % str(err)
    logging.error(msg)
    return (False, msg)
  return (True, "Migration successful")


def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Args:
   disk: a ganeti.objects.Disk object
   size: the size of the physical underlying device
   owner: a string with the name of the instance
   on_primary: a boolean indicating if it is the primary node or not
   info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometimes be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id


def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and RemoveBlockDevice(child)
  return result


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: an objects.Disk object
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result


def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down a device
  doesn't require that the upper device be active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result


def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True


def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True


def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats


def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)


def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()


def _IsJobQueueFile(file_name):
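  """Check whether the given path lies within the job queue directory.

  """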
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  return os.path.commonprefix([queue_dir, file_name]) == queue_dir


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  allowed_files.extend(ssconf.SimpleStore().GetFileList())

  if not (file_name in allowed_files or _IsJobQueueFile(file_name)):
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the `err` argument has an errno attribute, it will be looked up
  and converted into a textual EXXXX description. Otherwise the string
  representation of the error will be returned.

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Return value will be either an integer denoting the version or None in the
  case when this is not a valid OS name.

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_version = f.read(256)
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_version = api_version.strip()
  try:
    api_version = int(api_version)
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_version


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given, defaults to constants.OS_SEARCH_PATH)

  Returns:
    list of OS objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          result.append(objects.OS.FromInvalidOS(err))

  return result


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    name: the OS name
    base_dir: directory containing the OS scripts. Defaults to a
              search in all the OS_SEARCH_PATH directories.

  """

  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_version = _OSOndiskVersion(name, os_dir)

  if api_version != constants.OS_API_VERSION:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_version, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_version=api_version)


def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being
  resized first.

  Args:
    disk: the disk to be grown

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    return False, str(err)

  return True, None


def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))


def ExportSnapshot(disk, dest_node, instance):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported

  Returns:
    True if successful, False otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters

  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
                                       destcmd)

  # all commands have been checked, so we're safe to combine them
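  # the resulting pipeline looks roughly like:
  #   export_script ... | gzip | ssh dest 'mkdir -p dir && cat > dir/file'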
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1365

    
1366
  disk_count = 0
1367
  for disk_count, disk in enumerate(snap_disks):
1368
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1369
               ('%s' % disk.iv_name))
1370
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1371
               ('%s' % disk.physical_id[1]))
1372
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1373
               ('%d' % disk.size))
1374
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1375

    
1376
  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1377
  cfo = open(cff, 'w')
1378
  try:
1379
    config.write(cfo)
1380
  finally:
1381
    cfo.close()
1382

    
1383
  shutil.rmtree(finaldestdir, True)
1384
  shutil.move(destdir, finaldestdir)
1385

    
1386
  return True
1387

    
1388

    
1389
def ExportInfo(dest):
1390
  """Get export configuration information.
1391

1392
  Args:
1393
    dest: directory containing the export
1394

1395
  Returns:
1396
    A serializable config file containing the export info.
1397

1398
  """
1399
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1400

    
1401
  config = objects.SerializableConfigParser()
1402
  config.read(cff)
1403

    
1404
  if (not config.has_section(constants.INISECT_EXP) or
1405
      not config.has_section(constants.INISECT_INS)):
1406
    return None
1407

    
1408
  return config
1409

    
1410

    
1411
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1412
  """Import an os image into an instance.
1413

1414
  Args:
1415
    instance: the instance object
1416
    os_disk: the instance-visible name of the os device
1417
    swap_disk: the instance-visible name of the swap device
1418
    src_node: node holding the source image
1419
    src_image: path to the source image on src_node
1420

1421
  Returns:
1422
    False in case of error, True otherwise.
1423

1424
  """
1425
  inst_os = OSFromDisk(instance.os)
1426
  import_script = inst_os.import_script
1427

    
1428
  os_device = instance.FindDisk(os_disk)
1429
  if os_device is None:
1430
    logging.error("Can't find this device-visible name '%s'", os_disk)
1431
    return False
1432

    
1433
  swap_device = instance.FindDisk(swap_disk)
1434
  if swap_device is None:
1435
    logging.error("Can't find this device-visible name '%s'", swap_disk)
1436
    return False
1437

    
1438
  real_os_dev = _RecursiveFindBD(os_device)
1439
  if real_os_dev is None:
1440
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1441
                                  str(os_device))
1442
  real_os_dev.Open()
1443

    
1444
  real_swap_dev = _RecursiveFindBD(swap_device)
1445
  if real_swap_dev is None:
1446
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1447
                                  str(swap_device))
1448
  real_swap_dev.Open()
1449

    
1450
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1451
                                        instance.name, int(time.time()))
1452
  if not os.path.exists(constants.LOG_OS_DIR):
1453
    os.mkdir(constants.LOG_OS_DIR, 0750)
1454

    
1455
  destcmd = utils.BuildShellCmd('cat %s', src_image)
1456
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
1457
                                       destcmd)
1458

    
1459
  comprcmd = "gunzip"
1460
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1461
                               inst_os.path, import_script, instance.name,
1462
                               real_os_dev.dev_path, real_swap_dev.dev_path,
1463
                               logfile)
1464

    
1465
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1466

    
1467
  result = utils.RunCmd(command)
1468

    
1469
  if result.failed:
1470
    logging.error("os import command '%s' returned error: %s"
1471
                  " output: %s", command, result.fail_reason, result.output)
1472
    return False
1473

    
1474
  return True
1475

    
1476

    
1477
def ListExports():
1478
  """Return a list of exports currently available on this machine.
1479

1480
  """
1481
  if os.path.isdir(constants.EXPORT_DIR):
1482
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
1483
  else:
1484
    return []
1485

    
1486

    
1487
def RemoveExport(export):
1488
  """Remove an existing export from the node.
1489

1490
  Args:
1491
    export: the name of the export to remove
1492

1493
  Returns:
1494
    False in case of error, True otherwise.
1495

1496
  """
1497
  target = os.path.join(constants.EXPORT_DIR, export)
1498

    
1499
  shutil.rmtree(target)
1500
  # TODO: catch some of the relevant exceptions and provide a pretty
1501
  # error message if rmtree fails.
1502

    
1503
  return True
1504

    
1505

    
1506
def RenameBlockDevices(devlist):
1507
  """Rename a list of block devices.
1508

1509
  The devlist argument is a list of tuples (disk, new_logical,
1510
  new_physical). The return value will be a combined boolean result
1511
  (True only if all renames succeeded).
1512

1513
  """
1514
  result = True
1515
  for disk, unique_id in devlist:
1516
    dev = _RecursiveFindBD(disk)
1517
    if dev is None:
1518
      result = False
1519
      continue
1520
    try:
1521
      old_rpath = dev.dev_path
1522
      dev.Rename(unique_id)
1523
      new_rpath = dev.dev_path
1524
      if old_rpath != new_rpath:
1525
        DevCacheManager.RemoveCache(old_rpath)
1526
        # FIXME: we should add the new cache information here, like:
1527
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1528
        # but we don't have the owner here - maybe parse from existing
1529
        # cache? for now, we only lose lvm data when we rename, which
1530
        # is less critical than DRBD or MD
1531
    except errors.BlockDeviceError, err:
1532
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1533
      result = False
1534
  return result
1535

    
1536

    
1537
def _TransformFileStorageDir(file_storage_dir):
1538
  """Checks whether given file_storage_dir is valid.
1539

1540
  Checks wheter the given file_storage_dir is within the cluster-wide
1541
  default file_storage_dir stored in SimpleStore. Only paths under that
1542
  directory are allowed.
1543

1544
  Args:
1545
    file_storage_dir: string with path
1546

1547
  Returns:
1548
    normalized file_storage_dir (string) if valid, None otherwise
1549

1550
  """
1551
  file_storage_dir = os.path.normpath(file_storage_dir)
1552
  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
1553
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1554
      base_file_storage_dir):
1555
    logging.error("file storage directory '%s' is not under base file"
1556
                  " storage directory '%s'",
1557
                  file_storage_dir, base_file_storage_dir)
1558
    return None
1559
  return file_storage_dir
1560

    
1561

    
1562
def CreateFileStorageDir(file_storage_dir):
1563
  """Create file storage directory.
1564

1565
  Args:
1566
    file_storage_dir: string containing the path
1567

1568
  Returns:
1569
    tuple with first element a boolean indicating wheter dir
1570
    creation was successful or not
1571

1572
  """
1573
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1574
  result = True,
1575
  if not file_storage_dir:
1576
    result = False,
1577
  else:
1578
    if os.path.exists(file_storage_dir):
1579
      if not os.path.isdir(file_storage_dir):
1580
        logging.error("'%s' is not a directory", file_storage_dir)
1581
        result = False,
1582
    else:
1583
      try:
1584
        os.makedirs(file_storage_dir, 0750)
1585
      except OSError, err:
1586
        logging.error("Cannot create file storage directory '%s': %s",
1587
                      file_storage_dir, err)
1588
        result = False,
1589
  return result
1590

    
1591

    
1592
def RemoveFileStorageDir(file_storage_dir):
1593
  """Remove file storage directory.
1594

1595
  Remove it only if it's empty. If not log an error and return.
1596

1597
  Args:
1598
    file_storage_dir: string containing the path
1599

1600
  Returns:
1601
    tuple with first element a boolean indicating wheter dir
1602
    removal was successful or not
1603

1604
  """
1605
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1606
  result = True,
1607
  if not file_storage_dir:
1608
    result = False,
1609
  else:
1610
    if os.path.exists(file_storage_dir):
1611
      if not os.path.isdir(file_storage_dir):
1612
        logging.error("'%s' is not a directory", file_storage_dir)
1613
        result = False,
1614
      # deletes dir only if empty, otherwise we want to return False
1615
      try:
1616
        os.rmdir(file_storage_dir)
1617
      except OSError, err:
1618
        logging.exception("Cannot remove file storage directory '%s'",
1619
                          file_storage_dir)
1620
        result = False,
1621
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = (True, )
  if not old_file_storage_dir or not new_file_storage_dir:
    result = (False, )
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = (False, )
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = (False, )
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = (False, )
  return result
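
# Behaviour sketch (paths hypothetical): the rename succeeds only when the
# source is a directory and the target doesn't exist yet. If both locations
# exist the call fails with (False, ); if only the target exists, the
# initial (True, ) is kept, so a rename that already took place isn't
# treated as an error.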


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  if os.path.commonprefix([queue_dir, file_name]) != queue_dir:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
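
# Call sketch (the job file name and "content" value are hypothetical);
# anything outside constants.QUEUE_DIR is rejected before any write happens:
#
#   JobQueueUpdate(os.path.join(constants.QUEUE_DIR, "job-42"), content)
#     => True (file written atomically via utils.WriteFile)
#   JobQueueUpdate("/etc/passwd", content)
#     => False (an error is logged, nothing is written)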


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  """
  _CleanDirectory(constants.QUEUE_DIR)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")
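
# Caller sketch ("instance" is hypothetical; its disks are objects.Disk
# instances, as elsewhere in this module):
#
#   status, msg = CloseBlockDevices(instance.disks)
#   if not status:
#     logging.error("Can't switch disks to secondary: %s", msg)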


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # keep at most the first 4096 bytes of output for the result report
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            logging.exception("Error while closing fd %s", fd)

    return result == 0, output
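
  # Call sketch (script path and environment are hypothetical):
  #
  #   ok, output = HooksRunner.ExecHook("/path/to/hook-script",
  #                                     {"GANETI_HOOKS_PHASE": "pre"})
  #
  # ok is True iff the script exited with status 0; output holds at most
  # the first 4096 bytes of its combined stdout/stderr.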

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will usually not be overridden by child opcodes.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # FIXME: must log this error
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
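
  # Layout sketch (hpath and script names hypothetical): for hpath
  # "instance-start" and the "pre" phase, the scripts executed live in
  #   <hooks_base_dir>/instance-start-pre.d/
  # and run in lexicographic order, e.g. 00-check before 10-main; entries
  # whose names don't match RE_MASK, aren't regular files or aren't
  # executable are reported as HKR_SKIP instead of being run.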


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
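
  # Call sketch (allocator name and request are hypothetical); idata is the
  # serialized request text, written to a temporary file whose name is
  # passed to the script as its single argument:
  #
  #   status, stdout, stderr, reason = \
  #       IAllocatorRunner().Run("my-allocator", idata)
  #   if status != constants.IARUN_SUCCESS:
  #     # inspect stderr and reason for details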


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This strips the /dev prefix and replaces slashes with underscores.
    It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath
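
  # Mapping sketch (device path hypothetical):
  #
  #   _ConvertPath("/dev/md/md1") => "<BDEV_CACHE_DIR>/bdev_md_md1"
  #
  # i.e. the /dev prefix is dropped and remaining slashes become
  # underscores.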

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)
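
  # The cache file written above holds a single "<owner> <state> <iv_name>"
  # line, e.g. (values hypothetical):
  #
  #   instance1.example.com primary sda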

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s", dev_path)