lib/backend.py @ 082a7f91

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon"""


import os
import os.path
import shutil
import time
import stat
import errno
import re
import subprocess
import random
import logging
import tempfile

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf


def _GetConfig():
  return ssconf.SimpleConfigReader()


def _GetSshRunner(cluster_name):
  return ssh.SshRunner(cluster_name)


def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @param exclude: list of files to be excluded; defaults to the empty list
  @type exclude: list

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []

  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def JobQueuePurge():
  """Removes job queue files and archived jobs.

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def GetMasterInfo():
  """Returns master information.

  This is a utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name)

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    logging.exception("Cluster configuration incomplete")
    return (None, None, None)
  return (master_netdev, master_ip, master_node)


def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

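  # probe the master IP twice: a plain probe tells whether anything answers
  # on it, while a probe sourced from localhost succeeds only if this node
  # already holds the address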
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True


def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the user's authorized_keys file

  """
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")
    return False

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True


def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')


def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with various information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) RAM in MiB
      - memory_total is the total amount of RAM in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray


def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for

  """
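  # illustrative 'what' value (hypothetical file and node names):
  #   {'filelist': ['/etc/hosts'],
  #    'nodelist': ['node2.example.com'],
  #    'node-net-test': [('node2.example.com', '192.0.2.2', '198.51.100.2')],
  #    'hypervisor': [...]}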
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message

  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result


def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  Returns:
    dictionary of all partitions (key) with their size (in MiB), inactive
    and online status:
    {'test1': ('20.06', True, True)}

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
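    # lv_attr is a six-character field; character 5 (index 4) is the state
    # ('a' if active) and character 6 (index 5) is the open/in-use flag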
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  Returns:
    Dictionary with volume group names as keys and their size as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

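  # lvs prints each device as "/dev/path(extent)"; keep only the path part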
  def parse_dev(dev):
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]


def BridgesExist(bridges_list):
  """Check whether all bridges in a list exist on the current node.

  Returns:
    True if all of them exist, False otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True


def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query for information

  @rtype: list
  @return: a list of all running instances on the current node
             - instance1.example.com
             - instance2.example.com

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError, err:
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      # FIXME: should we somehow not propagate this to the master?
      raise

  return results


def GetInstanceInfo(instance, hname):
  """Gives back information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  output = {}

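  # iinfo, when not None, is a (name, id, memory, vcpus, state, cpu_time)
  # tuple; see the unpacking in GetAllInstancesInfo below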
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if iinfo:
      for name, inst_id, memory, vcpus, state, times in iinfo:
        if name in output:
          raise errors.HypervisorError("Instance %s running duplicate" % name)
        output[name] = {
          'memory': memory,
          'vcpus': vcpus,
          'state': state,
          'time': times,
          }

  return output
516

    
517

    
518
def AddOSToInstance(instance, os_disk, swap_disk):
519
  """Add an OS to an instance.
520

521
  Args:
522
    instance: the instance object
523
    os_disk: the instance-visible name of the os device
524
    swap_disk: the instance-visible name of the swap device
525

526
  """
527
  inst_os = OSFromDisk(instance.os)
528

    
529
  create_script = inst_os.create_script
530

    
531
  os_device = instance.FindDisk(os_disk)
532
  if os_device is None:
533
    logging.error("Can't find this device-visible name '%s'", os_disk)
534
    return False
535

    
536
  swap_device = instance.FindDisk(swap_disk)
537
  if swap_device is None:
538
    logging.error("Can't find this device-visible name '%s'", swap_disk)
539
    return False
540

    
541
  real_os_dev = _RecursiveFindBD(os_device)
542
  if real_os_dev is None:
543
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
544
                                  str(os_device))
545
  real_os_dev.Open()
546

    
547
  real_swap_dev = _RecursiveFindBD(swap_device)
548
  if real_swap_dev is None:
549
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
550
                                  str(swap_device))
551
  real_swap_dev.Open()
552

    
553
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
554
                                     instance.name, int(time.time()))
555
  if not os.path.exists(constants.LOG_OS_DIR):
556
    os.mkdir(constants.LOG_OS_DIR, 0750)
557

    
558
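  # the create script receives the instance name (-i), the OS device (-b)
  # and the swap device (-s); stdout/stderr are redirected to the logfile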
  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True


def RunRenameInstance(instance, old_name, os_disk, swap_disk):
  """Run the OS rename script for an instance.

  Args:
    instance: the instance object
    old_name: the old name of the instance
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
                                inst_os.path, script, old_name, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s, output: %s",
                  command, result.fail_reason, result.output)
    return False

  return True


def _GetVGInfo(vg_name):
  """Get information about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count is the number of physical disks in that vg

  If an error occurs during gathering of data, we return the same dict
  with all keys set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Failed to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic


def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    block_devices.append((disk, device))
  return block_devices


def StartInstance(instance, extra_args):
  """Start an instance.

  @type instance: instance object
  @param instance: the instance object
  @rtype: boolean
  @return: whether the startup was successful or not

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name in running_instances:
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    return False

  return True


def ShutdownInstance(instance):
  """Shut an instance down.

  @type instance: instance object
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10secs for 2min
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
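  # the 'else' branch of a for-loop runs only if the loop was not broken
  # out of, i.e. the instance was still listed after all the retries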
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True


def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance    - the instance object to reboot
    reboot_type - how to reboot [soft,hard,full]
    extra_args  - extra arguments passed to StartInstance for hard reboots

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True
797

    
798

    
799
def MigrateInstance(instance, target, live):
800
  """Migrates an instance to another node.
801

802
  @type instance: C{objects.Instance}
803
  @param instance: the instance definition
804
  @type target: string
805
  @param target: the target node name
806
  @type live: boolean
807
  @param live: whether the migration should be done live or not (the
808
      interpretation of this parameter is left to the hypervisor)
809
  @rtype: tuple
810
  @return: a tuple of (success, msg) where:
811
      - succes is a boolean denoting the success/failure of the operation
812
      - msg is a string with details in case of failure
813

814
  """
815
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
816

    
817
  try:
818
    hyper.MigrateInstance(instance.name, target, live)
819
  except errors.HypervisorError, err:
820
    msg = "Failed to migrate instance: %s" % str(err)
821
    logging.error(msg)
822
    return (False, msg)
823
  return (True, "Migration successfull")
824

    
825

    
826
def CreateBlockDevice(disk, size, owner, on_primary, info):
827
  """Creates a block device for an instance.
828

829
  Args:
830
   disk: a ganeti.objects.Disk object
831
   size: the size of the physical underlying device
832
   owner: a string with the name of the instance
833
   on_primary: a boolean indicating if it is the primary node or not
834
   info: string that will be sent to the physical device creation
835

836
  Returns:
837
    the new unique_id of the device (this can sometime be
838
    computed only after creation), or None. On secondary nodes,
839
    it's not required to return anything.
840

841
  """
842
  clist = []
843
  if disk.children:
844
    for child in disk.children:
845
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
846
      if on_primary or disk.AssembleOnSecondary():
847
        # we need the children open in case the device itself has to
848
        # be assembled
849
        crdev.Open()
850
      clist.append(crdev)
851
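  # a device with the same unique_id may already exist (for example, left
  # over from a previously failed creation); if so, remove it first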
  try:
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id


def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and RemoveBlockDevice(child)
  return result


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: an objects.Disk object
    owner: the name of the instance owning the device
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
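    # assembly tolerates up to mcn missing children (useful for degraded
    # mirrors); one failure beyond that re-raises the exception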
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)
  else:
    result = True
  return result


def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result


def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shut down. Then the children of the device are shut down.

  This function is called recursively. Note that we don't cache the
  children or such; as opposed to assemble, shutting down different
  devices doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result


def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True


def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True


def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats


def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)


def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the `err` argument has an errno attribute, it will be looked up
  and converted into a textual EXXXX description. Otherwise the string
  representation of the error will be returned.

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


def _OSOndiskVersion(name, os_dir):
  """Compute and return the API versions of a given OS.

  This function will try to read the API versions of the OS given by
  the 'name' parameter and residing in the 'os_dir' directory.

  The return value is a list of integers denoting the supported API
  versions; an `errors.InvalidOS` exception is raised if the OS is not
  a valid one.

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_versions = f.readlines()
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_versions = [version.strip() for version in api_versions]
  try:
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_versions


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given, defaults to constants.OS_SEARCH_PATH).

  Returns:
    list of OS objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          result.append(objects.OS.FromInvalidOS(err))

  return result


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    base_dir: directory containing the OS scripts; defaults to a search
              in all the OS_SEARCH_PATH directories.

  """

  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_versions=api_versions)


def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being resized
  first.

  Args:
    disk: the disk to be grown
    amount: the amount to grow the device with

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    return False, str(err)

  return True, None


def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))


def ExportSnapshot(disk, dest_node, instance, cluster_name):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported

  Returns:
    True if successful, False otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters

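  # the resulting pipeline looks roughly like this (placeholders illustrative):
  #   cd <os_dir>; export -i <name> -b <dev> | gzip | \
  #     ssh <node> 'mkdir -p <dir> && cat > <dir>/<file>'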
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_count)

  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True


def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config


def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
                         cluster_name):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)

  if result.failed:
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True


def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_unique_id). The
  return value will be a combined boolean result (True only if all
  renames succeeded).

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1627
      base_file_storage_dir):
1628
    logging.error("file storage directory '%s' is not under base file"
1629
                  " storage directory '%s'",
1630
                  file_storage_dir, base_file_storage_dir)
1631
    return None
1632
  return file_storage_dir
1633

    
1634

    
1635
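# Note that os.path.commonprefix works character-wise, not per path
# component, so with a base of "/srv/ganeti/file-storage" a sibling such
# as "/srv/ganeti/file-storage2" also passes the check above:
#
#   os.path.commonprefix(["/srv/ganeti/file-storage2",
#                         "/srv/ganeti/file-storage"])
#   -> "/srv/ganeti/file-storage"
#
# This catches misconfiguration, but it is not a strict containment test.
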
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result

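# Note: the trailing commas above ("result = True," and friends) are
# deliberate; as the docstring says, these functions return a tuple whose
# first element is the success boolean. A caller would unpack it e.g. as
# (path invented for illustration):
#
#   status = CreateFileStorageDir("/srv/ganeti/file-storage/inst1")[0]
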
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If it is not, log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result

def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result

def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result

def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True

def JobQueueRename(old, new):
  """Renames a job queue file.

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True

def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")

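# Example of consuming the (status, message) pair returned above (the
# disks list is assumed to come from the instance configuration):
#
#   success, msg = CloseBlockDevices(instance.disks)
#   if not success:
#     logging.error("Cannot switch disks to secondary: %s", msg)
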
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # ignore errors on close; optionally log them:
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not usually be overridden by child opcodes.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # TODO: log the error (the hooks directory may simply be missing)
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr

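# Illustrative hooks layout (file names invented): for hpath
# "instance-start" and the "post" phase, RunHooks walks
#
#   <hooks_base_dir>/instance-start-post.d/
#       00-log        (executable, matches RE_MASK -> run)
#       50-update-dns (executable, matches RE_MASK -> run)
#       README.txt    (dot in the name fails RE_MASK -> HKR_SKIP)
#
# in sorted order, hence the "00name" naming recommendation above, and
# returns one ("subdir/name", HKR_*, output) tuple per file.
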
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)

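# Sketch of how a caller might dispatch on the run status (idata is assumed
# to be the allocator request string prepared by the master):
#
#   status, stdout, stderr, fail_reason = IAllocatorRunner().Run(name, idata)
#   if status == constants.IARUN_NOTFOUND:
#     ...no such script in the search path...
#   elif status == constants.IARUN_FAILURE:
#     ...script ran but failed; see stderr/fail_reason...
#   else:  # IARUN_SUCCESS: the allocator's answer is in stdout
#     ...
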
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

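  # For example (illustrative device names):
  #
  #   _ConvertPath("/dev/md0")          -> _ROOT_DIR + "/bdev_md0"
  #   _ConvertPath("/dev/mapper/vg-lv") -> _ROOT_DIR + "/bdev_mapper_vg-lv"
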
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

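  # The cache file written above therefore holds one line of the form
  # "<owner> <primary|secondary> <iv_name>", e.g. (values invented):
  #
  #   instance1.example.com primary sda
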
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache for %s", dev_path)