Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ b1206984

History | View | Annotate | Download (59 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetConfig():
  """Build and return a reader for the simple (ss)config of this node."""
  cfg_reader = ssconf.SimpleConfigReader()
  return cfg_reader
49

    
50

    
51
def _GetSshRunner(cluster_name):
  """Build and return an SshRunner for the given cluster.

  @param cluster_name: the cluster name, passed through to the runner

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
53

    
54

    
55
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded; defaults to the
      empty list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    # NOTE: previously this used a mutable default argument ([]),
    # which is shared between calls; use None + local default instead
    exclude = []
  else:
    # Normalize excluded paths so they compare equal to the
    # normalized names computed below
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only regular files are removed; directories and symlinks stay
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
74

    
75

    
76
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The queue lock file is kept; everything else in the queue directory
  and the whole archive directory are cleaned.

  """
  queue_keep = [constants.JOB_QUEUE_LOCK_FILE]
  _CleanDirectory(constants.QUEUE_DIR, exclude=queue_keep)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82

    
83

    
84
def GetMasterInfo():
85
  """Returns master information.
86

87
  This is an utility function to compute master information, either
88
  for consumption here or from the node daemon.
89

90
  @rtype: tuple
91
  @return: (master_netdev, master_ip, master_name)
92

93
  """
94
  try:
95
    cfg = _GetConfig()
96
    master_netdev = cfg.GetMasterNetdev()
97
    master_ip = cfg.GetMasterIP()
98
    master_node = cfg.GetMasterNode()
99
  except errors.ConfigurationError, err:
100
    logging.exception("Cluster configuration incomplete")
101
    return (None, None)
102
  return (master_netdev, master_ip, master_node)
103

    
104

    
105
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganet-masterd and ganeti-rapi).

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master daemons
  @rtype: boolean
  @return: True if every attempted step succeeded, False otherwise

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  # first check whether somebody already answers on the master IP
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    # nobody has the IP: add it as an alias on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # send gratuitous ARPs so peers refresh their ARP caches
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
146

    
147

    
148
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganet-masterd and ganeti-rapi).

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
  @rtype: boolean
  @return: False if the master info cannot be computed, True otherwise

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  # drop the master IP alias from the interface
  ip_del_cmd = ["ip", "address", "del", "%s/32" % master_ip,
                "dev", master_netdev]
  result = utils.RunCmd(ip_del_cmd)
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in (constants.RAPI_PID, constants.MASTERD_PID):
      daemon_pid = utils.ReadPidFile(utils.DaemonPidFileName(daemon))
      utils.KillProcess(daemon_pid)

  return True
172

    
173

    
174
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA host private key
  @type dsapub: str
  @param dsapub: the DSA host public key
  @type rsa: str
  @param rsa: the RSA host private key
  @type rsapub: str
  @param rsapub: the RSA host public key
  @type sshkey: str
  @param sshkey: the user's ssh private key
  @type sshpub: str
  @param sshpub: the user's ssh public key
  @rtype: boolean
  @return: True on success, False if the user ssh files could not
      be processed

  """
  # install the cluster ssh host keys; private keys are written 0600,
  # public keys 0644
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")
    return False

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True
205

    
206

    
207
def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  # remove our public key from authorized_keys, then drop the key pair
  pub_fd = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, pub_fd.read(8192))
  finally:
    pub_fd.close()

  for key_path in (priv_key, pub_key):
    utils.RemoveFile(key_path)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231

    
232

    
233
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  outputarray = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  # merge in whatever the hypervisor reports about this node
  hyp_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = boot_id_file.read(128).rstrip("\n")
  finally:
    boot_id_file.close()

  return outputarray
267

    
268

    
269
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster name, used for the ssh checks
  @rtype: dict
  @return: a dictionary with the same keys as C{what}, holding the
      per-check results (only failures are recorded for the node checks)

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # NOTE: this shuffles (mutates) the caller-provided list in place;
    # presumably to avoid all nodes contacting peers in the same order
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # locate our own primary/secondary IPs in the supplied list
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
340

    
341

    
342
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs should be listed

  Returns:
    dictionary of all partions (key) with their size (in MiB), inactive
    and online status:
    {'test1': ('20.06', True, True)}

    NOTE(review): on lvs failure the error output (a string) is
    returned instead of a dict; callers must check the type.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # raw string so the backslash-escapes reach the regex engine intact
  # (the previous plain string only worked because "\|" is not a
  # recognized Python escape)
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions: index 4 is the state ('-' means inactive),
    # index 5 is the open/online flag ('o' means online)
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
374

    
375

    
376
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary mapping volume group name to its size

  """
  # thin wrapper, kept so callers can go through the backend module
  return utils.ListVolumeGroups()
384

    
385

    
386
def NodeVolumes():
  """List all volumes on this node.

  @return: a list of dictionaries with the keys 'name', 'size',
      'dev' and 'vg'; an empty dict if lvs fails

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
    # the devices field looks like "path(extent)"; keep only the path
    if '(' in dev:
      return dev.split('(')[0]
    return dev

  def map_line(line):
    fields = [field.strip() for field in line]
    return {
      'name': fields[0],
      'size': fields[1],
      'dev': parse_dev(fields[2]),
      'vg': fields[3],
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
414

    
415

    
416
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: the names of the bridges to check for
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise

  """
  for bridge_name in bridges_list:
    # a single missing bridge fails the whole check
    if not utils.BridgeExists(bridge_name):
      return False
  return True
428

    
429

    
430
def GetInstanceList(hypervisor_list):
431
  """Provides a list of instances.
432

433
  @type hypervisor_list: list
434
  @param hypervisor_list: the list of hypervisors to query information
435

436
  @rtype: list
437
  @return: a list of all running instances on the current node
438
             - instance1.example.com
439
             - instance2.example.com
440

441
  """
442
  results = []
443
  for hname in hypervisor_list:
444
    try:
445
      names = hypervisor.GetHypervisor(hname).ListInstances()
446
      results.extend(names)
447
    except errors.HypervisorError, err:
448
      logging.exception("Error enumerating instances for hypevisor %s", hname)
449
      # FIXME: should we somehow not propagate this to the master?
450
      raise
451

    
452
  return results
453

    
454

    
455
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      an empty dict is returned if the hypervisor has no data

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}
  # indices 2, 4 and 5 of the hypervisor tuple hold memory, state
  # and cpu time respectively
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
479

    
480

    
481
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpuus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, inst_id, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      # the same instance may be reported by several hypervisors; that
      # is only acceptable if every report agrees
      if name in output and output[name] != value:
        raise errors.HypervisorError("Instance %s running duplicate"
                                     " with different parameters" % name)
      output[name] = value

  return output
517

    
518

    
519
def AddOSToInstance(instance):
520
  """Add an OS to an instance.
521

522
  @type instance: L{objects.Instance}
523
  @param instance: Instance whose OS is to be installed
524

525
  """
526
  inst_os = OSFromDisk(instance.os)
527

    
528
  create_script = inst_os.create_script
529
  create_env = OSEnvironment(instance)
530

    
531
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
532
                                     instance.name, int(time.time()))
533
  if not os.path.exists(constants.LOG_OS_DIR):
534
    os.mkdir(constants.LOG_OS_DIR, 0750)
535

    
536
  command = utils.BuildShellCmd("cd %s && %s &>%s",
537
                                inst_os.path, create_script, logfile)
538

    
539
  result = utils.RunCmd(command, env=create_env)
540
  if result.failed:
541
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
542
                  " output: %s", command, result.fail_reason, logfile,
543
                  result.output)
544
    return False
545

    
546
  return True
547

    
548

    
549
def RunRenameInstance(instance, old_name):
550
  """Run the OS rename script for an instance.
551

552
  @type instance: L{objects.Instance}
553
  @param instance: Instance whose OS is to be installed
554
  @type old_name: string
555
  @param old_name: previous instance name
556

557
  """
558
  inst_os = OSFromDisk(instance.os)
559

    
560
  script = inst_os.rename_script
561
  rename_env = OSEnvironment(instance)
562
  rename_env['OLD_INSTANCE_NAME'] = old_name
563

    
564
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
565
                                           old_name,
566
                                           instance.name, int(time.time()))
567
  if not os.path.exists(constants.LOG_OS_DIR):
568
    os.mkdir(constants.LOG_OS_DIR, 0750)
569

    
570
  command = utils.BuildShellCmd("cd %s && %s &>%s",
571
                                inst_os.path, script, logfile)
572

    
573
  result = utils.RunCmd(command, env=rename_env)
574

    
575
  if result.failed:
576
    logging.error("os create command '%s' returned error: %s output: %s",
577
                  command, result.fail_reason, result.output)
578
    return False
579

    
580
  return True
581

    
582

    
583
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count are the number of physical disks in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  # default result: all three keys present, all values None
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip whitespace and a possible trailing separator before splitting
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      # sizes are reported as floats in MiB; round to whole MiB
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
622

    
623

    
624
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @param instance: the instance whose disks should be gathered
  @return: a list of (disk, device) pairs, one per instance disk

  """
  block_devices = []
  for disk in instance.disks:
    found_dev = _RecursiveFindBD(disk)
    if found_dev is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    found_dev.Open()
    block_devices.append((disk, found_dev))
  return block_devices
640

    
641

    
642
def StartInstance(instance, extra_args):
  """Start an instance.

  @type instance: instance object
  @param instance: the instance object
  @param extra_args: extra arguments handed through to the hypervisor
  @rtype: boolean
  @return: whether the startup was successful or not

  """
  running_instances = GetInstanceList([instance.hypervisor])

  # starting an already-running instance counts as success
  if instance.name in running_instances:
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    return False

  return True
666

    
667

    
668
def ShutdownInstance(instance):
  """Shut an instance down.

  @type instance: instance object
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  # an instance that is not running is already "shut down"
  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10secs for 2min
  # NOTE(review): shutdown_ok is never read afterwards; looks like a leftover
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # for/else: only reached when the loop was NOT broken out of, i.e.
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    # give the forced stop a moment, then check one last time
    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True
715

    
716

    
717
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance    - the instance object (the code reads instance.name and
                  instance.hypervisor, so this is an object, not a name)
    reboot_type - how to reboot [soft,hard,full]; anything other than
                  soft/hard raises ParameterError here
    extra_args  - extra arguments passed to StartInstance on hard reboot

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    # soft reboot: delegate entirely to the hypervisor
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    # hard reboot: full stop followed by a fresh start
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True
749

    
750

    
751
def MigrateInstance(instance, target, live):
752
  """Migrates an instance to another node.
753

754
  @type instance: L{objects.Instance}
755
  @param instance: the instance definition
756
  @type target: string
757
  @param target: the target node name
758
  @type live: boolean
759
  @param live: whether the migration should be done live or not (the
760
      interpretation of this parameter is left to the hypervisor)
761
  @rtype: tuple
762
  @return: a tuple of (success, msg) where:
763
      - succes is a boolean denoting the success/failure of the operation
764
      - msg is a string with details in case of failure
765

766
  """
767
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
768

    
769
  try:
770
    hyper.MigrateInstance(instance.name, target, live)
771
  except errors.HypervisorError, err:
772
    msg = "Failed to migrate instance: %s" % str(err)
773
    logging.error(msg)
774
    return (False, msg)
775
  return (True, "Migration successfull")
776

    
777

    
778
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # first assemble all children recursively
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    # best-effort removal of a preexisting device with the same id;
    # failures to find/remove it are deliberately ignored
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    # record the assembled device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id
837

    
838

    
839
def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  @param disk: the disk object whose device (and children) should be
      removed
  @rtype: boolean
  @return: True if the device (and all processed children) were removed

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    # nothing was attached, so nothing needed removing
    result = True
  if disk.children:
    for child in disk.children:
      # NOTE: due to the 'and' short-circuit, children are no longer
      # processed once a failure has been seen
      result = result and RemoveBlockDevice(child)
  return result
864

    
865

    
866
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: a objects.Disk object
    owner: the instance name, used for the device cache
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # tolerate failed children up to the computed maximum; past
        # that, re-raise the child's error
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; True signals "nothing to do"
    result = True
  return result
912

    
913

    
914
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @rtype: str or boolean
  @return: a C{/dev/...} path for primary nodes, and
      C{True} for secondary nodes

  """
  assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  # on the primary the recursion yields a device object; callers only
  # need its path
  if isinstance(assembled, bdev.BlockDev):
    return assembled.dev_path
  return assembled
928

    
929

    
930
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble: shutdown of different
  devices doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    # nothing is attached at this level, so nothing to tear down here
    result = True
  else:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      # only forget the cached info once the shutdown really succeeded
      DevCacheManager.RemoveCache(r_path)
  if disk.children:
    for child in disk.children:
      # short-circuit: once a shutdown failed we stop recursing, matching
      # the original `result and ...` semantics
      result = result and ShutdownBlockDevice(child)
  return result
953

    
954

    
955
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  Looks up the parent device (allowing a degraded mirror) and all the
  new children, then attaches the children to the parent.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    # at least one child could not be resolved to an active device
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
970

    
971

    
972
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  Resolves each child either via its static device path or by a dynamic
  lookup, then detaches them all from the parent.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is not None:
      # the device type knows its path without an actual lookup
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
  return True
995

    
996

    
997
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  Raises BlockDeviceError if any of the devices cannot be found.

  """
  result = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    result.append(found.CombinedSyncStatus())
  return result
1015

    
1016

    
1017
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  # NOTE(review): allow_partial is accepted for API symmetry with the
  # callers, but the body below never consults it -- child lookups always
  # run with the default; confirm whether this is intentional
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1039

    
1040

    
1041
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return None
  # static identification data first, then the live sync status fields
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1057

    
1058

    
1059
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of files the master may overwrite on this node
  allowed_files = (
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    )
  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1086

    
1087

    
1088
def _ErrnoOrStr(err):
1089
  """Format an EnvironmentError exception.
1090

1091
  If the `err` argument has an errno attribute, it will be looked up
1092
  and converted into a textual EXXXX description. Otherwise the string
1093
  representation of the error will be returned.
1094

1095
  """
1096
  if hasattr(err, 'errno'):
1097
    detail = errno.errorcode[err.errno]
1098
  else:
1099
    detail = str(err)
1100
  return detail
1101

    
1102

    
1103
def _OSOndiskVersion(name, os_dir):
1104
  """Compute and return the API version of a given OS.
1105

1106
  This function will try to read the API version of the os given by
1107
  the 'name' parameter and residing in the 'os_dir' directory.
1108

1109
  Return value will be either an integer denoting the version or None in the
1110
  case when this is not a valid OS name.
1111

1112
  """
1113
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1114

    
1115
  try:
1116
    st = os.stat(api_file)
1117
  except EnvironmentError, err:
1118
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1119
                           " found (%s)" % _ErrnoOrStr(err))
1120

    
1121
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1122
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1123
                           " a regular file")
1124

    
1125
  try:
1126
    f = open(api_file)
1127
    try:
1128
      api_versions = f.readlines()
1129
    finally:
1130
      f.close()
1131
  except EnvironmentError, err:
1132
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1133
                           " API version (%s)" % _ErrnoOrStr(err))
1134

    
1135
  api_versions = [version.strip() for version in api_versions]
1136
  try:
1137
    api_versions = [int(version) for version in api_versions]
1138
  except (TypeError, ValueError), err:
1139
    raise errors.InvalidOS(name, os_dir,
1140
                           "API version is not integer (%s)" % str(err))
1141

    
1142
  return api_versions
1143

    
1144

    
1145
def DiagnoseOS(top_dirs=None):
1146
  """Compute the validity for all OSes.
1147

1148
  Returns an OS object for each name in all the given top directories
1149
  (if not given defaults to constants.OS_SEARCH_PATH)
1150

1151
  Returns:
1152
    list of OS objects
1153

1154
  """
1155
  if top_dirs is None:
1156
    top_dirs = constants.OS_SEARCH_PATH
1157

    
1158
  result = []
1159
  for dir_name in top_dirs:
1160
    if os.path.isdir(dir_name):
1161
      try:
1162
        f_names = utils.ListVisibleFiles(dir_name)
1163
      except EnvironmentError, err:
1164
        logging.exception("Can't list the OS directory %s", dir_name)
1165
        break
1166
      for name in f_names:
1167
        try:
1168
          os_inst = OSFromDisk(name, base_dir=dir_name)
1169
          result.append(os_inst)
1170
        except errors.InvalidOS, err:
1171
          result.append(objects.OS.FromInvalidOS(err))
1172

    
1173
  return result
1174

    
1175

    
1176
def OSFromDisk(name, base_dir=None):
1177
  """Create an OS instance from disk.
1178

1179
  This function will return an OS instance if the given name is a
1180
  valid OS name. Otherwise, it will raise an appropriate
1181
  `errors.InvalidOS` exception, detailing why this is not a valid
1182
  OS.
1183

1184
  @type base_dir: string
1185
  @keyword base_dir: Base directory containing OS installations.
1186
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1187

1188
  """
1189

    
1190
  if base_dir is None:
1191
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1192
    if os_dir is None:
1193
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1194
  else:
1195
    os_dir = os.path.sep.join([base_dir, name])
1196

    
1197
  api_versions = _OSOndiskVersion(name, os_dir)
1198

    
1199
  if constants.OS_API_VERSION not in api_versions:
1200
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1201
                           " (found %s want %s)"
1202
                           % (api_versions, constants.OS_API_VERSION))
1203

    
1204
  # OS Scripts dictionary, we will populate it with the actual script names
1205
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1206

    
1207
  for script in os_scripts:
1208
    os_scripts[script] = os.path.sep.join([os_dir, script])
1209

    
1210
    try:
1211
      st = os.stat(os_scripts[script])
1212
    except EnvironmentError, err:
1213
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1214
                             (script, _ErrnoOrStr(err)))
1215

    
1216
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1217
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1218
                             script)
1219

    
1220
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1221
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1222
                             script)
1223

    
1224

    
1225
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1226
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1227
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1228
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1229
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1230
                    api_versions=api_versions)
1231

    
1232
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: instance object
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for os api 10)
  @rtype: dict
  @return: dict of environment variables

  """
  env = {
    'OS_API_VERSION': '%d' % constants.OS_API_VERSION,
    'INSTANCE_NAME': instance.name,
    'HYPERVISOR': instance.hypervisor,
    'DISK_COUNT': '%d' % len(instance.disks),
    'NIC_COUNT': '%d' % len(instance.nics),
    'DEBUG_LEVEL': '%d' % debug,
    }
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    env['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    env['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      env['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      env['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      env['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    env['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      env['NIC_%d_IP' % idx] = nic.ip
    env['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      env['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return env
1277

    
1278
def GrowBlockDevice(disk, amount):
1279
  """Grow a stack of block devices.
1280

1281
  This function is called recursively, with the childrens being the
1282
  first one resize.
1283

1284
  Args:
1285
    disk: the disk to be grown
1286

1287
  Returns: a tuple of (status, result), with:
1288
    status: the result (true/false) of the operation
1289
    result: the error message if the operation failed, otherwise not used
1290

1291
  """
1292
  r_dev = _RecursiveFindBD(disk)
1293
  if r_dev is None:
1294
    return False, "Cannot find block device %s" % (disk,)
1295

    
1296
  try:
1297
    r_dev.Grow(amount)
1298
  except errors.BlockDeviceError, err:
1299
    return False, str(err)
1300

    
1301
  return True, None
1302

    
1303

    
1304
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, recurse on it
      return SnapshotBlockDevice(disk.children[0])
    # more than one child: pick the first one matching the parent's size
    for child in disk.children:
      if child.size == disk.size:
        return SnapshotBlockDevice(child)
    # NOTE(review): no child matched -- we return None here, same as the
    # original implicit fall-through; confirm callers handle this
    return None
  if disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)
  raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                               " '%s' of type '%s'" %
                               (disk.unique_id, disk.dev_type))
1337

    
1338

    
1339
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1340
  """Export a block device snapshot to a remote node.
1341

1342
  @type disk: L{objects.Disk}
1343
  @param disk: the description of the disk to export
1344
  @type dest_node: str
1345
  @param dest_node: the destination node to export to
1346
  @type instance: L{objects.Instance}
1347
  @param instance: the instance object to whom the disk belongs
1348
  @type cluster_name: str
1349
  @param cluster_name: the cluster name, needed for SSH hostalias
1350
  @type idx: int
1351
  @param idx: the index of the disk in the instance's disk list,
1352
      used to export to the OS scripts environment
1353
  @rtype: bool
1354
  @return: the success of the operation
1355

1356
  """
1357
  export_env = OSEnvironment(instance)
1358

    
1359
  inst_os = OSFromDisk(instance.os)
1360
  export_script = inst_os.export_script
1361

    
1362
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1363
                                     instance.name, int(time.time()))
1364
  if not os.path.exists(constants.LOG_OS_DIR):
1365
    os.mkdir(constants.LOG_OS_DIR, 0750)
1366
  real_disk = _RecursiveFindBD(disk)
1367
  if real_disk is None:
1368
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1369
                                  str(disk))
1370
  real_disk.Open()
1371

    
1372
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1373
  export_env['EXPORT_INDEX'] = str(idx)
1374

    
1375
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1376
  destfile = disk.physical_id[1]
1377

    
1378
  # the target command is built out of three individual commands,
1379
  # which are joined by pipes; we check each individual command for
1380
  # valid parameters
1381
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
1382
                               export_script, logfile)
1383

    
1384
  comprcmd = "gzip"
1385

    
1386
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1387
                                destdir, destdir, destfile)
1388
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1389
                                                   constants.GANETI_RUNAS,
1390
                                                   destcmd)
1391

    
1392
  # all commands have been checked, so we're safe to combine them
1393
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1394

    
1395
  result = utils.RunCmd(command, env=export_env)
1396

    
1397
  if result.failed:
1398
    logging.error("os snapshot export command '%s' returned error: %s"
1399
                  " output: %s", command, result.fail_reason, result.output)
1400
    return False
1401

    
1402
  return True
1403

    
1404

    
1405
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  # FIX: 'nic_count' used to be written as the last enumerate *index*
  # (len-1, and wrongly 0 for a single NIC); count the NICs explicitly
  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  # FIX: same off-by-one for 'disk_count'; additionally only count the
  # disks that were actually snapshotted (non-false entries)
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  # atomically-ish replace any previous export with the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
1468

    
1469

    
1470
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both the export and the instance sections must be present for a
  # valid export
  if not (config.has_section(constants.INISECT_EXP) and
          config.has_section(constants.INISECT_INS)):
    return None

  return config
1490

    
1491

    
1492
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
1493
  """Import an os image into an instance.
1494

1495
  @type instance: L{objects.Instance}
1496
  @param instance: instance to import the disks into
1497
  @type src_node: string
1498
  @param src_node: source node for the disk images
1499
  @type src_images: list of string
1500
  @param src_images: absolute paths of the disk images
1501
  @rtype: list of boolean
1502
  @return: each boolean represent the success of importing the n-th disk
1503

1504
  """
1505
  import_env = OSEnvironment(instance)
1506
  inst_os = OSFromDisk(instance.os)
1507
  import_script = inst_os.import_script
1508

    
1509
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1510
                                        instance.name, int(time.time()))
1511
  if not os.path.exists(constants.LOG_OS_DIR):
1512
    os.mkdir(constants.LOG_OS_DIR, 0750)
1513

    
1514
  comprcmd = "gunzip"
1515
  impcmd = utils.BuildShellCmd("(cd %s; %s &>%s)", inst_os.path, import_script,
1516
                               logfile)
1517

    
1518
  final_result = []
1519
  for idx, image in enumerate(src_images):
1520
    if image:
1521
      destcmd = utils.BuildShellCmd('cat %s', image)
1522
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1523
                                                       constants.GANETI_RUNAS,
1524
                                                       destcmd)
1525
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1526
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
1527
      import_env['IMPORT_INDEX'] = str(idx)
1528
      result = utils.RunCmd(command, env=import_env)
1529
      if result.failed:
1530
        logging.error("disk import command '%s' returned error: %s"
1531
                      " output: %s", command, result.fail_reason, result.output)
1532
        final_result.append(False)
1533
      else:
1534
        final_result.append(True)
1535
    else:
1536
      final_result.append(True)
1537

    
1538
  return final_result
1539

    
1540

    
1541
def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    # no export directory at all means no exports
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1549

    
1550

    
1551
def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  # FIX: implements the old TODO -- an rmtree failure used to propagate
  # as an unhandled exception, even though the docstring promises a
  # False return on error
  try:
    shutil.rmtree(target)
  except EnvironmentError:
    logging.exception("Error while removing the export '%s'", target)
    return False

  return True
1568

    
1569

    
1570
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_logical,
  new_physical). The return value will be a combined boolean result
  (True only if all renames succeeded).

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      if dev.dev_path != old_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result
1599

    
1600

    
1601
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # NOTE(review): commonprefix compares strings character-wise, so a
  # sibling dir sharing a name prefix with the base dir would pass this
  # check -- confirm whether that matters for the callers
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
1624

    
1625

    
1626
def CreateFileStorageDir(file_storage_dir):
1627
  """Create file storage directory.
1628

1629
  @type file_storage_dir: str
1630
  @param file_storage_dir: directory to create
1631

1632
  @rtype: tuple
1633
  @return: tuple with first element a boolean indicating wheter dir
1634
      creation was successful or not
1635

1636
  """
1637
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1638
  result = True,
1639
  if not file_storage_dir:
1640
    result = False,
1641
  else:
1642
    if os.path.exists(file_storage_dir):
1643
      if not os.path.isdir(file_storage_dir):
1644
        logging.error("'%s' is not a directory", file_storage_dir)
1645
        result = False,
1646
    else:
1647
      try:
1648
        os.makedirs(file_storage_dir, 0750)
1649
      except OSError, err:
1650
        logging.error("Cannot create file storage directory '%s': %s",
1651
                      file_storage_dir, err)
1652
        result = False,
1653
  return result
1654

    
1655

    
1656
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating wheter dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
    return False,
  if not os.path.exists(file_storage_dir):
    # nothing there: nothing to remove
    return True,
  if not os.path.isdir(file_storage_dir):
    # FIX: the old code logged this error but then still fell through
    # into os.rmdir (missing else/return), producing a second, spurious
    # exception log; bail out right away instead
    logging.error("'%s' is not a directory", file_storage_dir)
    return False,
  # deletes dir only if empty, otherwise we want to return False
  try:
    os.rmdir(file_storage_dir)
  except OSError:
    logging.exception("Cannot remove file storage directory '%s'",
                      file_storage_dir)
    return False,
  return True,
1686

    
1687

    
1688
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating wheter dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not old_file_storage_dir or not new_file_storage_dir:
    return False,
  if os.path.exists(new_file_storage_dir):
    if os.path.exists(old_file_storage_dir):
      logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                    old_file_storage_dir, new_file_storage_dir)
      return False,
    # the destination exists and the source is gone: treat this as an
    # already-performed rename, same as the original behaviour
    return True,
  if not os.path.isdir(old_file_storage_dir):
    logging.error("'%s' is not a directory", old_file_storage_dir)
    return False,
  try:
    os.rename(old_file_storage_dir, new_file_storage_dir)
  except OSError:
    logging.exception("Cannot rename '%s' to '%s'",
                      old_file_storage_dir, new_file_storage_dir)
    return False,
  return True,
1723

    
1724

    
1725
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  in_queue = os.path.commonprefix([queue_dir, file_name]) == queue_dir
  if not in_queue:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)
  return in_queue
1737

    
1738

    
1739
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  # only files inside the queue directory may be touched
  if not _IsJobQueueFile(file_name):
    return False
  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)
  return True
1750

    
1751

    
1752
def JobQueueRename(old, new):
  """Renames a job queue file.

  """
  # both endpoints must live inside the queue directory; checking `old`
  # first preserves the original short-circuit (no log for `new` then)
  if not _IsJobQueueFile(old):
    return False
  if not _IsJobQueueFile(new):
    return False

  os.rename(old, new)
  return True
1762

    
1763

    
1764
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: bool
  @param drain_flag: if True, will set the drain flag, otherwise reset it.

  """
  # the flag is just the existence of the drain file
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  return True
1779

    
1780

    
1781
def CloseBlockDevices(disks):
1782
  """Closes the given block devices.
1783

1784
  This means they will be switched to secondary mode (in case of DRBD).
1785

1786
  """
1787
  bdevs = []
1788
  for cf in disks:
1789
    rd = _RecursiveFindBD(cf)
1790
    if rd is None:
1791
      return (False, "Can't find device %s" % cf)
1792
    bdevs.append(rd)
1793

    
1794
  msg = []
1795
  for rd in bdevs:
1796
    try:
1797
      rd.Close()
1798
    except errors.BlockDeviceError, err:
1799
      msg.append(str(err))
1800
  if msg:
1801
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
1802
  else:
1803
    return (True, "All devices secondary")
1804

    
1805

    
1806
def ValidateHVParams(hvname, hvparams):
1807
  """Validates the given hypervisor parameters.
1808

1809
  @type hvname: string
1810
  @param hvname: the hypervisor name
1811
  @type hvparams: dict
1812
  @param hvparams: the hypervisor parameters to be validated
1813
  @rtype: tuple (bool, str)
1814
  @return: tuple of (success, message)
1815

1816
  """
1817
  try:
1818
    hv_type = hypervisor.GetHypervisor(hvname)
1819
    hv_type.ValidateParameters(hvparams)
1820
    return (True, "Validation passed")
1821
  except errors.HypervisorError, err:
1822
    return (False, str(err))
1823

    
1824

    
1825
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # only scripts whose basename matches this mask are executed; any
  # other directory entry (e.g. editor backups) is reported as skipped
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (bool, str)
    @return: (success flag, captured output); at most the first 4096
        bytes of the script's combined stdout/stderr are kept

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      # give the script an empty stdin so it can't block on input
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # read at most 4096 bytes of output, then close the pipe
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          # wait() can be interrupted by a signal; retry on EINTR,
          # re-raise anything else
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    @type hpath: str
    @param hpath: the hooks path name, used to build the directory
        name ("<hpath>-<pre|post>.d" under the base dir)
    @type phase: str
    @param phase: one of constants.HOOKS_PHASE_PRE or
        constants.HOOKS_PHASE_POST
    @type env: dict
    @param env: environment passed to every hook script
    @rtype: list
    @return: one ("subdir/name", HKR_* status, output) tuple per
        directory entry
    @raise errors.ProgrammerError: if phase is not a known hooks phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # must log
      # a missing or unreadable hooks directory means there is nothing
      # to run, which is not an error for the caller
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        # not a regular, executable file with an allowed name: skip
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the name of the iallocator script to look up and run
    @type idata: str
    @param idata: the serialized input data for the allocator
    @rtype: tuple
    @return: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                            os.path.isfile)
    if script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the input data is handed to the script via a temporary file
    fd, input_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      runresult = utils.RunCmd([script, input_name])
    finally:
      os.unlink(input_name)

    if runresult.failed:
      return (constants.IARUN_FAILURE, runresult.stdout, runresult.stderr,
              runresult.fail_reason)
    return (constants.IARUN_SUCCESS, runresult.stdout, runresult.stderr, None)


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache is kept as one small status file per device under
  constants.BDEV_CACHE_DIR. All operations are best-effort: failures
  are logged but never propagated to the caller.

  """
  # prefix stripped from device paths when building cache file names
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the full path of the cache file

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @param owner: the owner of the device (stringified into the cache)
    @type on_primary: bool
    @param on_primary: whether the device is on the primary node
    @param iv_name: the instance-visible name of the device, or None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # best-effort cache: log the traceback but don't propagate
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    @type dev_path: str
    @param dev_path: the pathname of the device

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # fixed: the message previously said "update", copied from
      # UpdateCache, although this is a removal
      logging.exception("Can't remove bdev cache for %s", dev_path)