root / lib / backend.py @ d324e3fc
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetConfig():
48
  return ssconf.SimpleConfigReader()
49

    
50

    
51
def _GetSshRunner(cluster_name):
52
  return ssh.SshRunner(cluster_name)
53

    
54

    
55
def _CleanDirectory(path, exclude=None):
56
  """Removes all regular files in a directory.
57

58
  @param path: the directory to clean
  @type path: str
  @param exclude: list of files to be excluded, defaults to None, meaning
      no exclusions
  @type exclude: list
60

61
  """
62
  if not os.path.isdir(path):
63
    return
64

    
65
  # Normalize excluded paths
66
  if exclude is None:
    exclude = []
  exclude = [os.path.normpath(i) for i in exclude]
67

    
68
  for rel_name in utils.ListVisibleFiles(path):
69
    full_name = os.path.normpath(os.path.join(path, rel_name))
70
    if full_name in exclude:
71
      continue
72
    if os.path.isfile(full_name) and not os.path.islink(full_name):
73
      utils.RemoveFile(full_name)
74

    
75

    
76
def JobQueuePurge():
77
  """Removes job queue files and archived jobs
78

79
  """
80
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82

    
83

    
84
def GetMasterInfo():
85
  """Returns master information.
86

87
  This is a utility function to compute master information, either
88
  for consumption here or from the node daemon.
89

90
  @rtype: tuple
91
  @return: (master_netdev, master_ip, master_name)
92

93
  """
94
  try:
95
    cfg = _GetConfig()
96
    master_netdev = cfg.GetMasterNetdev()
97
    master_ip = cfg.GetMasterIP()
98
    master_node = cfg.GetMasterNode()
99
  except errors.ConfigurationError, err:
100
    logging.exception("Cluster configuration incomplete")
101
    return (None, None, None)
102
  return (master_netdev, master_ip, master_node)
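# Illustrative usage sketch (the netdev, IP and node name below are
# assumptions, not values read from any real configuration):
#
#   master_netdev, master_ip, master_node = GetMasterInfo()
#   # e.g. ("eth0", "192.0.2.1", "node1.example.com"); on configuration
#   # errors all three values are returned as None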
103

    
104

    
105
def StartMaster(start_daemons):
106
  """Activate local node as master node.
107

108
  The function will always try to activate the IP address of the master
  (if someone else already has it, it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).
112

113
  """
114
  ok = True
115
  master_netdev, master_ip, _ = GetMasterInfo()
116
  if not master_netdev:
117
    return False
118

    
119
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120
    if utils.OwnIpAddress(master_ip):
121
      # we already have the ip:
122
      logging.debug("Already started")
123
    else:
124
      logging.error("Someone else has the master ip, not activating")
125
      ok = False
126
  else:
127
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
128
                           "dev", master_netdev, "label",
129
                           "%s:0" % master_netdev])
130
    if result.failed:
131
      logging.error("Can't activate master IP: %s", result.output)
132
      ok = False
133

    
134
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
135
                           "-s", master_ip, master_ip])
136
    # we'll ignore the exit code of arping
137

    
138
  # and now start the master and rapi daemons
139
  if start_daemons:
140
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
141
      result = utils.RunCmd([daemon])
142
      if result.failed:
143
        logging.error("Can't start daemon %s: %s", daemon, result.output)
144
        ok = False
145
  return ok
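# For reference, the commands issued above are roughly equivalent to the
# following shell invocations (the IP and netdev are illustrative
# assumptions):
#
#   ip address add 192.0.2.1/32 dev eth0 label eth0:0
#   arping -q -U -c 3 -I eth0 -s 192.0.2.1 192.0.2.1
#   ganeti-masterd
#   ganeti-rapi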
146

    
147

    
148
def StopMaster(stop_daemons):
149
  """Deactivate this node as master.
150

151
  The function will always try to deactivate the IP address of the
152
  master. Then, if the stop_daemons parameter is True, it will also
153
  stop the master daemons (ganeti-masterd and ganeti-rapi).
154

155
  """
156
  master_netdev, master_ip, _ = GetMasterInfo()
157
  if not master_netdev:
158
    return False
159

    
160
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
161
                         "dev", master_netdev])
162
  if result.failed:
163
    logging.error("Can't remove the master IP, error: %s", result.output)
164
    # but otherwise ignore the failure
165

    
166
  if stop_daemons:
167
    # stop/kill the rapi and the master daemon
168
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
169
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
170

    
171
  return True
172

    
173

    
174
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175
  """Joins this node to the cluster.
176

177
  This does the following:
178
      - updates the hostkeys of the machine (rsa and dsa)
179
      - adds the ssh private key to the user
180
      - adds the ssh public key to the users' authorized_keys file
181

182
  """
183
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187
  for name, content, mode in sshd_keys:
188
    utils.WriteFile(name, data=content, mode=mode)
189

    
190
  try:
191
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
192
                                                    mkdir=True)
193
  except errors.OpExecError, err:
194
    logging.exception("Error while processing user ssh files")
195
    return False
196

    
197
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198
    utils.WriteFile(name, data=content, mode=0600)
199

    
200
  utils.AddAuthorizedKey(auth_keys, sshpub)
201

    
202
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
203

    
204
  return True
205

    
206

    
207
def LeaveCluster():
208
  """Cleans up the current node and prepares it to be removed from the cluster.
209

210
  """
211
  _CleanDirectory(constants.DATA_DIR)
212
  JobQueuePurge()
213

    
214
  try:
215
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
216
  except errors.OpExecError:
217
    logging.exception("Error while processing ssh files")
218
    return
219

    
220
  f = open(pub_key, 'r')
221
  try:
222
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
223
  finally:
224
    f.close()
225

    
226
  utils.RemoveFile(priv_key)
227
  utils.RemoveFile(pub_key)
228

    
229
  # Return a reassuring string to the caller, and quit
230
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231

    
232

    
233
def GetNodeInfo(vgname, hypervisor_type):
234
  """Gives back a hash with different informations about the node.
235

236
  @type vgname: C{string}
237
  @param vgname: the name of the volume group to ask for disk space information
238
  @type hypervisor_type: C{str}
239
  @param hypervisor_type: the name of the hypervisor to ask for
240
      memory information
241
  @rtype: C{dict}
242
  @return: dictionary with the following keys:
243
      - vg_size is the size of the configured volume group in MiB
244
      - vg_free is the free size of the volume group in MiB
245
      - memory_dom0 is the memory allocated for domain0 in MiB
246
      - memory_free is the currently available (free) ram in MiB
247
      - memory_total is the total amount of memory in MiB
248

249
  """
250
  outputarray = {}
251
  vginfo = _GetVGInfo(vgname)
252
  outputarray['vg_size'] = vginfo['vg_size']
253
  outputarray['vg_free'] = vginfo['vg_free']
254

    
255
  hyper = hypervisor.GetHypervisor(hypervisor_type)
256
  hyp_info = hyper.GetNodeInfo()
257
  if hyp_info is not None:
258
    outputarray.update(hyp_info)
259

    
260
  f = open("/proc/sys/kernel/random/boot_id", 'r')
261
  try:
262
    outputarray["bootid"] = f.read(128).rstrip("\n")
263
  finally:
264
    f.close()
265

    
266
  return outputarray
267

    
268

    
269
def VerifyNode(what, cluster_name):
270
  """Verify the status of the local node.
271

272
  Based on the input L{what} parameter, various checks are done on the
273
  local node.
274

275
  If the I{filelist} key is present, this list of
276
  files is checksummed and the file/checksum pairs are returned.
277

278
  If the I{nodelist} key is present, we check that we have
279
  connectivity via ssh with the target nodes (and check the hostname
280
  report).
281

282
  If the I{node-net-test} key is present, we check that we have
283
  connectivity to the given nodes via both primary IP and, if
284
  applicable, secondary IPs.
285

286
  @type what: C{dict}
287
  @param what: a dictionary of things to check:
288
      - filelist: list of files for which to compute checksums
289
      - nodelist: list of nodes we should check ssh communication with
290
      - node-net-test: list of nodes we should check node daemon port
291
        connectivity with
292
      - hypervisor: list with hypervisors to run the verify for
293

294
  """
295
  result = {}
296

    
297
  if 'hypervisor' in what:
298
    result['hypervisor'] = my_dict = {}
299
    for hv_name in what['hypervisor']:
300
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
301

    
302
  if 'filelist' in what:
303
    result['filelist'] = utils.FingerprintFiles(what['filelist'])
304

    
305
  if 'nodelist' in what:
306
    result['nodelist'] = {}
307
    random.shuffle(what['nodelist'])
308
    for node in what['nodelist']:
309
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
310
      if not success:
311
        result['nodelist'][node] = message
312
  if 'node-net-test' in what:
313
    result['node-net-test'] = {}
314
    my_name = utils.HostInfo().name
315
    my_pip = my_sip = None
316
    for name, pip, sip in what['node-net-test']:
317
      if name == my_name:
318
        my_pip = pip
319
        my_sip = sip
320
        break
321
    if not my_pip:
322
      result['node-net-test'][my_name] = ("Can't find my own"
323
                                          " primary/secondary IP"
324
                                          " in the node list")
325
    else:
326
      port = utils.GetNodeDaemonPort()
327
      for name, pip, sip in what['node-net-test']:
328
        fail = []
329
        if not utils.TcpPing(pip, port, source=my_pip):
330
          fail.append("primary")
331
        if sip != pip:
332
          if not utils.TcpPing(sip, port, source=my_sip):
333
            fail.append("secondary")
334
        if fail:
335
          result['node-net-test'][name] = ("failure using the %s"
336
                                           " interface(s)" %
337
                                           " and ".join(fail))
338

    
339
  return result
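# Sketch of a possible invocation (the file, node, hypervisor and cluster
# names are illustrative assumptions, not values the master actually sends):
#
#   what = {
#     'filelist': ['/etc/hosts'],
#     'nodelist': ['node2.example.com'],
#     'node-net-test': [('node1.example.com', '192.0.2.1', '10.0.0.1')],
#     'hypervisor': ['xen-pvm'],
#   }
#   result = VerifyNode(what, 'cluster.example.com')
#   # result['filelist'] maps each file to its checksum; 'nodelist' and
#   # 'node-net-test' only carry entries for checks that failed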
340

    
341

    
342
def GetVolumeList(vg_name):
343
  """Compute list of logical volumes and their size.
344

345
  Returns:
346
    dictionary of all partitions (key) with their size (in MiB), inactive
347
    and online status:
348
    {'test1': ('20.06', True, True)}
349

350
  """
351
  lvs = {}
352
  sep = '|'
353
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
354
                         "--separator=%s" % sep,
355
                         "-olv_name,lv_size,lv_attr", vg_name])
356
  if result.failed:
357
    logging.error("Failed to list logical volumes, lvs output: %s",
358
                  result.output)
359
    return result.output
360

    
361
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
362
  for line in result.stdout.splitlines():
363
    line = line.strip()
364
    match = valid_line_re.match(line)
365
    if not match:
366
      logging.error("Invalid line returned from lvs output: '%s'", line)
367
      continue
368
    name, size, attr = match.groups()
369
    inactive = attr[4] == '-'
370
    online = attr[5] == 'o'
371
    lvs[name] = (size, inactive, online)
372

    
373
  return lvs
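# Example of the parsing above on one illustrative lvs output line (not
# captured from a real system):
#
#   "  data|102400.00|-wi-ao"  ->  lvs['data'] == ('102400.00', False, True)
#
# i.e. the fifth attribute character yields the inactive flag and the
# sixth the online flag.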
374

    
375

    
376
def ListVolumeGroups():
377
  """List the volume groups and their size.
378

379
  Returns:
380
    Dictionary with keys volume name and values the size of the volume
381

382
  """
383
  return utils.ListVolumeGroups()
384

    
385

    
386
def NodeVolumes():
387
  """List all volumes on this node.
388

389
  """
390
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
391
                         "--separator=|",
392
                         "--options=lv_name,lv_size,devices,vg_name"])
393
  if result.failed:
394
    logging.error("Failed to list logical volumes, lvs output: %s",
395
                  result.output)
396
    return {}
397

    
398
  def parse_dev(dev):
399
    if '(' in dev:
400
      return dev.split('(')[0]
401
    else:
402
      return dev
403

    
404
  def map_line(line):
405
    return {
406
      'name': line[0].strip(),
407
      'size': line[1].strip(),
408
      'dev': parse_dev(line[2].strip()),
409
      'vg': line[3].strip(),
410
    }
411

    
412
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
413
          if line.count('|') >= 3]
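# Each entry returned by NodeVolumes is a dict of the following shape
# (names and sizes are made up for illustration):
#
#   {'name': 'data', 'size': '102400.00', 'dev': '/dev/sda2', 'vg': 'xenvg'}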
414

    
415

    
416
def BridgesExist(bridges_list):
417
  """Check if a list of bridges exist on the current node.
418

419
  Returns:
420
    True if all of them exist, false otherwise
421

422
  """
423
  for bridge in bridges_list:
424
    if not utils.BridgeExists(bridge):
425
      return False
426

    
427
  return True
428

    
429

    
430
def GetInstanceList(hypervisor_list):
431
  """Provides a list of instances.
432

433
  @type hypervisor_list: list
434
  @param hypervisor_list: the list of hypervisors to query information
435

436
  @rtype: list
437
  @return: a list of all running instances on the current node
438
             - instance1.example.com
439
             - instance2.example.com
440

441
  """
442
  results = []
443
  for hname in hypervisor_list:
444
    try:
445
      names = hypervisor.GetHypervisor(hname).ListInstances()
446
      results.extend(names)
447
    except errors.HypervisorError, err:
448
      logging.exception("Error enumerating instances for hypevisor %s", hname)
449
      # FIXME: should we somehow not propagate this to the master?
450
      raise
451

    
452
  return results
453

    
454

    
455
def GetInstanceInfo(instance, hname):
456
  """Gives back the informations about an instance as a dictionary.
457

458
  @type instance: string
459
  @param instance: the instance name
460
  @type hname: string
461
  @param hname: the hypervisor type of the instance
462

463
  @rtype: dict
464
  @return: dictionary with the following keys:
465
      - memory: memory size of instance (int)
466
      - state: xen state of instance (string)
467
      - time: cpu time of instance (float)
468

469
  """
470
  output = {}
471

    
472
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
473
  if iinfo is not None:
474
    output['memory'] = iinfo[2]
475
    output['state'] = iinfo[4]
476
    output['time'] = iinfo[5]
477

    
478
  return output
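# Illustrative call and result (the instance name, hypervisor name and
# all returned values are assumptions):
#
#   GetInstanceInfo('instance1.example.com', 'xen-pvm')
#   # -> {'memory': 512, 'state': 'r-----', 'time': 123.4}
#   # or {} if the hypervisor does not know about the instance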
479

    
480

    
481
def GetAllInstancesInfo(hypervisor_list):
482
  """Gather data about all instances.
483

484
  This is the equivalent of `GetInstanceInfo()`, except that it
485
  computes data for all instances at once, thus being faster if one
486
  needs data about more than one instance.
487

488
  @type hypervisor_list: list
489
  @param hypervisor_list: list of hypervisors to query for instance data
490

491
  @rtype: dict of dicts
492
  @return: dictionary of instance: data, with data having the following keys:
493
      - memory: memory size of instance (int)
494
      - state: xen state of instance (string)
495
      - time: cpu time of instance (float)
496
      - vcpus: the number of vcpus
497

498
  """
499
  output = {}
500

    
501
  for hname in hypervisor_list:
502
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
503
    if iinfo:
504
      for name, inst_id, memory, vcpus, state, times in iinfo:
505
        value = {
506
          'memory': memory,
507
          'vcpus': vcpus,
508
          'state': state,
509
          'time': times,
510
          }
511
        if name in output and output[name] != value:
512
          raise errors.HypervisorError("Instance %s running duplicate"
513
                                       " with different parameters" % name)
514
        output[name] = value
515

    
516
  return output
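# Sketch of the aggregated result (instance names and values are
# illustrative assumptions):
#
#   {'instance1.example.com': {'memory': 512, 'vcpus': 1,
#                              'state': 'r-----', 'time': 123.4},
#    'instance2.example.com': {'memory': 1024, 'vcpus': 2,
#                              'state': '-b----', 'time': 8.0}}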
517

    
518

    
519
def AddOSToInstance(instance, os_disk, swap_disk):
520
  """Add an OS to an instance.
521

522
  Args:
523
    instance: the instance object
524
    os_disk: the instance-visible name of the os device
525
    swap_disk: the instance-visible name of the swap device
526

527
  """
528
  inst_os = OSFromDisk(instance.os)
529

    
530
  create_script = inst_os.create_script
531
  create_env = OSEnvironment(instance)
532

    
533
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
534
                                     instance.name, int(time.time()))
535
  if not os.path.exists(constants.LOG_OS_DIR):
536
    os.mkdir(constants.LOG_OS_DIR, 0750)
537

    
538
  command = utils.BuildShellCmd("cd %s && %s &>%s",
539
                                inst_os.path, create_script, logfile)
540

    
541
  result = utils.RunCmd(command, env=create_env)
542
  if result.failed:
543
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
544
                  " output: %s", command, result.fail_reason, logfile,
545
                  result.output)
546
    return False
547

    
548
  return True
549

    
550

    
551
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
552
  """Run the OS rename script for an instance.
553

554
  Args:
555
    instance: the instance object
556
    old_name: the old name of the instance
557
    os_disk: the instance-visible name of the os device
558
    swap_disk: the instance-visible name of the swap device
559

560
  """
561
  inst_os = OSFromDisk(instance.os)
562

    
563
  script = inst_os.rename_script
564
  rename_env = OSEnvironment(instance)
565
  rename_env['OLD_INSTANCE_NAME'] = old_name
566

    
567
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
568
                                           old_name,
569
                                           instance.name, int(time.time()))
570
  if not os.path.exists(constants.LOG_OS_DIR):
571
    os.mkdir(constants.LOG_OS_DIR, 0750)
572

    
573
  command = utils.BuildShellCmd("cd %s && %s &>%s",
574
                                inst_os.path, script, logfile)
575

    
576
  result = utils.RunCmd(command, env=rename_env)
577

    
578
  if result.failed:
579
    logging.error("os create command '%s' returned error: %s output: %s",
580
                  command, result.fail_reason, result.output)
581
    return False
582

    
583
  return True
584

    
585

    
586
def _GetVGInfo(vg_name):
587
  """Get informations about the volume group.
588

589
  Args:
590
    vg_name: the volume group
591

592
  Returns:
593
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
594
    where
595
    vg_size is the total size of the volume group in MiB
596
    vg_free is the free size of the volume group in MiB
597
    pv_count is the number of physical volumes in that vg
598

599
  If an error occurs during gathering of data, we return the same dict
600
  with keys all set to None.
601

602
  """
603
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
604

    
605
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
606
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
607

    
608
  if retval.failed:
609
    logging.error("volume group %s not present", vg_name)
610
    return retdic
611
  valarr = retval.stdout.strip().rstrip(':').split(':')
612
  if len(valarr) == 3:
613
    try:
614
      retdic = {
615
        "vg_size": int(round(float(valarr[0]), 0)),
616
        "vg_free": int(round(float(valarr[1]), 0)),
617
        "pv_count": int(valarr[2]),
618
        }
619
    except ValueError, err:
620
      logging.exception("Fail to parse vgs output")
621
  else:
622
    logging.error("vgs output has the wrong number of fields (expected"
623
                  " three): %s", str(valarr))
624
  return retdic
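# Example of the vgs output handled above (made-up values):
#
#   "  20480.00:10240.00:1"  ->  {'vg_size': 20480, 'vg_free': 10240,
#                                 'pv_count': 1}
#
# on any error the dict is returned with all three values left as None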
625

    
626

    
627
def _GatherBlockDevs(instance):
628
  """Set up an instance's block device(s).
629

630
  This is run on the primary node at instance startup. The block
631
  devices must be already assembled.
632

633
  """
634
  block_devices = []
635
  for disk in instance.disks:
636
    device = _RecursiveFindBD(disk)
637
    if device is None:
638
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
639
                                    str(disk))
640
    device.Open()
641
    block_devices.append((disk, device))
642
  return block_devices
643

    
644

    
645
def StartInstance(instance, extra_args):
646
  """Start an instance.
647

648
  @type instance: instance object
649
  @param instance: the instance object
650
  @rtype: boolean
651
  @return: whether the startup was successful or not
652

653
  """
654
  running_instances = GetInstanceList([instance.hypervisor])
655

    
656
  if instance.name in running_instances:
657
    return True
658

    
659
  block_devices = _GatherBlockDevs(instance)
660
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
661

    
662
  try:
663
    hyper.StartInstance(instance, block_devices, extra_args)
664
  except errors.HypervisorError, err:
665
    logging.exception("Failed to start instance")
666
    return False
667

    
668
  return True
669

    
670

    
671
def ShutdownInstance(instance):
672
  """Shut an instance down.
673

674
  @type instance: instance object
675
  @param instance: the instance object
676
  @rtype: boolean
677
  @return: whether the shutdown was successful or not
678

679
  """
680
  hv_name = instance.hypervisor
681
  running_instances = GetInstanceList([hv_name])
682

    
683
  if instance.name not in running_instances:
684
    return True
685

    
686
  hyper = hypervisor.GetHypervisor(hv_name)
687
  try:
688
    hyper.StopInstance(instance)
689
  except errors.HypervisorError, err:
690
    logging.error("Failed to stop instance")
691
    return False
692

    
693
  # test every 10secs for 2min
694
  shutdown_ok = False
695

    
696
  time.sleep(1)
697
  for dummy in range(11):
698
    if instance.name not in GetInstanceList([hv_name]):
699
      break
700
    time.sleep(10)
701
  else:
702
    # the shutdown did not succeed
703
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
704

    
705
    try:
706
      hyper.StopInstance(instance, force=True)
707
    except errors.HypervisorError, err:
708
      logging.exception("Failed to stop instance")
709
      return False
710

    
711
    time.sleep(1)
712
    if instance.name in GetInstanceList([hv_name]):
713
      logging.error("could not shutdown instance '%s' even by destroy",
714
                    instance.name)
715
      return False
716

    
717
  return True
718

    
719

    
720
def RebootInstance(instance, reboot_type, extra_args):
721
  """Reboot an instance.
722

723
  Args:
724
    instance    - name of instance to reboot
725
    reboot_type - how to reboot [soft,hard,full]
726

727
  """
728
  running_instances = GetInstanceList([instance.hypervisor])
729

    
730
  if instance.name not in running_instances:
731
    logging.error("Cannot reboot instance that is not running")
732
    return False
733

    
734
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
735
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
736
    try:
737
      hyper.RebootInstance(instance)
738
    except errors.HypervisorError, err:
739
      logging.exception("Failed to soft reboot instance")
740
      return False
741
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
742
    try:
743
      ShutdownInstance(instance)
744
      StartInstance(instance, extra_args)
745
    except errors.HypervisorError, err:
746
      logging.exception("Failed to hard reboot instance")
747
      return False
748
  else:
749
    raise errors.ParameterError("reboot_type invalid")
750

    
751
  return True
752

    
753

    
754
def MigrateInstance(instance, target, live):
755
  """Migrates an instance to another node.
756

757
  @type instance: C{objects.Instance}
758
  @param instance: the instance definition
759
  @type target: string
760
  @param target: the target node name
761
  @type live: boolean
762
  @param live: whether the migration should be done live or not (the
763
      interpretation of this parameter is left to the hypervisor)
764
  @rtype: tuple
765
  @return: a tuple of (success, msg) where:
766
      - success is a boolean denoting the success/failure of the operation
767
      - msg is a string with details in case of failure
768

769
  """
770
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
771

    
772
  try:
773
    hyper.MigrateInstance(instance.name, target, live)
774
  except errors.HypervisorError, err:
775
    msg = "Failed to migrate instance: %s" % str(err)
776
    logging.error(msg)
777
    return (False, msg)
778
  return (True, "Migration successfull")
779

    
780

    
781
def CreateBlockDevice(disk, size, owner, on_primary, info):
782
  """Creates a block device for an instance.
783

784
  Args:
785
   disk: a ganeti.objects.Disk object
786
   size: the size of the physical underlying device
787
   owner: a string with the name of the instance
788
   on_primary: a boolean indicating if it is the primary node or not
789
   info: string that will be sent to the physical device creation
790

791
  Returns:
792
    the new unique_id of the device (this can sometime be
793
    computed only after creation), or None. On secondary nodes,
794
    it's not required to return anything.
795

796
  """
797
  clist = []
798
  if disk.children:
799
    for child in disk.children:
800
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
801
      if on_primary or disk.AssembleOnSecondary():
802
        # we need the children open in case the device itself has to
803
        # be assembled
804
        crdev.Open()
805
      clist.append(crdev)
806
  try:
807
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
808
    if device is not None:
809
      logging.info("removing existing device %s", disk)
810
      device.Remove()
811
  except errors.BlockDeviceError, err:
812
    pass
813

    
814
  device = bdev.Create(disk.dev_type, disk.physical_id,
815
                       clist, size)
816
  if device is None:
817
    raise ValueError("Can't create child device for %s, %s" %
818
                     (disk, size))
819
  if on_primary or disk.AssembleOnSecondary():
820
    if not device.Assemble():
821
      errorstring = "Can't assemble device after creation"
822
      logging.error(errorstring)
823
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
824
                                    " daemon logs" % errorstring)
825
    device.SetSyncSpeed(constants.SYNC_SPEED)
826
    if on_primary or disk.OpenOnSecondary():
827
      device.Open(force=True)
828
    DevCacheManager.UpdateCache(device.dev_path, owner,
829
                                on_primary, disk.iv_name)
830

    
831
  device.SetInfo(info)
832

    
833
  physical_id = device.unique_id
834
  return physical_id
835

    
836

    
837
def RemoveBlockDevice(disk):
838
  """Remove a block device.
839

840
  This is intended to be called recursively.
841

842
  """
843
  try:
844
    # since we are removing the device, allow a partial match
845
    # this allows removal of broken mirrors
846
    rdev = _RecursiveFindBD(disk, allow_partial=True)
847
  except errors.BlockDeviceError, err:
848
    # probably can't attach
849
    logging.info("Can't attach to device %s in remove", disk)
850
    rdev = None
851
  if rdev is not None:
852
    r_path = rdev.dev_path
853
    result = rdev.Remove()
854
    if result:
855
      DevCacheManager.RemoveCache(r_path)
856
  else:
857
    result = True
858
  if disk.children:
859
    for child in disk.children:
860
      result = result and RemoveBlockDevice(child)
861
  return result
862

    
863

    
864
def _RecursiveAssembleBD(disk, owner, as_primary):
865
  """Activate a block device for an instance.
866

867
  This is run on the primary and secondary nodes for an instance.
868

869
  This function is called recursively.
870

871
  Args:
872
    disk: a objects.Disk object
873
    as_primary: if we should make the block device read/write
874

875
  Returns:
876
    the assembled device or None (in case no device was assembled)
877

878
  If the assembly is not successful, an exception is raised.
879

880
  """
881
  children = []
882
  if disk.children:
883
    mcn = disk.ChildrenNeeded()
884
    if mcn == -1:
885
      mcn = 0 # max number of Nones allowed
886
    else:
887
      mcn = len(disk.children) - mcn # max number of Nones
888
    for chld_disk in disk.children:
889
      try:
890
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
891
      except errors.BlockDeviceError, err:
892
        if children.count(None) >= mcn:
893
          raise
894
        cdev = None
895
        logging.debug("Error in child activation: %s", str(err))
896
      children.append(cdev)
897

    
898
  if as_primary or disk.AssembleOnSecondary():
899
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
900
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
901
    result = r_dev
902
    if as_primary or disk.OpenOnSecondary():
903
      r_dev.Open()
904
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
905
                                as_primary, disk.iv_name)
906

    
907
  else:
908
    result = True
909
  return result
910

    
911

    
912
def AssembleBlockDevice(disk, owner, as_primary):
913
  """Activate a block device for an instance.
914

915
  This is a wrapper over _RecursiveAssembleBD.
916

917
  Returns:
918
    a /dev path for primary nodes
919
    True for secondary nodes
920

921
  """
922
  result = _RecursiveAssembleBD(disk, owner, as_primary)
923
  if isinstance(result, bdev.BlockDev):
924
    result = result.dev_path
925
  return result
926

    
927

    
928
def ShutdownBlockDevice(disk):
929
  """Shut down a block device.
930

931
  First, if the device is assembled (can `Attach()`), then the device
932
  is shutdown. Then the children of the device are shutdown.
933

934
  This function is called recursively. Note that we don't cache the
935
  children or such because, as opposed to assemble, shutdown of different
936
  devices doesn't require that the upper device was active.
937

938
  """
939
  r_dev = _RecursiveFindBD(disk)
940
  if r_dev is not None:
941
    r_path = r_dev.dev_path
942
    result = r_dev.Shutdown()
943
    if result:
944
      DevCacheManager.RemoveCache(r_path)
945
  else:
946
    result = True
947
  if disk.children:
948
    for child in disk.children:
949
      result = result and ShutdownBlockDevice(child)
950
  return result
951

    
952

    
953
def MirrorAddChildren(parent_cdev, new_cdevs):
954
  """Extend a mirrored block device.
955

956
  """
957
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
958
  if parent_bdev is None:
959
    logging.error("Can't find parent device")
960
    return False
961
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
962
  if new_bdevs.count(None) > 0:
963
    logging.error("Can't find new device(s) to add: %s:%s",
964
                  new_bdevs, new_cdevs)
965
    return False
966
  parent_bdev.AddChildren(new_bdevs)
967
  return True
968

    
969

    
970
def MirrorRemoveChildren(parent_cdev, new_cdevs):
971
  """Shrink a mirrored block device.
972

973
  """
974
  parent_bdev = _RecursiveFindBD(parent_cdev)
975
  if parent_bdev is None:
976
    logging.error("Can't find parent in remove children: %s", parent_cdev)
977
    return False
978
  devs = []
979
  for disk in new_cdevs:
980
    rpath = disk.StaticDevPath()
981
    if rpath is None:
982
      bd = _RecursiveFindBD(disk)
983
      if bd is None:
984
        logging.error("Can't find dynamic device %s while removing children",
985
                      disk)
986
        return False
987
      else:
988
        devs.append(bd.dev_path)
989
    else:
990
      devs.append(rpath)
991
  parent_bdev.RemoveChildren(devs)
992
  return True
993

    
994

    
995
def GetMirrorStatus(disks):
996
  """Get the mirroring status of a list of devices.
997

998
  Args:
999
    disks: list of `objects.Disk`
1000

1001
  Returns:
1002
    list of (mirror_done, estimated_time) tuples, which
1003
    are the result of bdev.BlockDevice.CombinedSyncStatus()
1004

1005
  """
1006
  stats = []
1007
  for dsk in disks:
1008
    rbd = _RecursiveFindBD(dsk)
1009
    if rbd is None:
1010
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1011
    stats.append(rbd.CombinedSyncStatus())
1012
  return stats
1013

    
1014

    
1015
def _RecursiveFindBD(disk, allow_partial=False):
1016
  """Check if a device is activated.
1017

1018
  If so, return information about the real device.
1019

1020
  Args:
1021
    disk: the objects.Disk instance
1022
    allow_partial: don't abort the find if a child of the
1023
                   device can't be found; this is intended to be
1024
                   used when repairing mirrors
1025

1026
  Returns:
1027
    None if the device can't be found
1028
    otherwise the device instance
1029

1030
  """
1031
  children = []
1032
  if disk.children:
1033
    for chdisk in disk.children:
1034
      children.append(_RecursiveFindBD(chdisk))
1035

    
1036
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1037

    
1038

    
1039
def FindBlockDevice(disk):
1040
  """Check if a device is activated.
1041

1042
  If so, return information about the real device.
1043

1044
  Args:
1045
    disk: the objects.Disk instance
1046
  Returns:
1047
    None if the device can't be found
1048
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1049

1050
  """
1051
  rbd = _RecursiveFindBD(disk)
1052
  if rbd is None:
1053
    return rbd
1054
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1055

    
1056

    
1057
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1058
  """Write a file to the filesystem.
1059

1060
  This allows the master to overwrite(!) a file. It will only perform
1061
  the operation if the file belongs to a list of configuration files.
1062

1063
  """
1064
  if not os.path.isabs(file_name):
1065
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
1066
                  file_name)
1067
    return False
1068

    
1069
  allowed_files = [
1070
    constants.CLUSTER_CONF_FILE,
1071
    constants.ETC_HOSTS,
1072
    constants.SSH_KNOWN_HOSTS_FILE,
1073
    constants.VNC_PASSWORD_FILE,
1074
    ]
1075

    
1076
  if file_name not in allowed_files:
1077
    logging.error("Filename passed to UploadFile not in allowed"
1078
                 " upload targets: '%s'", file_name)
1079
    return False
1080

    
1081
  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1082
                  atime=atime, mtime=mtime)
1083
  return True
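# Illustrative call as the master might make it (mode/uid/gid/timestamps
# are placeholders, not values the master actually sends):
#
#   UploadFile(constants.ETC_HOSTS, data="127.0.0.1 localhost\n",
#              mode=0644, uid=0, gid=0, atime=0, mtime=0)
#   # returns False for any path outside the allowed_files list above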
1084

    
1085

    
1086
def _ErrnoOrStr(err):
1087
  """Format an EnvironmentError exception.
1088

1089
  If the `err` argument has an errno attribute, it will be looked up
1090
  and converted into a textual EXXXX description. Otherwise the string
1091
  representation of the error will be returned.
1092

1093
  """
1094
  if hasattr(err, 'errno'):
1095
    detail = errno.errorcode[err.errno]
1096
  else:
1097
    detail = str(err)
1098
  return detail
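# For example (sketch):
#
#   _ErrnoOrStr(IOError(errno.ENOENT, "No such file"))  # -> 'ENOENT'
#   _ErrnoOrStr(ValueError("bad value"))                # -> 'bad value'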
1099

    
1100

    
1101
def _OSOndiskVersion(name, os_dir):
1102
  """Compute and return the API version of a given OS.
1103

1104
  This function will try to read the API version of the os given by
1105
  the 'name' parameter and residing in the 'os_dir' directory.
1106

1107
  The return value is the list of integer API versions declared in that
  file; if this is not a valid OS, an errors.InvalidOS exception is
  raised instead.
1109

1110
  """
1111
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1112

    
1113
  try:
1114
    st = os.stat(api_file)
1115
  except EnvironmentError, err:
1116
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1117
                           " found (%s)" % _ErrnoOrStr(err))
1118

    
1119
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1120
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1121
                           " a regular file")
1122

    
1123
  try:
1124
    f = open(api_file)
1125
    try:
1126
      api_versions = f.readlines()
1127
    finally:
1128
      f.close()
1129
  except EnvironmentError, err:
1130
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1131
                           " API version (%s)" % _ErrnoOrStr(err))
1132

    
1133
  api_versions = [version.strip() for version in api_versions]
1134
  try:
1135
    api_versions = [int(version) for version in api_versions]
1136
  except (TypeError, ValueError), err:
1137
    raise errors.InvalidOS(name, os_dir,
1138
                           "API version is not integer (%s)" % str(err))
1139

    
1140
  return api_versions
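# Sketch of the expected on-disk layout (the path is an assumption):
#
#   /srv/ganeti/os/debootstrap/ganeti_api_version  containing "10\n"
#   _OSOndiskVersion("debootstrap", "/srv/ganeti/os/debootstrap")  # -> [10]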
1141

    
1142

    
1143
def DiagnoseOS(top_dirs=None):
1144
  """Compute the validity for all OSes.
1145

1146
  Returns an OS object for each name in all the given top directories
1147
  (if not given defaults to constants.OS_SEARCH_PATH)
1148

1149
  Returns:
1150
    list of OS objects
1151

1152
  """
1153
  if top_dirs is None:
1154
    top_dirs = constants.OS_SEARCH_PATH
1155

    
1156
  result = []
1157
  for dir_name in top_dirs:
1158
    if os.path.isdir(dir_name):
1159
      try:
1160
        f_names = utils.ListVisibleFiles(dir_name)
1161
      except EnvironmentError, err:
1162
        logging.exception("Can't list the OS directory %s", dir_name)
1163
        break
1164
      for name in f_names:
1165
        try:
1166
          os_inst = OSFromDisk(name, base_dir=dir_name)
1167
          result.append(os_inst)
1168
        except errors.InvalidOS, err:
1169
          result.append(objects.OS.FromInvalidOS(err))
1170

    
1171
  return result
1172

    
1173

    
1174
def OSFromDisk(name, base_dir=None):
1175
  """Create an OS instance from disk.
1176

1177
  This function will return an OS instance if the given name is a
1178
  valid OS name. Otherwise, it will raise an appropriate
1179
  `errors.InvalidOS` exception, detailing why this is not a valid
1180
  OS.
1181

1182
  @type base_dir: string
1183
  @keyword base_dir: Base directory containing OS installations.
1184
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1185

1186
  """
1187

    
1188
  if base_dir is None:
1189
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1190
    if os_dir is None:
1191
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1192
  else:
1193
    os_dir = os.path.sep.join([base_dir, name])
1194

    
1195
  api_versions = _OSOndiskVersion(name, os_dir)
1196

    
1197
  if constants.OS_API_VERSION not in api_versions:
1198
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1199
                           " (found %s want %s)"
1200
                           % (api_versions, constants.OS_API_VERSION))
1201

    
1202
  # OS Scripts dictionary, we will populate it with the actual script names
1203
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1204

    
1205
  for script in os_scripts:
1206
    os_scripts[script] = os.path.sep.join([os_dir, script])
1207

    
1208
    try:
1209
      st = os.stat(os_scripts[script])
1210
    except EnvironmentError, err:
1211
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1212
                             (script, _ErrnoOrStr(err)))
1213

    
1214
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1215
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1216
                             script)
1217

    
1218
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1219
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1220
                             script)
1221

    
1222

    
1223
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1224
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1225
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1226
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1227
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1228
                    api_versions=api_versions)
1229

    
1230
def OSEnvironment(instance, debug=0):
1231
  """Calculate the environment for an os script.
1232

1233
  @type instance: instance object
1234
  @param instance: target instance for the os script run
1235
  @type debug: integer
1236
  @param debug: debug level (0 or 1, for os api 10)
1237
  @rtype: dict
1238
  @return: dict of environment variables
1239

1240
  """
1241
  result = {}
1242
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1243
  result['INSTANCE_NAME'] = instance.name
1244
  result['HYPERVISOR'] = instance.hypervisor
1245
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1246
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1247
  result['DEBUG_LEVEL'] = '%d' % debug
1248
  for idx, disk in enumerate(instance.disks):
1249
    real_disk = _RecursiveFindBD(disk)
1250
    if real_disk is None:
1251
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1252
                                    str(disk))
1253
    real_disk.Open()
1254
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1255
    # FIXME: When disks will have read-only mode, populate this
1256
    result['DISK_%d_ACCESS' % idx] = 'W'
1257
    if constants.HV_DISK_TYPE in instance.hvparams:
1258
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1259
        instance.hvparams[constants.HV_DISK_TYPE]
1260
    if disk.dev_type in constants.LDS_BLOCK:
1261
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1262
    elif disk.dev_type == constants.LD_FILE:
1263
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1264
        'file:%s' % disk.physical_id[0]
1265
  for idx, nic in enumerate(instance.nics):
1266
    result['NIC_%d_MAC' % idx] = nic.mac
1267
    if nic.ip:
1268
      result['NIC_%d_IP' % idx] = nic.ip
1269
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
1270
    if constants.HV_NIC_TYPE in instance.hvparams:
1271
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1272
        instance.hvparams[constants.HV_NIC_TYPE]
1273

    
1274
  return result
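# Rough shape of the environment produced for a one-disk, one-NIC
# instance (all concrete values below are illustrative assumptions):
#
#   {'OS_API_VERSION': '10', 'INSTANCE_NAME': 'instance1.example.com',
#    'HYPERVISOR': 'xen-pvm', 'DISK_COUNT': '1', 'NIC_COUNT': '1',
#    'DEBUG_LEVEL': '0', 'DISK_0_PATH': '/dev/drbd0',
#    'DISK_0_ACCESS': 'W', 'DISK_0_BACKEND_TYPE': 'block',
#    'NIC_0_MAC': 'aa:00:00:11:22:33', 'NIC_0_BRIDGE': 'xen-br0'}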
1275

    
1276
def GrowBlockDevice(disk, amount):
1277
  """Grow a stack of block devices.
1278

1279
  This function is called recursively, with the children being the
  first ones resized.
1281

1282
  Args:
1283
    disk: the disk to be grown
1284

1285
  Returns: a tuple of (status, result), with:
1286
    status: the result (true/false) of the operation
1287
    result: the error message if the operation failed, otherwise not used
1288

1289
  """
1290
  r_dev = _RecursiveFindBD(disk)
1291
  if r_dev is None:
1292
    return False, "Cannot find block device %s" % (disk,)
1293

    
1294
  try:
1295
    r_dev.Grow(amount)
1296
  except errors.BlockDeviceError, err:
1297
    return False, str(err)
1298

    
1299
  return True, None
1300

    
1301

    
1302
def SnapshotBlockDevice(disk):
1303
  """Create a snapshot copy of a block device.
1304

1305
  This function is called recursively, and the snapshot is actually created
1306
  just for the leaf lvm backend device.
1307

1308
  Args:
1309
    disk: the disk to be snapshotted
1310

1311
  Returns:
1312
    a config entry for the actual lvm device snapshotted.
1313

1314
  """
1315
  if disk.children:
1316
    if len(disk.children) == 1:
1317
      # only one child, let's recurse on it
1318
      return SnapshotBlockDevice(disk.children[0])
1319
    else:
1320
      # more than one child, choose one that matches
1321
      for child in disk.children:
1322
        if child.size == disk.size:
1323
          # return implies breaking the loop
1324
          return SnapshotBlockDevice(child)
1325
  elif disk.dev_type == constants.LD_LV:
1326
    r_dev = _RecursiveFindBD(disk)
1327
    if r_dev is not None:
1328
      # let's stay on the safe side and ask for the full size, for now
1329
      return r_dev.Snapshot(disk.size)
1330
    else:
1331
      return None
1332
  else:
1333
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1334
                                 " '%s' of type '%s'" %
1335
                                 (disk.unique_id, disk.dev_type))
1336

    
1337

    
1338
def ExportSnapshot(disk, dest_node, instance, cluster_name):
1339
  """Export a block device snapshot to a remote node.
1340

1341
  Args:
1342
    disk: the snapshot block device
1343
    dest_node: the node to send the image to
1344
    instance: instance being exported
1345

1346
  Returns:
1347
    True if successful, False otherwise.
1348

1349
  """
1350
  # TODO(ultrotter): Import/Export still to be converted to OS API 10
1351
  logging.error("Import/Export still to be converted to OS API 10")
1352
  return False
1353

    
1354
  inst_os = OSFromDisk(instance.os)
1355
  export_script = inst_os.export_script
1356

    
1357
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1358
                                     instance.name, int(time.time()))
1359
  if not os.path.exists(constants.LOG_OS_DIR):
1360
    os.mkdir(constants.LOG_OS_DIR, 0750)
1361

    
1362
  real_os_dev = _RecursiveFindBD(disk)
1363
  if real_os_dev is None:
1364
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1365
                                  str(disk))
1366
  real_os_dev.Open()
1367

    
1368
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1369
  destfile = disk.physical_id[1]
1370

    
1371
  # the target command is built out of three individual commands,
1372
  # which are joined by pipes; we check each individual command for
1373
  # valid parameters
1374

    
1375
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1376
                               export_script, instance.name,
1377
                               real_os_dev.dev_path, logfile)
1378

    
1379
  comprcmd = "gzip"
1380

    
1381
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1382
                                destdir, destdir, destfile)
1383
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1384
                                                   constants.GANETI_RUNAS,
1385
                                                   destcmd)
1386

    
1387
  # all commands have been checked, so we're safe to combine them
1388
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1389

    
1390
  result = utils.RunCmd(command)
1391

    
1392
  if result.failed:
1393
    logging.error("os snapshot export command '%s' returned error: %s"
1394
                  " output: %s", command, result.fail_reason, result.output)
1395
    return False
1396

    
1397
  return True
1398

    
1399

    
1400
def FinalizeExport(instance, snap_disks):
1401
  """Write out the export configuration information.
1402

1403
  Args:
1404
    instance: instance configuration
1405
    snap_disks: snapshot block devices
1406

1407
  Returns:
1408
    False in case of error, True otherwise.
1409

1410
  """
1411
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1412
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1413

    
1414
  config = objects.SerializableConfigParser()
1415

    
1416
  config.add_section(constants.INISECT_EXP)
1417
  config.set(constants.INISECT_EXP, 'version', '0')
1418
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1419
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1420
  config.set(constants.INISECT_EXP, 'os', instance.os)
1421
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1422

    
1423
  config.add_section(constants.INISECT_INS)
1424
  config.set(constants.INISECT_INS, 'name', instance.name)
1425
  config.set(constants.INISECT_INS, 'memory', '%d' %
1426
             instance.beparams[constants.BE_MEMORY])
1427
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
1428
             instance.beparams[constants.BE_VCPUS])
1429
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1430

    
1431
  nic_count = 0
1432
  for nic_count, nic in enumerate(instance.nics):
1433
    config.set(constants.INISECT_INS, 'nic%d_mac' %
1434
               nic_count, '%s' % nic.mac)
1435
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1436
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1437
               '%s' % nic.bridge)
1438
  # TODO: redundant: on load can read nics until it doesn't exist
1439
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1440

    
1441
  disk_count = 0
1442
  for disk_count, disk in enumerate(snap_disks):
1443
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1444
               ('%s' % disk.iv_name))
1445
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1446
               ('%s' % disk.physical_id[1]))
1447
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1448
               ('%d' % disk.size))
1449
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1450

    
1451
  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1452
  cfo = open(cff, 'w')
1453
  try:
1454
    config.write(cfo)
1455
  finally:
1456
    cfo.close()
1457

    
1458
  shutil.rmtree(finaldestdir, True)
1459
  shutil.move(destdir, finaldestdir)
1460

    
1461
  return True
1462

    
1463

    
1464
def ExportInfo(dest):
1465
  """Get export configuration information.
1466

1467
  Args:
1468
    dest: directory containing the export
1469

1470
  Returns:
1471
    A serializable config file containing the export info.
1472

1473
  """
1474
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1475

    
1476
  config = objects.SerializableConfigParser()
1477
  config.read(cff)
1478

    
1479
  if (not config.has_section(constants.INISECT_EXP) or
1480
      not config.has_section(constants.INISECT_INS)):
1481
    return None
1482

    
1483
  return config
1484

    
1485

    
1486
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1487
                         cluster_name):
1488
  """Import an os image into an instance.
1489

1490
  Args:
1491
    instance: the instance object
1492
    os_disk: the instance-visible name of the os device
1493
    swap_disk: the instance-visible name of the swap device
1494
    src_node: node holding the source image
1495
    src_image: path to the source image on src_node
1496

1497
  Returns:
1498
    False in case of error, True otherwise.
1499

1500
  """
1501
  # TODO(ultrotter): Import/Export still to be converted to OS API 10
1502
  logging.error("Import/Export still to be converted to OS API 10")
1503
  return False
1504

    
1505
  inst_os = OSFromDisk(instance.os)
1506
  import_script = inst_os.import_script
1507

    
1508
  os_device = instance.FindDisk(os_disk)
1509
  if os_device is None:
1510
    logging.error("Can't find this device-visible name '%s'", os_disk)
1511
    return False
1512

    
1513
  swap_device = instance.FindDisk(swap_disk)
1514
  if swap_device is None:
1515
    logging.error("Can't find this device-visible name '%s'", swap_disk)
1516
    return False
1517

    
1518
  real_os_dev = _RecursiveFindBD(os_device)
1519
  if real_os_dev is None:
1520
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1521
                                  str(os_device))
1522
  real_os_dev.Open()
1523

    
1524
  real_swap_dev = _RecursiveFindBD(swap_device)
1525
  if real_swap_dev is None:
1526
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1527
                                  str(swap_device))
1528
  real_swap_dev.Open()
1529

    
1530
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1531
                                        instance.name, int(time.time()))
1532
  if not os.path.exists(constants.LOG_OS_DIR):
1533
    os.mkdir(constants.LOG_OS_DIR, 0750)
1534

    
1535
  destcmd = utils.BuildShellCmd('cat %s', src_image)
1536
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1537
                                                   constants.GANETI_RUNAS,
1538
                                                   destcmd)
1539

    
1540
  comprcmd = "gunzip"
1541
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1542
                               inst_os.path, import_script, instance.name,
1543
                               real_os_dev.dev_path, real_swap_dev.dev_path,
1544
                               logfile)
1545

    
1546
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1547
  env = {'HYPERVISOR': instance.hypervisor}
1548

    
1549
  result = utils.RunCmd(command, env=env)
1550

    
1551
  if result.failed:
1552
    logging.error("os import command '%s' returned error: %s"
1553
                  " output: %s", command, result.fail_reason, result.output)
1554
    return False
1555

    
1556
  return True


def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_unique_id): each disk
  is looked up and renamed to its new unique_id. The return value is a
  combined boolean result (True only if all renames succeeded).

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result
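
# Illustrative usage sketch (not part of the original code; values are
# hypothetical): callers pass (disk, new_unique_id) pairs, e.g. for an LVM
# volume whose unique_id is a (vg_name, lv_name) tuple:
#
#   RenameBlockDevices([(disk, ("xenvg", "new-lv-name"))])
#
# The exact unique_id format depends on the device class in lib/bdev.py.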


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
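
# Illustrative note (not part of the original code): the check above is a
# plain string-prefix comparison; with a base directory of, say,
# "/srv/ganeti/file-storage" (hypothetical value):
#
#   _TransformFileStorageDir("/srv/ganeti/file-storage/inst1")  # accepted
#   _TransformFileStorageDir("/tmp/somewhere-else")             # -> None
#
# Note that os.path.commonprefix() compares character by character, not per
# path component, so this is a coarse containment check.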


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result
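
# Illustrative note (not part of the original code): the trailing commas above
# make "result" a one-element tuple, so callers receive (True,) or (False,),
# matching the "tuple with first element a boolean" contract in the docstring,
# e.g. (the path is hypothetical):
#
#   CreateFileStorageDir("/srv/ganeti/file-storage/inst1")  # -> (True,)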


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result


def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True


def JobQueueRename(old, new):
  """Renames a job queue file.

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True


def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: bool
  @param drain_flag: if True, will set the drain flag, otherwise reset it.

  """
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True
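
# Illustrative note (not part of the original code): the drain flag is simply
# the presence or absence of constants.JOB_QUEUE_DRAIN_FILE on disk:
#
#   JobQueueSetDrainFlag(True)   # creates the (empty) drain file
#   JobQueueSetDrainFlag(False)  # removes it again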


def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: tuple (bool, str)
  @return: tuple of (success, message)

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
    return (True, "Validation passed")
  except errors.HypervisorError, err:
    return (False, str(err))
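
# Illustrative usage sketch (not part of the original code; the hypervisor
# name and parameters shown are hypothetical):
#
#   ValidateHVParams("xen-pvm", {})
#   # -> (True, "Validation passed") or (False, "<reason from the hypervisor>")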


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not usually be overridden by child opcodes.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # must log
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
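
  # Illustrative note (not part of the original code): for a call like
  # RunHooks("instance-start", constants.HOOKS_PHASE_PRE, env) the runner
  # looks for executables under <hooks_base_dir>/instance-start-pre.d/ and
  # returns one triple per entry, e.g.
  #   [("instance-start-pre.d/00-check", constants.HKR_SUCCESS, "ok\n")]
  # (the hook path and script name shown are hypothetical).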


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
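
  # Illustrative note (not part of the original code): the allocator input is
  # written to a temporary file and the script is invoked with that file name
  # as its only argument, roughly equivalent to running
  #   /path/to/allocator /tmp/ganeti-iallocator.XXXXXX
  # by hand (paths are placeholders); only the run status, stdout, stderr and
  # failure reason are reported back.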


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath
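
  # Illustrative note (not part of the original code): a device path of e.g.
  # "/dev/xenvg/disk0" maps to "<BDEV_CACHE_DIR>/bdev_xenvg_disk0" (the volume
  # group and LV names shown are hypothetical).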

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)
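
  # Illustrative note (not part of the original code): each cache file holds a
  # single "owner state iv_name" line, e.g.
  # "instance1.example.com primary sda" (values shown are hypothetical).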

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache for %s", dev_path)