#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon"""


import os
import os.path
import shutil
import time
import stat
import errno
import re
import subprocess
import random
import logging
import tempfile

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf


def _GetSshRunner():
  return ssh.SshRunner()


def _CleanDirectory(path, exclude=[]):
  """Removes all regular files in a directory.

  @param exclude: List of files to be excluded.
  @type exclude: list

  """
  if not os.path.isdir(path):
    return

  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name)

  """
  try:
    ss = ssconf.SimpleStore()
    master_netdev = ss.GetMasterNetdev()
    master_ip = ss.GetMasterIP()
    master_node = ss.GetMasterNode()
  except errors.ConfigurationError, err:
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)


def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
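# For reference, with a hypothetical master IP of 192.0.2.1 on netdev
# xen-br0, the two RunCmd calls above amount to roughly:
#   ip address add 192.0.2.1/32 dev xen-br0 label xen-br0:0
#   arping -q -U -c 3 -I xen-br0 -s 192.0.2.1 192.0.2.1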


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True


def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the user's authorized_keys file

  """
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")
    return False

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True


def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')


def GetNodeInfo(vgname):
  """Gives back a hash with different information about the node.

  Returns:
    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_dom0': xxx,
      'memory_free' : xxx, 'memory_total' : xxx }
    where
    vg_size is the size of the configured volume group in MiB
    vg_free is the free size of the volume group in MiB
    memory_dom0 is the memory allocated for domain0 in MiB
    memory_free is the currently available (free) ram in MiB
    memory_total is the total amount of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor()
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray


def VerifyNode(what):
  """Verify the status of the local node.

  Args:
    what - a dictionary of things to check:
      'filelist' : list of files for which to compute checksums
      'nodelist' : list of nodes we should check communication with
      'hypervisor': run the hypervisor-specific verify
      'node-net-test': list of (name, primary ip, secondary ip) tuples for
                       which to check TCP connectivity to the node daemons

  Requested files on local node are checksummed and the result returned.

  The nodelist is traversed, with the following checks being made
  for each node:
  - known_hosts key correct
  - correct resolving of node name (target node returns its own hostname
    by ssh-execution of 'hostname', result compared against name in list)

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = hypervisor.GetHypervisor().Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner().VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = ssconf.SimpleStore().GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
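# For illustration only (hypothetical names and addresses), a 'what'
# argument could look like:
#   {'filelist': ['/etc/hosts', '/etc/ssh/ssh_known_hosts'],
#    'nodelist': ['node2.example.com', 'node3.example.com'],
#    'hypervisor': None,  # presence of the key alone triggers the check
#    'node-net-test': [('node1.example.com', '192.0.2.1', '198.51.100.1')]}
# and the result maps each requested check to its findings, with empty
# sub-dicts meaning no problems were found.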


def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  Returns:
    dictionary of all partitions (key) with their size (in MiB), inactive
    and online status:
    {'test1': ('20.06', True, True)}

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
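# For illustration (hypothetical volume): an lvs output line such as
#   "  data|20.06|-wi-ao" is parsed above into
#   lvs['data'] = ('20.06', False, True),
# i.e. (size in MiB as a string, is-inactive flag, is-open flag).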


def ListVolumeGroups():
  """List the volume groups and their size.

  Returns:
    Dictionary with keys the volume group names and values the size of
    each volume group

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  Returns:
    True if all of them exist, False otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True


def GetInstanceList():
  """Provides a list of instances.

  Returns:
    A list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  try:
    names = hypervisor.GetHypervisor().ListInstances()
  except errors.HypervisorError, err:
    logging.exception("Error enumerating instances")
    raise

  return names


def GetInstanceInfo(instance):
  """Gives back the information about an instance as a dictionary.

  Args:
    instance: name of the instance (ex. instance1.example.com)

  Returns:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetAllInstancesInfo():
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  Returns: a dictionary of dictionaries, keys being the instance name,
    and with values:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)
    vcpus: the number of cpus

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
  if iinfo:
    for name, inst_id, memory, vcpus, state, times in iinfo:
      output[name] = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }

  return output
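# For illustration (hypothetical instances), the returned mapping looks like:
#   {'instance1.example.com': {'memory': 512, 'vcpus': 1,
#                              'state': '-b---', 'time': 118.3},
#    'instance2.example.com': {...}}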


def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)
  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}

  result = utils.RunCmd(command, env=env)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True


def RunRenameInstance(instance, old_name, os_disk, swap_disk):
  """Run the OS rename script for an instance.

  Args:
    instance: the instance object
    old_name: the old name of the instance
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
                                inst_os.path, script, old_name, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  command, result.fail_reason, result.output)
    return False

  return True


def _GetVGInfo(vg_name):
  """Get information about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count is the number of physical volumes in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Failed to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
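# For illustration (hypothetical volume group), a vgs output line such as
#   "  409600.00:102400.00:2" is parsed above into
#   {'vg_size': 409600, 'vg_free': 102400, 'pv_count': 2}.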


def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    block_devices.append((disk, device))
  return block_devices


def StartInstance(instance, extra_args):
  """Start an instance.

  Args:
    instance - the instance object to start

  """
  running_instances = GetInstanceList()

  if instance.name in running_instances:
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    return False

  return True


def ShutdownInstance(instance):
  """Shut an instance down.

  Args:
    instance - the instance object to shut down

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor()
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10 seconds for about two minutes
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList():
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    time.sleep(1)
    if instance.name in GetInstanceList():
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True


def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance    - the instance object to reboot
    reboot_type - how to reboot [soft,hard,full]

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor()
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  """
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    msg = "Failed to migrate instance: %s" % str(err)
    logging.error(msg)
    return (False, msg)
  return (True, "Migration successful")


def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Args:
    disk: a ganeti.objects.Disk object
    size: the size of the physical underlying device
    owner: a string with the name of the instance
    on_primary: a boolean indicating if it is the primary node or not
    info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometimes be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id


def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and RemoveBlockDevice(child)
  return result


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: an objects.Disk object
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device, or True if no assembly was required on this
    node

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result


def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down a device
  doesn't require that the devices above it were active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result


def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True


def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True


def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats


def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)


def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  allowed_files.extend(ssconf.SimpleStore().GetFileList())

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the `err` argument has an errno attribute, it will be looked up
  and converted into a textual EXXXX description. Otherwise the string
  representation of the error will be returned.

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Return value is an integer denoting the version; an errors.InvalidOS
  exception is raised if this is not a valid OS directory.

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_version = f.read(256)
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_version = api_version.strip()
  try:
    api_version = int(api_version)
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not an integer (%s)" % str(err))

  return api_version


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  Returns:
    list of OS objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          result.append(objects.OS.FromInvalidOS(err))

  return result


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    base_dir: directory in which to look for the OS (the OS directory
              itself will be base_dir/name). Defaults to a search in
              all the OS_SEARCH_PATH directories.

  """

  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_version = _OSOndiskVersion(name, os_dir)

  if api_version != constants.OS_API_VERSION:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_version, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_version=api_version)
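# For illustration (hypothetical layout), a valid OS definition named
# "debian-etch" under a search-path entry /srv/ganeti/os would look like:
#   /srv/ganeti/os/debian-etch/ganeti_api_version  (regular file holding the
#                                                   OS_API_VERSION number)
#   /srv/ganeti/os/debian-etch/create   (executable)
#   /srv/ganeti/os/debian-etch/export   (executable)
#   /srv/ganeti/os/debian-etch/import   (executable)
#   /srv/ganeti/os/debian-etch/rename   (executable)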


def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being
  resized first.

  Args:
    disk: the disk to be grown

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    return False, str(err)

  return True, None


def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))


def ExportSnapshot(disk, dest_node, instance):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported

  Returns:
    True if successful, False otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters

  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
                                       destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
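# For illustration only (hypothetical names and paths), the pipeline
# assembled above looks roughly like:
#   cd <os_dir>; ./export -i inst1 -b /dev/<vg>/<snap_lv> 2><logfile> \
#     | gzip \
#     | ssh <dest_node> 'mkdir -p <destdir> && cat > <destdir>/<snap_lv>'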


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_count)

  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
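# For illustration (hypothetical instance), the generated config file is a
# small INI-style document along these lines (the actual section names come
# from constants.INISECT_EXP and constants.INISECT_INS):
#   [export]
#   version = 0
#   timestamp = 1190000000
#   source = node1.example.com
#   os = debian-etch
#   compression = gzip
#   [instance]
#   name = inst1.example.com
#   memory = 512
#   vcpus = 1
#   ...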


def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config


def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
                                       destcmd)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}

  result = utils.RunCmd(command, env=env)

  if result.failed:
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
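# As with ExportSnapshot above, and purely for illustration (hypothetical
# names and paths), the assembled import pipeline is roughly:
#   ssh <src_node> 'cat <src_image>' \
#     | gunzip \
#     | (cd <os_dir>; ./import -i inst1 -b <os_dev> -s <swap_dev> &><logfile>)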


def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_unique_id). The
  return value will be a combined boolean result (True only if all
  renames succeeded).

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
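# For illustration, with a hypothetical cluster-wide default of
# /srv/ganeti/file-storage:
#   _TransformFileStorageDir('/srv/ganeti/file-storage/inst1') -> same path
#   _TransformFileStorageDir('/tmp/somewhere-else')            -> None (logged)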


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result


1617
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result


def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True


def JobQueueRename(old, new):
  """Renames a job queue file.

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True


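# Illustrative sketch, not part of the original module: both helpers
# above refuse paths outside constants.QUEUE_DIR, so a hypothetical
# caller builds file names under that directory.  The "job-<id>" naming
# is made up for this example; content is the already-serialized job.
def _ExampleJobQueueUpdate(job_id, content):
  """Writes hypothetical serialized job data into the queue directory."""
  file_name = os.path.join(constants.QUEUE_DIR, "job-%s" % job_id)
  return JobQueueUpdate(file_name, content)

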
def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")


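# Illustrative sketch, not part of the original module: CloseBlockDevices
# returns a (status, message) pair, so a hypothetical caller checks the
# flag and surfaces the message on failure.
def _ExampleCloseBlockDevices(disks):
  """Closes the given disks and logs the outcome."""
  status, message = CloseBlockDevices(disks)
  if not status:
    logging.error("Closing block devices failed: %s", message)
  return status

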
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and capture its output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # nothing sensible we can do here except ignore the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not usually be overridden by child opcodes.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # must log
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr


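# Illustrative sketch, not part of the original module: running the
# "pre" phase for a hypothetical hooks path against an environment dict
# and logging every script that failed.
def _ExampleRunPreHooks(hpath, env):
  """Runs the pre-phase hooks for hpath and logs failures."""
  runner = HooksRunner()
  for relname, status, output in runner.RunHooks(hpath,
                                                 constants.HOOKS_PHASE_PRE,
                                                 env):
    if status == constants.HKR_FAIL:
      logging.error("Hook %s failed: %s", relname, output)

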
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)


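# Illustrative sketch, not part of the original module: feeding an
# already-serialized allocator request (a string built elsewhere) to a
# named allocator and logging failures, keyed on the IARUN_ status.
def _ExampleRunIAllocator(name, serialized_request):
  """Runs the named allocator script and logs failures."""
  status, stdout, stderr, fail_reason = IAllocatorRunner().Run(
    name, serialized_request)
  if status == constants.IARUN_NOTFOUND:
    logging.error("IAllocator script '%s' not found", name)
  elif status == constants.IARUN_FAILURE:
    logging.error("IAllocator script '%s' failed: %s (stderr: %s)",
                  name, fail_reason, stderr)
  return status, stdout

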
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s", dev_path)

