Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 58f6e5ca

History | View | Annotate | Download (58.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetConfig():
  """Build a reader for the cluster's simple configuration.

  @rtype: L{ssconf.SimpleConfigReader}
  @return: a fresh simple-configuration reader instance

  """
  return ssconf.SimpleConfigReader()
49

    
50

    
51
def _GetSshRunner(cluster_name):
  """Build an SSH runner bound to the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, used by the runner for host
      key verification

  """
  return ssh.SshRunner(cluster_name)
53

    
54

    
55
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory whose files should be removed
  @type exclude: list
  @param exclude: list of files to be excluded; defaults to the empty
      list (a mutable default argument would be shared between calls,
      hence the None sentinel)

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so they compare equal to the normalized
    # names computed below
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only regular, non-symlinked files are removed; directories and
    # symlinks are left alone
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
74

    
75

    
76
def JobQueuePurge():
  """Remove the job queue files and the archived jobs.

  The job queue lock file is kept, so that the queue lock stays valid.

  """
  keep = [constants.JOB_QUEUE_LOCK_FILE]
  _CleanDirectory(constants.QUEUE_DIR, exclude=keep)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82

    
83

    
84
def GetMasterInfo():
85
  """Returns master information.
86

87
  This is an utility function to compute master information, either
88
  for consumption here or from the node daemon.
89

90
  @rtype: tuple
91
  @return: (master_netdev, master_ip, master_name)
92

93
  """
94
  try:
95
    cfg = _GetConfig()
96
    master_netdev = cfg.GetMasterNetdev()
97
    master_ip = cfg.GetMasterIP()
98
    master_node = cfg.GetMasterNode()
99
  except errors.ConfigurationError, err:
100
    logging.exception("Cluster configuration incomplete")
101
    return (None, None)
102
  return (master_netdev, master_ip, master_node)
103

    
104

    
105
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the
  master (if someone else has it, then it won't). Then, if the
  start_daemons parameter is True, it will also start the master
  daemons (ganeti-masterd and ganeti-rapi).

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master daemons
  @rtype: boolean
  @return: True if all requested steps succeeded, False otherwise

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  # if something answers on the master IP, check whether it is us
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    # add the master IP as a /32 alias labelled <netdev>:0
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # send unsolicited ARP replies so peers refresh their ARP caches
    # for the (re)assigned master IP
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
146

    
147

    
148
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The master IP address is always removed. If the stop_daemons
  parameter is True, the master daemons (ganeti-masterd and
  ganeti-rapi) are stopped as well.

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
  @rtype: boolean
  @return: False if the master information is unavailable, True
      otherwise

  """
  netdev, ip_address, _ = GetMasterInfo()
  if not netdev:
    return False

  # drop the master IP alias from the master network device
  del_result = utils.RunCmd(["ip", "address", "del", "%s/32" % ip_address,
                             "dev", netdev])
  if del_result.failed:
    logging.error("Can't remove the master IP, error: %s", del_result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in (constants.RAPI_PID, constants.MASTERD_PID):
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
172

    
173

    
174
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175
  """Joins this node to the cluster.
176

177
  This does the following:
178
      - updates the hostkeys of the machine (rsa and dsa)
179
      - adds the ssh private key to the user
180
      - adds the ssh public key to the users' authorized_keys file
181

182
  """
183
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187
  for name, content, mode in sshd_keys:
188
    utils.WriteFile(name, data=content, mode=mode)
189

    
190
  try:
191
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
192
                                                    mkdir=True)
193
  except errors.OpExecError, err:
194
    logging.exception("Error while processing user ssh files")
195
    return False
196

    
197
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198
    utils.WriteFile(name, data=content, mode=0600)
199

    
200
  utils.AddAuthorizedKey(auth_keys, sshpub)
201

    
202
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
203

    
204
  return True
205

    
206

    
207
def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  @raise errors.QuitGanetiException: always, to signal the node daemon
      that it should shut down after replying

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  # read our own public key and drop it from authorized_keys
  pub_file = open(pub_key, 'r')
  try:
    pub_data = pub_file.read(8192)
  finally:
    pub_file.close()
  utils.RemoveAuthorizedKey(auth_keys, pub_data)

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231

    
232

    
233
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space
      information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB
      - bootid is the boot identifier of the running kernel

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  hv_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_info is not None:
    node_info.update(hv_info)

  # the boot id changes on every reboot, so it can be used to detect
  # node restarts
  boot_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_file.read(128).rstrip("\n")
  finally:
    boot_file.close()

  return node_info
267

    
268

    
269
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster's name, passed to the ssh runner
  @rtype: dict
  @return: per-check results, keyed like the input dictionary; only
      failures are recorded for the nodelist and node-net-test checks

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # shuffle so that not all nodes hammer the same targets in the
    # same order (note: this mutates the caller's list in place)
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # locate our own primary/secondary IPs in the supplied node list;
    # they are used as source addresses for the connectivity tests
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
340

    
341

    
342
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs should be listed
  @rtype: dict
  @return: dictionary of all partitions (key) with their size (in
      MiB), inactive and online status, e.g.
      {'test1': ('20.06', True, True)}; on failure, the error output
      of lvs is returned instead

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # use a raw string so the backslash escapes ("\|") reach the regexp
  # engine unmangled instead of being (mis)treated as string escapes
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr position 5 is the state ('-' = inactive) and position 6
    # is the device open status ('o' = open)
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
374

    
375

    
376
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary mapping volume group names to their size

  """
  return utils.ListVolumeGroups()
384

    
385

    
386
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return: a list of dictionaries with the keys 'name', 'size', 'dev'
      and 'vg', one per logical volume; an empty dict on lvs failure

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
    # strip the "(extent)" suffix from the devices field, if any
    if '(' in dev:
      return dev.split('(')[0]
    return dev

  def map_line(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
      }

  # only lines with at least four fields are valid volume lines
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
414

    
415

    
416
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @rtype: boolean
  @return: True if all of them exist, False otherwise

  """
  all_found = True
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      # stop at the first missing bridge
      all_found = False
      break

  return all_found
428

    
429

    
430
def GetInstanceList(hypervisor_list):
431
  """Provides a list of instances.
432

433
  @type hypervisor_list: list
434
  @param hypervisor_list: the list of hypervisors to query information
435

436
  @rtype: list
437
  @return: a list of all running instances on the current node
438
             - instance1.example.com
439
             - instance2.example.com
440

441
  """
442
  results = []
443
  for hname in hypervisor_list:
444
    try:
445
      names = hypervisor.GetHypervisor(hname).ListInstances()
446
      results.extend(names)
447
    except errors.HypervisorError, err:
448
      logging.exception("Error enumerating instances for hypevisor %s", hname)
449
      # FIXME: should we somehow not propagate this to the master?
450
      raise
451

    
452
  return results
453

    
454

    
455
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys (empty if the instance
      is not found):
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}

  # the hypervisor info tuple carries memory, state and cpu time at
  # positions 2, 4 and 5
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
479

    
480

    
481
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hv_name in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hv_name).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      # an instance reported by two hypervisors must have identical
      # data, otherwise something is seriously wrong
      if name in output and output[name] != value:
        raise errors.HypervisorError("Instance %s running duplicate"
                                     " with different parameters" % name)
      output[name] = value

  return output
517

    
518

    
519
def AddOSToInstance(instance, os_disk, swap_disk):
520
  """Add an OS to an instance.
521

522
  Args:
523
    instance: the instance object
524
    os_disk: the instance-visible name of the os device
525
    swap_disk: the instance-visible name of the swap device
526

527
  """
528
  inst_os = OSFromDisk(instance.os)
529

    
530
  create_script = inst_os.create_script
531
  create_env = OSEnvironment(instance)
532

    
533
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
534
                                     instance.name, int(time.time()))
535
  if not os.path.exists(constants.LOG_OS_DIR):
536
    os.mkdir(constants.LOG_OS_DIR, 0750)
537

    
538
  command = utils.BuildShellCmd("cd %s && %s &>%s",
539
                                inst_os.path, create_script, logfile)
540

    
541
  result = utils.RunCmd(command, env=create_env)
542
  if result.failed:
543
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
544
                  " output: %s", command, result.fail_reason, logfile,
545
                  result.output)
546
    return False
547

    
548
  return True
549

    
550

    
551
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
552
  """Run the OS rename script for an instance.
553

554
  Args:
555
    instance: the instance object
556
    old_name: the old name of the instance
557
    os_disk: the instance-visible name of the os device
558
    swap_disk: the instance-visible name of the swap device
559

560
  """
561
  inst_os = OSFromDisk(instance.os)
562

    
563
  script = inst_os.rename_script
564
  rename_env = OSEnvironment(instance)
565
  rename_env['OLD_INSTANCE_NAME'] = old_name
566

    
567
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
568
                                           old_name,
569
                                           instance.name, int(time.time()))
570
  if not os.path.exists(constants.LOG_OS_DIR):
571
    os.mkdir(constants.LOG_OS_DIR, 0750)
572

    
573
  command = utils.BuildShellCmd("cd %s && %s &>%s",
574
                                inst_os.path, script, logfile)
575

    
576
  result = utils.RunCmd(command, env=rename_env)
577

    
578
  if result.failed:
579
    logging.error("os create command '%s' returned error: %s output: %s",
580
                  command, result.fail_reason, result.output)
581
    return False
582

    
583
  return True
584

    
585

    
586
def _GetVGInfo(vg_name):
587
  """Get informations about the volume group.
588

589
  Args:
590
    vg_name: the volume group
591

592
  Returns:
593
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
594
    where
595
    vg_size is the total size of the volume group in MiB
596
    vg_free is the free size of the volume group in MiB
597
    pv_count are the number of physical disks in that vg
598

599
  If an error occurs during gathering of data, we return the same dict
600
  with keys all set to None.
601

602
  """
603
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
604

    
605
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
606
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
607

    
608
  if retval.failed:
609
    logging.error("volume group %s not present", vg_name)
610
    return retdic
611
  valarr = retval.stdout.strip().rstrip(':').split(':')
612
  if len(valarr) == 3:
613
    try:
614
      retdic = {
615
        "vg_size": int(round(float(valarr[0]), 0)),
616
        "vg_free": int(round(float(valarr[1]), 0)),
617
        "pv_count": int(valarr[2]),
618
        }
619
    except ValueError, err:
620
      logging.exception("Fail to parse vgs output")
621
  else:
622
    logging.error("vgs output has the wrong number of fields (expected"
623
                  " three): %s", str(valarr))
624
  return retdic
625

    
626

    
627
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @param instance: the instance whose disks should be gathered
  @rtype: list
  @return: list of (disk, device) pairs, one per instance disk
  @raise errors.BlockDeviceError: if a disk cannot be found

  """
  devices = []
  for disk in instance.disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    # open the device read/write for the instance to use
    found.Open()
    devices.append((disk, found))
  return devices
643

    
644

    
645
def StartInstance(instance, extra_args):
646
  """Start an instance.
647

648
  @type instance: instance object
649
  @param instance: the instance object
650
  @rtype: boolean
651
  @return: whether the startup was successful or not
652

653
  """
654
  running_instances = GetInstanceList([instance.hypervisor])
655

    
656
  if instance.name in running_instances:
657
    return True
658

    
659
  block_devices = _GatherBlockDevs(instance)
660
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
661

    
662
  try:
663
    hyper.StartInstance(instance, block_devices, extra_args)
664
  except errors.HypervisorError, err:
665
    logging.exception("Failed to start instance")
666
    return False
667

    
668
  return True
669

    
670

    
671
def ShutdownInstance(instance):
  """Shut an instance down.

  First a graceful stop is requested from the hypervisor; if the
  instance is still running after about two minutes, it is destroyed
  forcefully.

  @type instance: instance object
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  # already stopped: nothing to do
  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10secs for 2min
  shutdown_ok = False  # NOTE(review): never read afterwards; looks vestigial

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed (the for loop ran to completion
    # without hitting the break)
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    # give the forced destroy a moment to take effect before checking
    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True
718

    
719

    
720
def RebootInstance(instance, reboot_type, extra_args):
721
  """Reboot an instance.
722

723
  Args:
724
    instance    - name of instance to reboot
725
    reboot_type - how to reboot [soft,hard,full]
726

727
  """
728
  running_instances = GetInstanceList([instance.hypervisor])
729

    
730
  if instance.name not in running_instances:
731
    logging.error("Cannot reboot instance that is not running")
732
    return False
733

    
734
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
735
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
736
    try:
737
      hyper.RebootInstance(instance)
738
    except errors.HypervisorError, err:
739
      logging.exception("Failed to soft reboot instance")
740
      return False
741
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
742
    try:
743
      ShutdownInstance(instance)
744
      StartInstance(instance, extra_args)
745
    except errors.HypervisorError, err:
746
      logging.exception("Failed to hard reboot instance")
747
      return False
748
  else:
749
    raise errors.ParameterError("reboot_type invalid")
750

    
751
  return True
752

    
753

    
754
def MigrateInstance(instance, target, live):
755
  """Migrates an instance to another node.
756

757
  @type instance: C{objects.Instance}
758
  @param instance: the instance definition
759
  @type target: string
760
  @param target: the target node name
761
  @type live: boolean
762
  @param live: whether the migration should be done live or not (the
763
      interpretation of this parameter is left to the hypervisor)
764
  @rtype: tuple
765
  @return: a tuple of (success, msg) where:
766
      - succes is a boolean denoting the success/failure of the operation
767
      - msg is a string with details in case of failure
768

769
  """
770
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
771

    
772
  try:
773
    hyper.MigrateInstance(instance.name, target, live)
774
  except errors.HypervisorError, err:
775
    msg = "Failed to migrate instance: %s" % str(err)
776
    logging.error(msg)
777
    return (False, msg)
778
  return (True, "Migration successfull")
779

    
780

    
781
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @param disk: a ganeti.objects.Disk object
  @param size: the size of the physical underlying device
  @param owner: a string with the name of the instance
  @param on_primary: a boolean indicating if it is the primary node or not
  @param info: string that will be sent to the physical device creation

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  clist = []
  if disk.children:
    # assemble all child devices first, so the new device can be
    # created on top of them
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    # if a device with this id already exists, remove it before
    # recreating it
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    # best-effort: a lookup failure here just means there is nothing
    # to remove, so creation below can proceed
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    # record the assembled device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id
835

    
836

    
837
def RemoveBlockDevice(disk):
838
  """Remove a block device.
839

840
  This is intended to be called recursively.
841

842
  """
843
  try:
844
    # since we are removing the device, allow a partial match
845
    # this allows removal of broken mirrors
846
    rdev = _RecursiveFindBD(disk, allow_partial=True)
847
  except errors.BlockDeviceError, err:
848
    # probably can't attach
849
    logging.info("Can't attach to device %s in remove", disk)
850
    rdev = None
851
  if rdev is not None:
852
    r_path = rdev.dev_path
853
    result = rdev.Remove()
854
    if result:
855
      DevCacheManager.RemoveCache(r_path)
856
  else:
857
    result = True
858
  if disk.children:
859
    for child in disk.children:
860
      result = result and RemoveBlockDevice(child)
861
  return result
862

    
863

    
864
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  @param disk: a objects.Disk object
  @param owner: the instance name that owns the device
  @param as_primary: if we should make the block device read/write

  @return: the assembled device or True (in case no device was
      assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # tolerate failed children up to the mcn limit computed above;
        # beyond that the device cannot work, so re-raise
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    # record the assembled device in the node-local device cache
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
910

    
911

    
912
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @return: a /dev path for primary nodes, True for secondary nodes

  """
  device = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(device, bdev.BlockDev):
    # on the primary we get a device object back; callers only need
    # its path
    return device.dev_path
  return device
926

    
927

    
928
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then it is shut
  down; afterwards the children are shut down as well.

  This function is called recursively. Note that we don't cache the
  children or such: as opposed to assemble, shutting down a child does
  not require the parent device to be active.

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    success = True
  else:
    dev_path = found.dev_path
    success = found.Shutdown()
    if success:
      DevCacheManager.RemoveCache(dev_path)
  if disk.children:
    for child in disk.children:
      # NOTE: short-circuits — children after the first failure are
      # not shut down (preserved from the original logic)
      success = success and ShutdownBlockDevice(child)
  return success
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  Returns True on success, False if the parent or any new child device
  cannot be found.

  """
  parent = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent is None:
    logging.error("Can't find parent device")
    return False
  found = [_RecursiveFindBD(cdev) for cdev in new_cdevs]
  if None in found:
    logging.error("Can't find new device(s) to add: %s:%s",
                  found, new_cdevs)
    return False
  parent.AddChildren(found)
  return True
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  Resolves each child to a device path (static if available, otherwise
  via a dynamic lookup) and asks the parent to drop them.

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  paths = []
  for cdev in new_cdevs:
    static_path = cdev.StaticDevPath()
    if static_path is not None:
      paths.append(static_path)
      continue
    found = _RecursiveFindBD(cdev)
    if found is None:
      logging.error("Can't find dynamic device %s while removing children",
                    cdev)
      return False
    paths.append(found.dev_path)
  parent.RemoveChildren(paths)
  return True
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  Raises errors.BlockDeviceError if any disk cannot be found.

  """
  status = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    status.append(found.CombinedSyncStatus())
  return status
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  # NOTE(review): allow_partial is accepted but not acted upon in this
  # body (children are looked up unconditionally) — preserved as-is
  if disk.children:
    child_devs = [_RecursiveFindBD(child) for child in disk.children]
  else:
    child_devs = []
  return bdev.FindDevice(disk.dev_type, disk.physical_id, child_devs)
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    return None
  return (found.dev_path, found.major, found.minor) + found.GetSyncStatus()
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # only these well-known configuration files may be overwritten
  allowed_files = frozenset([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ])

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
def _ErrnoOrStr(err):
1087
  """Format an EnvironmentError exception.
1088

1089
  If the `err` argument has an errno attribute, it will be looked up
1090
  and converted into a textual EXXXX description. Otherwise the string
1091
  representation of the error will be returned.
1092

1093
  """
1094
  if hasattr(err, 'errno'):
1095
    detail = errno.errorcode[err.errno]
1096
  else:
1097
    detail = str(err)
1098
  return detail
1099

    
1100

    
1101
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Returns a list of integer versions; raises errors.InvalidOS when the
  version file is missing, not a regular file, unreadable or malformed.

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError as err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      raw_lines = f.readlines()
    finally:
      f.close()
  except EnvironmentError as err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  try:
    # one integer version per line
    return [int(line.strip()) for line in raw_lines]
  except (TypeError, ValueError) as err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  Returns:
    list of OS objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if not os.path.isdir(dir_name):
      continue
    try:
      f_names = utils.ListVisibleFiles(dir_name)
    except EnvironmentError as err:
      logging.exception("Can't list the OS directory %s", dir_name)
      # BUGFIX: this used to 'break', which aborted the scan of all
      # remaining search directories; one unreadable directory should
      # not hide the OSes installed in the others
      continue
    for name in f_names:
      try:
        result.append(OSFromDisk(name, base_dir=dir_name))
      except errors.InvalidOS as err:
        # record the invalid OS too, so callers can report it
        result.append(objects.OS.FromInvalidOS(err))

  return result
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # resolve each required script to its full path and sanity-check it
  os_scripts = {}
  for script in constants.OS_SCRIPTS:
    script_path = os.path.sep.join([os_dir, script])
    os_scripts[script] = script_path

    try:
      st = os.stat(script_path)
    except EnvironmentError as err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: instance object
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for os api 10)
  @rtype: dict
  @return: dict of environment variables

  Raises errors.BlockDeviceError if a disk is not set up.

  """
  env = {
    'OS_API_VERSION': '%d' % constants.OS_API_VERSION,
    'INSTANCE_NAME': instance.name,
    'HYPERVISOR': instance.hypervisor,
    'DISK_COUNT': '%d' % len(instance.disks),
    'NIC_COUNT': '%d' % len(instance.nics),
    'DEBUG_LEVEL': '%d' % debug,
    }
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    env['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    env['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      env['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      env['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      env['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    env['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      env['NIC_%d_IP' % idx] = nic.ip
    env['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      env['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return env
def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first one resize.

  Args:
    disk: the disk to be grown
    amount: the amount to grow the device with

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    found.Grow(amount)
  except errors.BlockDeviceError as err:
    return False, str(err)

  return True, None
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, recurse on it
      return SnapshotBlockDevice(disk.children[0])
    # more than one child: recurse on the first one matching in size
    for child in disk.children:
      if child.size == disk.size:
        return SnapshotBlockDevice(child)
    # no matching child found — fall through with None (preserved)
    return None
  if disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)
  raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                               " '%s' of type '%s'" %
                               (disk.unique_id, disk.dev_type))
def ExportSnapshot(disk, dest_node, instance, cluster_name):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported
    cluster_name: the cluster name, used by the ssh runner

  Returns:
    True if successful, False otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0o750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands
  # (export, compress, remote write) joined by pipes; each individual
  # command is checked for valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)
  comprcmd = "gzip"
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  if not result.failed:
    return True
  logging.error("os snapshot export command '%s' returned error: %s"
                " output: %s", command, result.fail_reason, result.output)
  return False
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  # BUGFIX: 'nic_count'/'disk_count' used to be written as the last
  # enumerate() index, i.e. one less than the actual count (and 0 for
  # both the empty and the single-element case); track the real totals
  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total = nic_count + 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    disk_total = disk_count + 1
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  # atomically replace any previous export with the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info, or None if
    the expected sections are missing.

  """
  config = objects.SerializableConfigParser()
  config.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  complete = (config.has_section(constants.INISECT_EXP) and
              config.has_section(constants.INISECT_INS))
  if not complete:
    return None
  return config
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
                         cluster_name):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node
    cluster_name: the cluster name, used by the ssh runner

  Returns:
    False in case of error, True otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0o750)

  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # pipeline: remote cat | gunzip | import script
  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)

  if result.failed:
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(target)

  return True
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_unique_id). The
  return value will be a combined boolean result (True only if all
  renames succeeded).

  """
  success = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      success = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      if old_rpath != dev.dev_path:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      success = False
  return success
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # BUGFIX: os.path.commonprefix compares character-wise, so e.g.
  # '/srv/ganeti-evil' used to pass the check against '/srv/ganeti';
  # require the base directory itself or a proper subpath instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating wheter dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
    return False,
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
      return False,
    # already present as a directory: nothing to do
    return True,
  try:
    os.makedirs(file_storage_dir, 0o750)
  except OSError as err:
    logging.error("Cannot create file storage directory '%s': %s",
                  file_storage_dir, err)
    return False,
  return True,
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating wheter dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
    return False,
  if not os.path.exists(file_storage_dir):
    # nothing to remove; treated as success (preserved semantics)
    return True,
  if not os.path.isdir(file_storage_dir):
    logging.error("'%s' is not a directory", file_storage_dir)
    # BUGFIX: previously execution fell through here and os.rmdir()
    # was still attempted on the non-directory, logging a second,
    # misleading error
    return False,
  # deletes dir only if empty, otherwise we want to return False
  try:
    os.rmdir(file_storage_dir)
  except OSError:
    logging.exception("Cannot remove file storage directory '%s'",
                      file_storage_dir)
    return False,
  return True,
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating wheter dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not (old_file_storage_dir and new_file_storage_dir):
    return False,
  if os.path.exists(new_file_storage_dir):
    if os.path.exists(old_file_storage_dir):
      logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                    old_file_storage_dir, new_file_storage_dir)
      return False,
    # destination already in place, source gone: treated as success
    return True,
  if not os.path.isdir(old_file_storage_dir):
    logging.error("'%s' is not a directory", old_file_storage_dir)
    return False,
  try:
    os.rename(old_file_storage_dir, new_file_storage_dir)
  except OSError:
    logging.exception("Cannot rename '%s' to '%s'",
                      old_file_storage_dir, new_file_storage_dir)
    return False,
  return True,
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @param file_name: the (absolute) path to check
  @return: True if the file lives below constants.QUEUE_DIR

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # BUGFIX: os.path.commonprefix compares character-wise, so a sibling
  # path sharing a name prefix (e.g. '/var/lib/queue-evil/f' against
  # '/var/lib/queue') used to pass this check; require a real subpath
  result = os.path.normpath(file_name).startswith(queue_dir + os.sep)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  Refuses files outside the queue directory; returns True on success.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
def JobQueueRename(old, new):
  """Renames a job queue file.

  Both the old and the new name must live inside the queue directory.

  """
  if _IsJobQueueFile(old) and _IsJobQueueFile(new):
    os.rename(old, new)
    return True
  return False
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: bool
  @param drain_flag: if True, will set the drain flag, otherwise reset it.

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)

  return True
def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      return (False, "Can't find device %s" % disk)
    bdevs.append(found)

  problems = []
  for dev in bdevs:
    try:
      dev.Close()
    except errors.BlockDeviceError as err:
      problems.append(str(err))
  if problems:
    return (False, "Can't make devices secondary: %s" % ",".join(problems))
  return (True, "All devices secondary")
def ValidateHVParams(hvname, hvparams):
1818
  """Validates the given hypervisor parameters.
1819

1820
  @type hvname: string
1821
  @param hvname: the hypervisor name
1822
  @type hvparams: dict
1823
  @param hvparams: the hypervisor parameters to be validated
1824
  @rtype: tuple (bool, str)
1825
  @return: tuple of (success, message)
1826

1827
  """
1828
  try:
1829
    hv_type = hypervisor.GetHypervisor(hvname)
1830
    hv_type.ValidateParameters(hvparams)
1831
    return (True, "Validation passed")
1832
  except errors.HypervisorError, err:
1833
    return (False, str(err))
1834

    
1835

    
1836
class HooksRunner(object):
1837
  """Hook runner.
1838

1839
  This class is instantiated on the node side (ganeti-noded) and not on
1840
  the master side.
1841

1842
  """
1843
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1844

    
1845
  def __init__(self, hooks_base_dir=None):
1846
    """Constructor for hooks runner.
1847

1848
    Args:
1849
      - hooks_base_dir: if not None, this overrides the
1850
        constants.HOOKS_BASE_DIR (useful for unittests)
1851

1852
    """
1853
    if hooks_base_dir is None:
1854
      hooks_base_dir = constants.HOOKS_BASE_DIR
1855
    self._BASE_DIR = hooks_base_dir
1856

    
1857
  @staticmethod
1858
  def ExecHook(script, env):
1859
    """Exec one hook script.
1860

1861
    Args:
1862
     - script: the full path to the script
1863
     - env: the environment with which to exec the script
1864

1865
    """
1866
    # exec the process using subprocess and log the output
1867
    fdstdin = None
1868
    try:
1869
      fdstdin = open("/dev/null", "r")
1870
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1871
                               stderr=subprocess.STDOUT, close_fds=True,
1872
                               shell=False, cwd="/", env=env)
1873
      output = ""
1874
      try:
1875
        output = child.stdout.read(4096)
1876
        child.stdout.close()
1877
      except EnvironmentError, err:
1878
        output += "Hook script error: %s" % str(err)
1879

    
1880
      while True:
1881
        try:
1882
          result = child.wait()
1883
          break
1884
        except EnvironmentError, err:
1885
          if err.errno == errno.EINTR:
1886
            continue
1887
          raise
1888
    finally:
1889
      # try not to leak fds
1890
      for fd in (fdstdin, ):
1891
        if fd is not None:
1892
          try:
1893
            fd.close()
1894
          except EnvironmentError, err:
1895
            # just log the error
1896
            #logging.exception("Error while closing fd %s", fd)
1897
            pass
1898

    
1899
    return result == 0, output
1900

    
1901
  def RunHooks(self, hpath, phase, env):
1902
    """Run the scripts in the hooks directory.
1903

1904
    This method will not be usually overriden by child opcodes.
1905

1906
    """
1907
    if phase == constants.HOOKS_PHASE_PRE:
1908
      suffix = "pre"
1909
    elif phase == constants.HOOKS_PHASE_POST:
1910
      suffix = "post"
1911
    else:
1912
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1913
    rr = []
1914

    
1915
    subdir = "%s-%s.d" % (hpath, suffix)
1916
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1917
    try:
1918
      dir_contents = utils.ListVisibleFiles(dir_name)
1919
    except OSError, err:
1920
      # must log
1921
      return rr
1922

    
1923
    # we use the standard python sort order,
1924
    # so 00name is the recommended naming scheme
1925
    dir_contents.sort()
1926
    for relname in dir_contents:
1927
      fname = os.path.join(dir_name, relname)
1928
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1929
          self.RE_MASK.match(relname) is not None):
1930
        rrval = constants.HKR_SKIP
1931
        output = ""
1932
      else:
1933
        result, output = self.ExecHook(fname, env)
1934
        if not result:
1935
          rrval = constants.HKR_FAIL
1936
        else:
1937
          rrval = constants.HKR_SUCCESS
1938
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
1939

    
1940
    return rr
1941

    
1942

    
1943
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    The script is looked up in the iallocator search path, fed its
    input via a temporary file and executed.

    @type name: str
    @param name: the name of the allocator script to locate and run
    @type idata: str
    @param idata: data passed to the script via a temporary file
    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # hand the input data over via a temporary file, removed afterwards
    tmp_fd, input_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(tmp_fd, idata)
      os.close(tmp_fd)
      result = utils.RunCmd([alloc_script, input_name])
    finally:
      os.unlink(input_name)

    if result.failed:
      return (constants.IARUN_FAILURE, result.stdout, result.stderr,
              result.fail_reason)
    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1977

    
1978

    
1979
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache lives as one small file per device under
  C{constants.BDEV_CACHE_DIR}; all failures are logged but never
  propagated (the cache is strictly best-effort).

  """
  # prefix stripped from device paths when building cache file names
  _DEV_PREFIX = "/dev/"
  # directory holding the per-device cache files
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the full path to the cache file

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    Writes a single line "<owner> <state> <iv_name>" to the device's
    cache file, where state is "primary" or "secondary".

    @type dev_path: str
    @param dev_path: the pathname of the device
    @param owner: the owner of the device (presumably the instance
        name; converted via str() before writing)
    @type on_primary: bool
    @param on_primary: whether the device is on the primary node
    @param iv_name: the instance-visible name of the device; None is
        recorded as "not_visible"

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # best-effort: log and carry on
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    Deletes the cache file for the given device, if any.

    @type dev_path: str
    @param dev_path: the pathname of the device

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # best-effort: log and carry on (message fixed: this is a
      # removal, not an update)
      logging.exception("Can't remove the bdev cache for %s", dev_path)