Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 76ab5558

History | View | Annotate | Download (55.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetSshRunner():
  """Return a fresh SshRunner instance.

  """
  return ssh.SshRunner()
49

    
50

    
51
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @param path: the directory to clean
  @type path: str
  @param exclude: list of files to be excluded; defaults to the empty list
  @type exclude: list

  """
  if not os.path.isdir(path):
    return
  # note: a mutable default argument ([]) was replaced by None to
  # avoid sharing one list object across all calls
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only plain files are removed; symlinks and subdirs are kept
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
70

    
71

    
72
def _GetMasterInfo():
  """Return the master netdev and IP from the cluster configuration.

  @return: a (master_netdev, master_ip) tuple, or (None, None) if the
      configuration is incomplete

  """
  try:
    ss = ssconf.SimpleStore()
    master_netdev = ss.GetMasterNetdev()
    master_ip = ss.GetMasterIP()
  except errors.ConfigurationError:
    # the exception value was bound but unused; logging.exception
    # already records the full traceback
    logging.exception("Cluster configuration incomplete")
    return (None, None)
  return (master_netdev, master_ip)
84

    
85

    
86
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganet-masterd and ganeti-rapi).

  @param start_daemons: whether to also start ganeti-masterd and
      ganeti-rapi after taking over the master IP
  @return: True on full success, False if any step failed

  """
  ok = True
  master_netdev, master_ip = _GetMasterInfo()
  if not master_netdev:
    # incomplete configuration - already logged by _GetMasterInfo
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    # somebody answers on the master IP; ping again with localhost as
    # the source to find out whether that somebody is us
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    # add the master IP as a /32 alias (label <netdev>:0) on the
    # configured master network device
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # gratuitous ARP so that neighbours update their caches quickly
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
128

    
129

    
130
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganet-masterd and ganeti-rapi).

  @param stop_daemons: whether to also kill the master daemons
  @return: False if the master configuration cannot be read, True
      otherwise (even if removing the IP failed)

  """
  master_netdev, master_ip = _GetMasterInfo()
  if not master_netdev:
    return False

  # drop the /32 alias that holds the master IP
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
154

    
155

    
156
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @param dsa: host DSA private key contents
  @param dsapub: host DSA public key contents
  @param rsa: host RSA private key contents
  @param rsapub: host RSA public key contents
  @param sshkey: cluster-wide ssh private key for the ganeti run-as user
  @param sshpub: cluster-wide ssh public key for the ganeti run-as user
  @return: True on success, False if the user ssh files cannot be
      processed

  """
  # overwrite the host ssh keys; private keys must not be readable by
  # anyone but the owner (0600 vs 0644)
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")
    return False

  # install the cluster-wide key pair for the ganeti user
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so that it picks up the new host keys
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True
187

    
188

    
189
def LeaveCluster():
  """Prepare this node for removal from the cluster.

  Wipes the node's data directory, purges the job queue, removes the
  cluster ssh keys for the ganeti user, and finally asks the node
  daemon to shut itself down by raising QuitGanetiException.

  """
  _CleanDirectory(constants.DATA_DIR)

  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  pub_file = open(pub_key, 'r')
  try:
    # drop our own public key from the authorized_keys file
    utils.RemoveAuthorizedKey(auth_keys, pub_file.read(8192))
  finally:
    pub_file.close()

  for key_path in (priv_key, pub_key):
    utils.RemoveFile(key_path)

  # Return a reassuring string to the caller, and quit: raising this
  # exception makes the node daemon schedule its own shutdown after
  # replying to the RPC
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
214

    
215

    
216
def GetNodeInfo(vgname):
  """Collect volume group, hypervisor and boot-id data for this node.

  @param vgname: name of the volume group to query
  @return: a dict containing (at least):
      - vg_size: size of the configured volume group in MiB
      - vg_free: free space of the volume group in MiB
      - bootid: the kernel boot id
      plus whatever the hypervisor's GetNodeInfo() reports (memory
      figures in MiB)

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  hyp_info = hypervisor.GetHypervisor().GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  # the boot id changes on every reboot, which lets masters detect
  # node restarts
  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_id_file.read(128).rstrip("\n")
  finally:
    boot_id_file.close()

  return node_info
247

    
248

    
249
def VerifyNode(what):
  """Verify the status of the local node.

  Args:
    what - a dictionary of things to check:
      'filelist' : list of files for which to compute checksums
      'nodelist' : list of nodes we should check communication with
      'hypervisor': run the hypervisor-specific verify
      'node-net-test': list of (name, primary_ip, secondary_ip) tuples
        for which to test TCP connectivity from this node

  Requested files on local node are checksummed and the result returned.

  The nodelist is traversed, with the following checks being made
  for each node:
  - known_hosts key correct
  - correct resolving of node name (target node returns its own hostname
    by ssh-execution of 'hostname', result compared against name in list.

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = hypervisor.GetHypervisor().Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # shuffle so the ssh checks don't always hit nodes in the same
    # order cluster-wide (NOTE: this mutates the caller's list in place)
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner().VerifyNodeHostname(node)
      if not success:
        # only failures are recorded; absence of a key means success
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own entry so we know which source IPs to ping from
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = ssconf.SimpleStore().GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary interface when it is distinct from
        # the primary one
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
310

    
311

    
312
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @param vg_name: the volume group to list
  @return: dictionary of all partions (key) with their size (in MiB),
      inactive and online status:
      {'test1': ('20.06', True, True)}
      NOTE: on lvs failure, the raw lvs error output (a string) is
      returned instead of a dict - callers must check for this

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # raw string literal: the original non-raw form relied on Python
  # passing the unknown escape \| through unchanged, which raises a
  # warning/error on newer interpreters
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions 4 and 5 carry the state and open flags
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
344

    
345

    
346
def ListVolumeGroups():
  """Thin wrapper around utils.ListVolumeGroups().

  @return: dictionary mapping volume group names to their sizes

  """
  return utils.ListVolumeGroups()
354

    
355

    
356
def NodeVolumes():
  """List all logical volumes on this node.

  @return: a list of dicts, one per logical volume, with the keys
      'name', 'size', 'dev' and 'vg'; an empty dict on lvs failure

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
    # strip the "(extent-list)" suffix lvs appends to the device name;
    # split() is a no-op when there is no parenthesis
    return dev.split('(')[0]

  def map_line(fields):
    # fields come from splitting an lvs output line on '|'
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  # ignore lines with fewer than four fields (headers, noise)
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
384

    
385

    
386
def BridgesExist(bridges_list):
  """Check whether all the given bridges exist on this node.

  @param bridges_list: names of the bridges to check
  @return: True only if every bridge is present, False as soon as one
      is missing

  """
  # stop at the first missing bridge, mirroring the short-circuit
  # behaviour callers rely on
  for bridge_name in bridges_list:
    if not utils.BridgeExists(bridge_name):
      return False

  return True
398

    
399

    
400
def GetInstanceList():
  """Provides a list of instances.

  @return: a list of the names of all instances currently running on
      this node, e.g. ['instance1.example.com', 'instance2.example.com']

  Re-raises the hypervisor error (after logging it) if the instances
  cannot be enumerated.

  """
  try:
    names = hypervisor.GetHypervisor().ListInstances()
  except errors.HypervisorError:
    # the exception value was bound but unused; logging.exception
    # already records the full traceback
    logging.exception("Error enumerating instances")
    raise

  return names
416

    
417

    
418
def GetInstanceInfo(instance):
  """Return runtime information about one instance as a dictionary.

  @param instance: name of the instance (ex. instance1.example.com)
  @return: an empty dict if the hypervisor does not know the instance,
      otherwise a dict with the keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
  if iinfo is None:
    return {}

  # positional fields of the hypervisor's info tuple
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
441

    
442

    
443
def GetAllInstancesInfo():
  """Gather data about all instances.

  Bulk variant of `GetInstanceInfo()`: a single hypervisor query
  covering every instance, which is faster when data about more than
  one instance is needed.

  @return: a dict keyed by instance name; each value is a dict with
      the keys:
      - memory: memory size of instance (int)
      - vcpus: the number of cpus
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
  if not iinfo:
    return {}

  return dict((name, {'memory': memory,
                      'vcpus': vcpus,
                      'state': state,
                      'time': times})
              for (name, inst_id, memory, vcpus, state, times) in iinfo)
473

    
474

    
475
def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Runs the OS's create script for the instance, logging the script
  output to a per-operation file under LOG_OS_DIR.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  Returns True on success, False if a disk cannot be found or the
  script fails; raises BlockDeviceError if a backing device is not
  assembled.

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  # translate the instance-visible disk names into disk objects
  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  # the create script needs the real (assembled) block devices
  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  # one log file per (os, instance, timestamp) creation attempt
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # NOTE(review): '&>' redirection is a bash-ism - confirm RunCmd
  # executes via a bash-compatible shell
  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True
528

    
529

    
530
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
531
  """Run the OS rename script for an instance.
532

533
  Args:
534
    instance: the instance object
535
    old_name: the old name of the instance
536
    os_disk: the instance-visible name of the os device
537
    swap_disk: the instance-visible name of the swap device
538

539
  """
540
  inst_os = OSFromDisk(instance.os)
541

    
542
  script = inst_os.rename_script
543

    
544
  os_device = instance.FindDisk(os_disk)
545
  if os_device is None:
546
    logging.error("Can't find this device-visible name '%s'", os_disk)
547
    return False
548

    
549
  swap_device = instance.FindDisk(swap_disk)
550
  if swap_device is None:
551
    logging.error("Can't find this device-visible name '%s'", swap_disk)
552
    return False
553

    
554
  real_os_dev = _RecursiveFindBD(os_device)
555
  if real_os_dev is None:
556
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
557
                                  str(os_device))
558
  real_os_dev.Open()
559

    
560
  real_swap_dev = _RecursiveFindBD(swap_device)
561
  if real_swap_dev is None:
562
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
563
                                  str(swap_device))
564
  real_swap_dev.Open()
565

    
566
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
567
                                           old_name,
568
                                           instance.name, int(time.time()))
569
  if not os.path.exists(constants.LOG_OS_DIR):
570
    os.mkdir(constants.LOG_OS_DIR, 0750)
571

    
572
  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
573
                                inst_os.path, script, old_name, instance.name,
574
                                real_os_dev.dev_path, real_swap_dev.dev_path,
575
                                logfile)
576

    
577
  result = utils.RunCmd(command)
578

    
579
  if result.failed:
580
    logging.error("os create command '%s' returned error: %s output: %s",
581
                  command, result.fail_reason, result.output)
582
    return False
583

    
584
  return True
585

    
586

    
587
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count are the number of physical disks in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  # default result: all three keys present, all values None
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip surrounding whitespace and any trailing separator before
  # splitting the "size:free:count" fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      # sizes are reported as floats in MiB; round to whole MiB
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      # parsing failed: fall through and return the all-None dict
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
626

    
627

    
628
def _GatherBlockDevs(instance):
  """Look up and open every block device of an instance.

  This is run on the primary node at instance startup; the block
  devices must already be assembled.

  @param instance: the instance object whose disks to gather
  @return: a list of (disk, device) pairs - the objects.Disk and the
      matching live block device
  @raise errors.BlockDeviceError: if any disk is not set up

  """
  devices = []
  for disk in instance.disks:
    real_dev = _RecursiveFindBD(disk)
    if real_dev is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    real_dev.Open()
    devices.append((disk, real_dev))
  return devices
644

    
645

    
646
def StartInstance(instance, extra_args):
  """Start an instance.

  Args:
    instance - the instance object (not just the name)
    extra_args - extra arguments passed through to the hypervisor

  Returns True if the instance is running afterwards (including the
  case where it was already running), False on hypervisor failure.

  """
  running_instances = GetInstanceList()

  # already running: nothing to do
  if instance.name in running_instances:
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError:
    # the exception value was bound but unused; logging.exception
    # already records the full traceback
    logging.exception("Failed to start instance")
    return False

  return True
668

    
669

    
670
def ShutdownInstance(instance):
  """Shut an instance down.

  Asks the hypervisor for a clean shutdown, polls for up to about two
  minutes until the instance disappears, and finally destroys it
  forcibly if it is still running.

  Args:
    instance - the instance object to shutdown.

  Returns True if the instance is down afterwards, False otherwise.

  """
  running_instances = GetInstanceList()

  # not running: nothing to do
  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor()
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10secs for 2min
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList():
      # clean shutdown completed
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    # force=True asks the hypervisor to destroy the instance instead
    # of politely shutting it down
    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    # give the hypervisor a moment to drop the instance from its list
    time.sleep(1)
    if instance.name in GetInstanceList():
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True
714

    
715

    
716
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance    - the instance object to reboot
    reboot_type - how to reboot: INSTANCE_REBOOT_SOFT asks the
                  hypervisor for a reboot, INSTANCE_REBOOT_HARD does a
                  full shutdown followed by a fresh start
    extra_args  - extra hypervisor arguments used when restarting
                  after a hard reboot

  Returns True on success, False on failure; raises ParameterError
  for an unknown reboot_type.

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor()
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError:
      # the exception value was bound but unused; logging.exception
      # already records the full traceback
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True
749

    
750

    
751
def MigrateInstance(instance, target, live):
752
  """Migrates an instance to another node.
753

754
  """
755
  hyper = hypervisor.GetHypervisor()
756

    
757
  try:
758
    hyper.MigrateInstance(instance, target, live)
759
  except errors.HypervisorError, err:
760
    msg = "Failed to migrate instance: %s" % str(err)
761
    logging.error(msg)
762
    return (False, msg)
763
  return (True, "Migration successfull")
764

    
765

    
766
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Args:
   disk: a ganeti.objects.Disk object
   size: the size of the physical underlying device
   owner: a string with the name of the instance
   on_primary: a boolean indicating if it is the primary node or not
   info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometime be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
  clist = []
  if disk.children:
    # assemble (and possibly open) all the children first, since the
    # device itself may need them during its own assembly
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    # remove any stale device with the same id before (re)creating
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    # best-effort cleanup: failing to find/remove a stale device is
    # deliberately not fatal here
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    # record the new device in the cache for later lookups
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id
820

    
821

    
822
def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  Returns True if the device (and its children) were removed or were
  already absent, False otherwise.

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      # drop the now-stale cache entry
      DevCacheManager.RemoveCache(r_path)
  else:
    # nothing found to remove: treat as success
    result = True
  if disk.children:
    for child in disk.children:
      # NOTE(review): 'result and ...' short-circuits, so once one
      # removal fails the remaining children are not attempted -
      # confirm whether that is intended
      result = result and RemoveBlockDevice(child)
  return result
847

    
848

    
849
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: a objects.Disk object
    owner: the name of the instance owning the device
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    # mcn ends up as the maximum number of children that may fail to
    # assemble (i.e. be None) before we give up on the whole device
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # tolerate up to mcn failed children (degraded mirror);
        # re-raise once the limit is exceeded
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    # record the assembled device for later lookups
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
895

    
896

    
897
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  Thin wrapper over _RecursiveAssembleBD that converts the assembled
  device object into its /dev path.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(assembled, bdev.BlockDev):
    return assembled.dev_path
  return assembled
911

    
912

    
913
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      # drop the now-stale cache entry
      DevCacheManager.RemoveCache(r_path)
  else:
    # device not assembled: nothing to shut down
    result = True
  if disk.children:
    for child in disk.children:
      # NOTE(review): 'result and ...' short-circuits, so once one
      # shutdown fails the remaining children are not attempted -
      # confirm whether that is intended
      result = result and ShutdownBlockDevice(child)
  return result
936

    
937

    
938
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device with new children.

  @param parent_cdev: the mirror device (objects.Disk) to extend
  @param new_cdevs: list of child devices (objects.Disk) to attach
  @return: True on success, False if the parent or any new child
      device cannot be found

  """
  # allow a partial match on the parent: a degraded mirror is exactly
  # the case where we want to add children
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False

  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False

  parent_bdev.AddChildren(new_bdevs)
  return True
953

    
954

    
955
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  Args:
    parent_cdev: the objects.Disk of the mirror to shrink
    new_cdevs: list of objects.Disk children to detach from it

  Returns:
    True on success, False if the parent or any child device can't be
    resolved (the failure is logged)

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    # prefer the static device path if the disk type provides one;
    # fall back to a full recursive lookup otherwise
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True
978

    
979

    
980
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  status_list = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      # a single missing device makes the whole answer meaningless
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    status_list.append(found.CombinedSyncStatus())
  return status_list
998

    
999

    
1000
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  # resolve the children depth-first, then look up the device itself
  # with the resolved children attached
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1022

    
1023

    
1024
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    return None
  # combine the static identification with the live sync status
  return (found.dev_path, found.major, found.minor) + found.GetSyncStatus()
1040

    
1041

    
1042
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # static whitelist plus whatever files the simple store publishes
  allowed = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ])
  allowed.update(ssconf.SimpleStore().GetFileList())

  if file_name not in allowed:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1070

    
1071

    
1072
def _ErrnoOrStr(err):
1073
  """Format an EnvironmentError exception.
1074

1075
  If the `err` argument has an errno attribute, it will be looked up
1076
  and converted into a textual EXXXX description. Otherwise the string
1077
  representation of the error will be returned.
1078

1079
  """
1080
  if hasattr(err, 'errno'):
1081
    detail = errno.errorcode[err.errno]
1082
  else:
1083
    detail = str(err)
1084
  return detail
1085

    
1086

    
1087
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Args:
    name: the OS name (used only for error reporting)
    os_dir: directory which must contain a 'ganeti_api_version' file

  Returns:
    the API version as an integer

  Raises:
    errors.InvalidOS: if the version file is missing, not a regular
      file, unreadable, or does not contain an integer (note: this
      function never returns None; all failures raise)

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      # the version string is short; 256 bytes is more than enough
      api_version = f.read(256)
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_version = api_version.strip()
  try:
    api_version = int(api_version)
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_version
1127

    
1128

    
1129
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  Returns:
    list of OS objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        # NOTE(review): 'break' aborts the scan of *all* remaining
        # search directories, not just this one -- confirm whether
        # 'continue' was intended
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          # invalid OSes are still reported, as 'invalid' OS objects
          result.append(objects.OS.FromInvalidOS(err))

  return result
1158

    
1159

    
1160
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    name: the OS name
    base_dir: directory containing the OS scripts. Defaults to a search
              in all the OS_SEARCH_PATH directories.

  """

  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_version = _OSOndiskVersion(name, os_dir)

  if api_version != constants.OS_API_VERSION:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_version, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    # each script must exist, be a regular file and be executable
    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_version=api_version)
1215

    
1216

    
1217
def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first one resize.

  Args:
    disk: the disk to be grown
    amount: the amount by which to grow the device (presumably in
      mebibytes -- TODO confirm units against bdev.Grow)

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    return False, str(err)

  return True, None
1241

    
1242

    
1243
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      # NOTE: if no child matches the parent's size, we fall off the
      # loop and implicitly return None
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1277

    
1278

    
1279
def ExportSnapshot(disk, dest_node, instance):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported

  Returns:
    True if successful, False otherwise.

  Raises:
    errors.BlockDeviceError: if the snapshot device is not assembled

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  # one log file per export attempt, timestamped to avoid clashes
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  # exports are first written into a '.new' staging directory on the
  # destination and moved into place later by FinalizeExport
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters

  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
                                       destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
1334

    
1335

    
1336
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # general export metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # per-instance data
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  # NOTE(review): 'nic_count' stores the *last index*, not the number
  # of NICs (and 0 for an empty list); the import side must compensate
  # -- confirm before changing
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  # same caveat as nic_count above: this is the last index
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  # atomically-ish replace any previous export with the staged one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
1396

    
1397

    
1398
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  config_file = os.path.join(dest, constants.EXPORT_CONF_FILE)

  export_info = objects.SerializableConfigParser()
  export_info.read(config_file)

  # both sections must be present for this to be a valid export
  for section in (constants.INISECT_EXP, constants.INISECT_INS):
    if not export_info.has_section(section):
      return None

  return export_info
1418

    
1419

    
1420
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  Raises:
    errors.BlockDeviceError: if the os or swap device is not assembled

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  # per-import log file, timestamped to avoid clashes
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # the image is streamed from the source node over ssh, decompressed
  # locally and fed to the OS-specific import script
  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
                                       destcmd)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
1484

    
1485

    
1486
def ListExports():
  """Return a list of exports currently available on this machine.

  """
  # a missing export directory simply means there are no exports
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1494

    
1495

    
1496
def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  # NOTE: rmtree raises if the export doesn't exist or can't be
  # removed, so errors currently propagate instead of returning False
  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True
1513

    
1514

    
1515
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_unique_id) --
  the code below unpacks two elements per entry. The return value will
  be a combined boolean result (True only if all renames succeeded).

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      # device not assembled; record the failure but keep going
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result
1544

    
1545

    
1546
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
  # os.path.commonprefix compares character by character, so it would
  # also accept sibling directories such as '<base>-evil'; require the
  # base directory itself or a true path component below it instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
1569

    
1570

    
1571
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating wheter dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # note: the values below are one-element tuples, hence the
  # trailing commas
  result = True,
  if not file_storage_dir:
    # path was rejected by _TransformFileStorageDir
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result
1599

    
1600

    
1601
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating wheter dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  # one-element tuples (note the trailing commas)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      # NOTE(review): this rmdir also runs when the path is not a
      # directory (there is no 'else'), producing a second error log
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result
1631

    
1632

    
1633
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating wheter dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  # one-element tuples (note the trailing commas)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    # at least one path was rejected by _TransformFileStorageDir
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result =  False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result
1668

    
1669

    
1670
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  Args:
    file_name: the (absolute) file name to check

  Returns:
    True if the file lives below the queue directory, False otherwise
    (the failure is also logged)

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # os.path.commonprefix compares character by character, so it would
  # also accept paths such as '<queue_dir>-evil'; check for a real
  # path component boundary instead
  result = file_name.startswith(queue_dir + os.sep)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
1682

    
1683

    
1684
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  Args:
    file_name: absolute path of the file to write; must be inside the
      queue directory
    content: the new file contents

  Returns:
    False if file_name is outside the queue directory, True otherwise

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
1695

    
1696

    
1697
def JobQueuePurge():
  """Removes job queue files and archived jobs

  Clears both the live queue directory and the archive directory by
  removing all visible regular files in them (see _CleanDirectory).

  """
  _CleanDirectory(constants.QUEUE_DIR)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
1703

    
1704

    
1705
def JobQueueRename(old, new):
  """Renames a job queue file.

  Args:
    old: absolute path of the current file name
    new: absolute path of the target file name

  Returns:
    False if either path is outside the queue directory, True otherwise
    (note: errors from os.rename itself propagate as exceptions)

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True
1715

    
1716

    
1717
def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  Args:
    disks: list of objects.Disk instances to close

  Returns:
    a (success, message) tuple; on failure the message names the
    missing device or collects the per-device close errors

  """
  bdevs = []
  # resolve all devices first, so we fail before touching any of them
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      # collect all errors instead of stopping at the first one
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")
1740

    
1741

    
1742
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # hook scripts whose names don't match this pattern are skipped
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    Returns:
      a (success, output) tuple; success is True iff the script exited
      with status 0, and output holds at most the first 4096 bytes of
      the script's combined stdout/stderr

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4K of output is kept; the pipe is closed
        # afterwards, so a chatty script may see EPIPE
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      # retry wait() on EINTR (interrupted by a signal)
      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    Returns:
      a list of ("subdir/scriptname", HKR_* status, output) tuples,
      one per visible entry in the hooks directory; an unreadable or
      missing directory yields an empty list

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # must log
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # non-executable, non-regular or oddly-named entries are
      # reported as skipped rather than executed
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
1847

    
1848

    
1849
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    The script is located via L{constants.IALLOCATOR_SEARCH_PATH} and is
    passed its input through a temporary file, which is always removed.

    @param name: the (base) name of the iallocator script
    @param idata: string data to write to the script's input file

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      # FIX: close the temp fd in a finally block so it cannot leak if
      # os.write() raises; the old code only closed it on success
      try:
        os.write(fd, idata)
      finally:
        os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1885
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache is kept as one small status file per block device under
  the directory named by L{constants.BDEV_CACHE_DIR}; all operations
  are classmethods and best-effort (errors are logged, not raised).

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @param dev_path: the C{/dev/...} path of the device
    @return: the absolute path of the cache file for this device

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @param dev_path: the pathname of the device
    @param owner: the owner of the device
    @param on_primary: whether this node is primary for the device
    @param iv_name: the instance-visible name of the device, or None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # best-effort: a failed cache write must not break the caller
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    @param dev_path: the pathname of the device

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # FIX: the old message said "update" - a copy/paste from UpdateCache
      logging.exception("Can't remove the bdev cache for %s", dev_path)