Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ e69d05fd

History | View | Annotate | Download (57.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetConfig():
48
  return ssconf.SimpleConfigReader()
49

    
50

    
51
def _GetSshRunner(cluster_name):
52
  return ssh.SshRunner(cluster_name)
53

    
54

    
55
def _CleanDirectory(path, exclude=[]):
56
  """Removes all regular files in a directory.
57

58
  @param exclude: List of files to be excluded.
59
  @type exclude: list
60

61
  """
62
  if not os.path.isdir(path):
63
    return
64

    
65
  # Normalize excluded paths
66
  exclude = [os.path.normpath(i) for i in exclude]
67

    
68
  for rel_name in utils.ListVisibleFiles(path):
69
    full_name = os.path.normpath(os.path.join(path, rel_name))
70
    if full_name in exclude:
71
      continue
72
    if os.path.isfile(full_name) and not os.path.islink(full_name):
73
      utils.RemoveFile(full_name)
74

    
75

    
76
def JobQueuePurge():
77
  """Removes job queue files and archived jobs
78

79
  """
80
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82

    
83

    
84
def GetMasterInfo():
85
  """Returns master information.
86

87
  This is an utility function to compute master information, either
88
  for consumption here or from the node daemon.
89

90
  @rtype: tuple
91
  @return: (master_netdev, master_ip, master_name)
92

93
  """
94
  try:
95
    cfg = _GetConfig()
96
    master_netdev = cfg.GetMasterNetdev()
97
    master_ip = cfg.GetMasterIP()
98
    master_node = cfg.GetMasterNode()
99
  except errors.ConfigurationError, err:
100
    logging.exception("Cluster configuration incomplete")
101
    return (None, None)
102
  return (master_netdev, master_ip, master_node)
103

    
104

    
105
def StartMaster(start_daemons):
106
  """Activate local node as master node.
107

108
  The function will always try activate the IP address of the master
109
  (if someone else has it, then it won't). Then, if the start_daemons
110
  parameter is True, it will also start the master daemons
111
  (ganet-masterd and ganeti-rapi).
112

113
  """
114
  ok = True
115
  master_netdev, master_ip, _ = GetMasterInfo()
116
  if not master_netdev:
117
    return False
118

    
119
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
121
                     source=constants.LOCALHOST_IP_ADDRESS):
122
      # we already have the ip:
123
      logging.debug("Already started")
124
    else:
125
      logging.error("Someone else has the master ip, not activating")
126
      ok = False
127
  else:
128
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
129
                           "dev", master_netdev, "label",
130
                           "%s:0" % master_netdev])
131
    if result.failed:
132
      logging.error("Can't activate master IP: %s", result.output)
133
      ok = False
134

    
135
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
136
                           "-s", master_ip, master_ip])
137
    # we'll ignore the exit code of arping
138

    
139
  # and now start the master and rapi daemons
140
  if start_daemons:
141
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
142
      result = utils.RunCmd([daemon])
143
      if result.failed:
144
        logging.error("Can't start daemon %s: %s", daemon, result.output)
145
        ok = False
146
  return ok
147

    
148

    
149
def StopMaster(stop_daemons):
150
  """Deactivate this node as master.
151

152
  The function will always try to deactivate the IP address of the
153
  master. Then, if the stop_daemons parameter is True, it will also
154
  stop the master daemons (ganet-masterd and ganeti-rapi).
155

156
  """
157
  master_netdev, master_ip, _ = GetMasterInfo()
158
  if not master_netdev:
159
    return False
160

    
161
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
162
                         "dev", master_netdev])
163
  if result.failed:
164
    logging.error("Can't remove the master IP, error: %s", result.output)
165
    # but otherwise ignore the failure
166

    
167
  if stop_daemons:
168
    # stop/kill the rapi and the master daemon
169
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
170
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
171

    
172
  return True
173

    
174

    
175
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
176
  """Joins this node to the cluster.
177

178
  This does the following:
179
      - updates the hostkeys of the machine (rsa and dsa)
180
      - adds the ssh private key to the user
181
      - adds the ssh public key to the users' authorized_keys file
182

183
  """
184
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
185
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
186
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
187
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
188
  for name, content, mode in sshd_keys:
189
    utils.WriteFile(name, data=content, mode=mode)
190

    
191
  try:
192
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
193
                                                    mkdir=True)
194
  except errors.OpExecError, err:
195
    logging.exception("Error while processing user ssh files")
196
    return False
197

    
198
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
199
    utils.WriteFile(name, data=content, mode=0600)
200

    
201
  utils.AddAuthorizedKey(auth_keys, sshpub)
202

    
203
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
204

    
205
  return True
206

    
207

    
208
def LeaveCluster():
209
  """Cleans up the current node and prepares it to be removed from the cluster.
210

211
  """
212
  _CleanDirectory(constants.DATA_DIR)
213
  JobQueuePurge()
214

    
215
  try:
216
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
217
  except errors.OpExecError:
218
    logging.exception("Error while processing ssh files")
219
    return
220

    
221
  f = open(pub_key, 'r')
222
  try:
223
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
224
  finally:
225
    f.close()
226

    
227
  utils.RemoveFile(priv_key)
228
  utils.RemoveFile(pub_key)
229

    
230
  # Return a reassuring string to the caller, and quit
231
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
232

    
233

    
234
def GetNodeInfo(vgname, hypervisor_type):
235
  """Gives back a hash with different informations about the node.
236

237
  @type vgname: C{string}
238
  @param vgname: the name of the volume group to ask for disk space information
239
  @type hypervisor_type: C{str}
240
  @param hypervisor_type: the name of the hypervisor to ask for
241
      memory information
242
  @rtype: C{dict}
243
  @return: dictionary with the following keys:
244
      - vg_size is the size of the configured volume group in MiB
245
      - vg_free is the free size of the volume group in MiB
246
      - memory_dom0 is the memory allocated for domain0 in MiB
247
      - memory_free is the currently available (free) ram in MiB
248
      - memory_total is the total number of ram in MiB
249

250
  """
251
  outputarray = {}
252
  vginfo = _GetVGInfo(vgname)
253
  outputarray['vg_size'] = vginfo['vg_size']
254
  outputarray['vg_free'] = vginfo['vg_free']
255

    
256
  hyper = hypervisor.GetHypervisor(hypervisor_type)
257
  hyp_info = hyper.GetNodeInfo()
258
  if hyp_info is not None:
259
    outputarray.update(hyp_info)
260

    
261
  f = open("/proc/sys/kernel/random/boot_id", 'r')
262
  try:
263
    outputarray["bootid"] = f.read(128).rstrip("\n")
264
  finally:
265
    f.close()
266

    
267
  return outputarray
268

    
269

    
270
def VerifyNode(what, cluster_name):
271
  """Verify the status of the local node.
272

273
  Based on the input L{what} parameter, various checks are done on the
274
  local node.
275

276
  If the I{filelist} key is present, this list of
277
  files is checksummed and the file/checksum pairs are returned.
278

279
  If the I{nodelist} key is present, we check that we have
280
  connectivity via ssh with the target nodes (and check the hostname
281
  report).
282

283
  If the I{node-net-test} key is present, we check that we have
284
  connectivity to the given nodes via both primary IP and, if
285
  applicable, secondary IPs.
286

287
  @type what: C{dict}
288
  @param what: a dictionary of things to check:
289
      - filelist: list of files for which to compute checksums
290
      - nodelist: list of nodes we should check ssh communication with
291
      - node-net-test: list of nodes we should check node daemon port
292
        connectivity with
293
      - hypervisor: list with hypervisors to run the verify for
294

295

296
  """
297
  result = {}
298

    
299
  if 'hypervisor' in what:
300
    result['hypervisor'] = my_dict = {}
301
    for hv_name in what['hypervisor']:
302
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
303

    
304
  if 'filelist' in what:
305
    result['filelist'] = utils.FingerprintFiles(what['filelist'])
306

    
307
  if 'nodelist' in what:
308
    result['nodelist'] = {}
309
    random.shuffle(what['nodelist'])
310
    for node in what['nodelist']:
311
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
312
      if not success:
313
        result['nodelist'][node] = message
314
  if 'node-net-test' in what:
315
    result['node-net-test'] = {}
316
    my_name = utils.HostInfo().name
317
    my_pip = my_sip = None
318
    for name, pip, sip in what['node-net-test']:
319
      if name == my_name:
320
        my_pip = pip
321
        my_sip = sip
322
        break
323
    if not my_pip:
324
      result['node-net-test'][my_name] = ("Can't find my own"
325
                                          " primary/secondary IP"
326
                                          " in the node list")
327
    else:
328
      port = utils.GetNodeDaemonPort()
329
      for name, pip, sip in what['node-net-test']:
330
        fail = []
331
        if not utils.TcpPing(pip, port, source=my_pip):
332
          fail.append("primary")
333
        if sip != pip:
334
          if not utils.TcpPing(sip, port, source=my_sip):
335
            fail.append("secondary")
336
        if fail:
337
          result['node-net-test'][name] = ("failure using the %s"
338
                                           " interface(s)" %
339
                                           " and ".join(fail))
340

    
341
  return result
342

    
343

    
344
def GetVolumeList(vg_name):
345
  """Compute list of logical volumes and their size.
346

347
  Returns:
348
    dictionary of all partions (key) with their size (in MiB), inactive
349
    and online status:
350
    {'test1': ('20.06', True, True)}
351

352
  """
353
  lvs = {}
354
  sep = '|'
355
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
356
                         "--separator=%s" % sep,
357
                         "-olv_name,lv_size,lv_attr", vg_name])
358
  if result.failed:
359
    logging.error("Failed to list logical volumes, lvs output: %s",
360
                  result.output)
361
    return result.output
362

    
363
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
364
  for line in result.stdout.splitlines():
365
    line = line.strip()
366
    match = valid_line_re.match(line)
367
    if not match:
368
      logging.error("Invalid line returned from lvs output: '%s'", line)
369
      continue
370
    name, size, attr = match.groups()
371
    inactive = attr[4] == '-'
372
    online = attr[5] == 'o'
373
    lvs[name] = (size, inactive, online)
374

    
375
  return lvs
376

    
377

    
378
def ListVolumeGroups():
379
  """List the volume groups and their size.
380

381
  Returns:
382
    Dictionary with keys volume name and values the size of the volume
383

384
  """
385
  return utils.ListVolumeGroups()
386

    
387

    
388
def NodeVolumes():
389
  """List all volumes on this node.
390

391
  """
392
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
393
                         "--separator=|",
394
                         "--options=lv_name,lv_size,devices,vg_name"])
395
  if result.failed:
396
    logging.error("Failed to list logical volumes, lvs output: %s",
397
                  result.output)
398
    return {}
399

    
400
  def parse_dev(dev):
401
    if '(' in dev:
402
      return dev.split('(')[0]
403
    else:
404
      return dev
405

    
406
  def map_line(line):
407
    return {
408
      'name': line[0].strip(),
409
      'size': line[1].strip(),
410
      'dev': parse_dev(line[2].strip()),
411
      'vg': line[3].strip(),
412
    }
413

    
414
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
415
          if line.count('|') >= 3]
416

    
417

    
418
def BridgesExist(bridges_list):
419
  """Check if a list of bridges exist on the current node.
420

421
  Returns:
422
    True if all of them exist, false otherwise
423

424
  """
425
  for bridge in bridges_list:
426
    if not utils.BridgeExists(bridge):
427
      return False
428

    
429
  return True
430

    
431

    
432
def GetInstanceList(hypervisor_list):
433
  """Provides a list of instances.
434

435
  @type hypervisor_list: list
436
  @param hypervisor_list: the list of hypervisors to query information
437

438
  @rtype: list
439
  @return: a list of all running instances on the current node
440
             - instance1.example.com
441
             - instance2.example.com
442

443
  """
444
  results = []
445
  for hname in hypervisor_list:
446
    try:
447
      names = hypervisor.GetHypervisor(hname).ListInstances()
448
      results.extend(names)
449
    except errors.HypervisorError, err:
450
      logging.exception("Error enumerating instances for hypevisor %s", hname)
451
      # FIXME: should we somehow not propagate this to the master?
452
      raise
453

    
454
  return results
455

    
456

    
457
def GetInstanceInfo(instance, hname):
458
  """Gives back the informations about an instance as a dictionary.
459

460
  @type instance: string
461
  @param instance: the instance name
462
  @type hname: string
463
  @param hname: the hypervisor type of the instance
464

465
  @rtype: dict
466
  @return: dictionary with the following keys:
467
      - memory: memory size of instance (int)
468
      - state: xen state of instance (string)
469
      - time: cpu time of instance (float)
470

471
  """
472
  output = {}
473

    
474
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
475
  if iinfo is not None:
476
    output['memory'] = iinfo[2]
477
    output['state'] = iinfo[4]
478
    output['time'] = iinfo[5]
479

    
480
  return output
481

    
482

    
483
def GetAllInstancesInfo(hypervisor_list):
484
  """Gather data about all instances.
485

486
  This is the equivalent of `GetInstanceInfo()`, except that it
487
  computes data for all instances at once, thus being faster if one
488
  needs data about more than one instance.
489

490
  @type hypervisor_list: list
491
  @param hypervisor_list: list of hypervisors to query for instance data
492

493
  @rtype: dict of dicts
494
  @return: dictionary of instance: data, with data having the following keys:
495
      - memory: memory size of instance (int)
496
      - state: xen state of instance (string)
497
      - time: cpu time of instance (float)
498
      - vcpuus: the number of vcpus
499

500
  """
501
  output = {}
502

    
503
  for hname in hypervisor_list:
504
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
505
    if iinfo:
506
      for name, inst_id, memory, vcpus, state, times in iinfo:
507
        if name in output:
508
          raise errors.HypervisorError("Instance %s running duplicate" % name)
509
        output[name] = {
510
          'memory': memory,
511
          'vcpus': vcpus,
512
          'state': state,
513
          'time': times,
514
          }
515

    
516
  return output
517

    
518

    
519
def AddOSToInstance(instance, os_disk, swap_disk):
520
  """Add an OS to an instance.
521

522
  Args:
523
    instance: the instance object
524
    os_disk: the instance-visible name of the os device
525
    swap_disk: the instance-visible name of the swap device
526

527
  """
528
  inst_os = OSFromDisk(instance.os)
529

    
530
  create_script = inst_os.create_script
531

    
532
  os_device = instance.FindDisk(os_disk)
533
  if os_device is None:
534
    logging.error("Can't find this device-visible name '%s'", os_disk)
535
    return False
536

    
537
  swap_device = instance.FindDisk(swap_disk)
538
  if swap_device is None:
539
    logging.error("Can't find this device-visible name '%s'", swap_disk)
540
    return False
541

    
542
  real_os_dev = _RecursiveFindBD(os_device)
543
  if real_os_dev is None:
544
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
545
                                  str(os_device))
546
  real_os_dev.Open()
547

    
548
  real_swap_dev = _RecursiveFindBD(swap_device)
549
  if real_swap_dev is None:
550
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
551
                                  str(swap_device))
552
  real_swap_dev.Open()
553

    
554
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
555
                                     instance.name, int(time.time()))
556
  if not os.path.exists(constants.LOG_OS_DIR):
557
    os.mkdir(constants.LOG_OS_DIR, 0750)
558

    
559
  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
560
                                inst_os.path, create_script, instance.name,
561
                                real_os_dev.dev_path, real_swap_dev.dev_path,
562
                                logfile)
563
  env = {'HYPERVISOR': instance.hypervisor}
564

    
565
  result = utils.RunCmd(command, env=env)
566
  if result.failed:
567
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
568
                  " output: %s", command, result.fail_reason, logfile,
569
                  result.output)
570
    return False
571

    
572
  return True
573

    
574

    
575
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
576
  """Run the OS rename script for an instance.
577

578
  Args:
579
    instance: the instance object
580
    old_name: the old name of the instance
581
    os_disk: the instance-visible name of the os device
582
    swap_disk: the instance-visible name of the swap device
583

584
  """
585
  inst_os = OSFromDisk(instance.os)
586

    
587
  script = inst_os.rename_script
588

    
589
  os_device = instance.FindDisk(os_disk)
590
  if os_device is None:
591
    logging.error("Can't find this device-visible name '%s'", os_disk)
592
    return False
593

    
594
  swap_device = instance.FindDisk(swap_disk)
595
  if swap_device is None:
596
    logging.error("Can't find this device-visible name '%s'", swap_disk)
597
    return False
598

    
599
  real_os_dev = _RecursiveFindBD(os_device)
600
  if real_os_dev is None:
601
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
602
                                  str(os_device))
603
  real_os_dev.Open()
604

    
605
  real_swap_dev = _RecursiveFindBD(swap_device)
606
  if real_swap_dev is None:
607
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
608
                                  str(swap_device))
609
  real_swap_dev.Open()
610

    
611
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
612
                                           old_name,
613
                                           instance.name, int(time.time()))
614
  if not os.path.exists(constants.LOG_OS_DIR):
615
    os.mkdir(constants.LOG_OS_DIR, 0750)
616

    
617
  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
618
                                inst_os.path, script, old_name, instance.name,
619
                                real_os_dev.dev_path, real_swap_dev.dev_path,
620
                                logfile)
621

    
622
  result = utils.RunCmd(command)
623

    
624
  if result.failed:
625
    logging.error("os create command '%s' returned error: %s output: %s",
626
                  command, result.fail_reason, result.output)
627
    return False
628

    
629
  return True
630

    
631

    
632
def _GetVGInfo(vg_name):
633
  """Get informations about the volume group.
634

635
  Args:
636
    vg_name: the volume group
637

638
  Returns:
639
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
640
    where
641
    vg_size is the total size of the volume group in MiB
642
    vg_free is the free size of the volume group in MiB
643
    pv_count are the number of physical disks in that vg
644

645
  If an error occurs during gathering of data, we return the same dict
646
  with keys all set to None.
647

648
  """
649
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
650

    
651
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
652
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
653

    
654
  if retval.failed:
655
    logging.error("volume group %s not present", vg_name)
656
    return retdic
657
  valarr = retval.stdout.strip().rstrip(':').split(':')
658
  if len(valarr) == 3:
659
    try:
660
      retdic = {
661
        "vg_size": int(round(float(valarr[0]), 0)),
662
        "vg_free": int(round(float(valarr[1]), 0)),
663
        "pv_count": int(valarr[2]),
664
        }
665
    except ValueError, err:
666
      logging.exception("Fail to parse vgs output")
667
  else:
668
    logging.error("vgs output has the wrong number of fields (expected"
669
                  " three): %s", str(valarr))
670
  return retdic
671

    
672

    
673
def _GatherBlockDevs(instance):
674
  """Set up an instance's block device(s).
675

676
  This is run on the primary node at instance startup. The block
677
  devices must be already assembled.
678

679
  """
680
  block_devices = []
681
  for disk in instance.disks:
682
    device = _RecursiveFindBD(disk)
683
    if device is None:
684
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
685
                                    str(disk))
686
    device.Open()
687
    block_devices.append((disk, device))
688
  return block_devices
689

    
690

    
691
def StartInstance(instance, extra_args):
692
  """Start an instance.
693

694
  @type instance: instance object
695
  @param instance: the instance object
696
  @rtype: boolean
697
  @return: whether the startup was successful or not
698

699
  """
700
  running_instances = GetInstanceList([instance.hypervisor])
701

    
702
  if instance.name in running_instances:
703
    return True
704

    
705
  block_devices = _GatherBlockDevs(instance)
706
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
707

    
708
  try:
709
    hyper.StartInstance(instance, block_devices, extra_args)
710
  except errors.HypervisorError, err:
711
    logging.exception("Failed to start instance")
712
    return False
713

    
714
  return True
715

    
716

    
717
def ShutdownInstance(instance):
718
  """Shut an instance down.
719

720
  @type instance: instance object
721
  @param instance: the instance object
722
  @rtype: boolean
723
  @return: whether the startup was successful or not
724

725
  """
726
  hv_name = instance.hypervisor
727
  running_instances = GetInstanceList([hv_name])
728

    
729
  if instance.name not in running_instances:
730
    return True
731

    
732
  hyper = hypervisor.GetHypervisor(hv_name)
733
  try:
734
    hyper.StopInstance(instance)
735
  except errors.HypervisorError, err:
736
    logging.error("Failed to stop instance")
737
    return False
738

    
739
  # test every 10secs for 2min
740
  shutdown_ok = False
741

    
742
  time.sleep(1)
743
  for dummy in range(11):
744
    if instance.name not in GetInstanceList([hv_name]):
745
      break
746
    time.sleep(10)
747
  else:
748
    # the shutdown did not succeed
749
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
750

    
751
    try:
752
      hyper.StopInstance(instance, force=True)
753
    except errors.HypervisorError, err:
754
      logging.exception("Failed to stop instance")
755
      return False
756

    
757
    time.sleep(1)
758
    if instance.name in GetInstanceList([hv_name]):
759
      logging.error("could not shutdown instance '%s' even by destroy",
760
                    instance.name)
761
      return False
762

    
763
  return True
764

    
765

    
766
def RebootInstance(instance, reboot_type, extra_args):
767
  """Reboot an instance.
768

769
  Args:
770
    instance    - name of instance to reboot
771
    reboot_type - how to reboot [soft,hard,full]
772

773
  """
774
  running_instances = GetInstanceList([instance.hypervisor])
775

    
776
  if instance.name not in running_instances:
777
    logging.error("Cannot reboot instance that is not running")
778
    return False
779

    
780
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
781
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
782
    try:
783
      hyper.RebootInstance(instance)
784
    except errors.HypervisorError, err:
785
      logging.exception("Failed to soft reboot instance")
786
      return False
787
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
788
    try:
789
      ShutdownInstance(instance)
790
      StartInstance(instance, extra_args)
791
    except errors.HypervisorError, err:
792
      logging.exception("Failed to hard reboot instance")
793
      return False
794
  else:
795
    raise errors.ParameterError("reboot_type invalid")
796

    
797
  return True
798

    
799

    
800
def MigrateInstance(instance, target, live):
801
  """Migrates an instance to another node.
802

803
  @type instance: C{objects.Instance}
804
  @param instance: the instance definition
805
  @type target: string
806
  @param target: the target node name
807
  @type live: boolean
808
  @param live: whether the migration should be done live or not (the
809
      interpretation of this parameter is left to the hypervisor)
810
  @rtype: tuple
811
  @return: a tuple of (success, msg) where:
812
      - succes is a boolean denoting the success/failure of the operation
813
      - msg is a string with details in case of failure
814

815
  """
816
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
817

    
818
  try:
819
    hyper.MigrateInstance(instance.name, target, live)
820
  except errors.HypervisorError, err:
821
    msg = "Failed to migrate instance: %s" % str(err)
822
    logging.error(msg)
823
    return (False, msg)
824
  return (True, "Migration successfull")
825

    
826

    
827
def CreateBlockDevice(disk, size, owner, on_primary, info):
828
  """Creates a block device for an instance.
829

830
  Args:
831
   disk: a ganeti.objects.Disk object
832
   size: the size of the physical underlying device
833
   owner: a string with the name of the instance
834
   on_primary: a boolean indicating if it is the primary node or not
835
   info: string that will be sent to the physical device creation
836

837
  Returns:
838
    the new unique_id of the device (this can sometime be
839
    computed only after creation), or None. On secondary nodes,
840
    it's not required to return anything.
841

842
  """
843
  clist = []
844
  if disk.children:
845
    for child in disk.children:
846
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
847
      if on_primary or disk.AssembleOnSecondary():
848
        # we need the children open in case the device itself has to
849
        # be assembled
850
        crdev.Open()
851
      clist.append(crdev)
852
  try:
853
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
854
    if device is not None:
855
      logging.info("removing existing device %s", disk)
856
      device.Remove()
857
  except errors.BlockDeviceError, err:
858
    pass
859

    
860
  device = bdev.Create(disk.dev_type, disk.physical_id,
861
                       clist, size)
862
  if device is None:
863
    raise ValueError("Can't create child device for %s, %s" %
864
                     (disk, size))
865
  if on_primary or disk.AssembleOnSecondary():
866
    if not device.Assemble():
867
      errorstring = "Can't assemble device after creation"
868
      logging.error(errorstring)
869
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
870
                                    " daemon logs" % errorstring)
871
    device.SetSyncSpeed(constants.SYNC_SPEED)
872
    if on_primary or disk.OpenOnSecondary():
873
      device.Open(force=True)
874
    DevCacheManager.UpdateCache(device.dev_path, owner,
875
                                on_primary, disk.iv_name)
876

    
877
  device.SetInfo(info)
878

    
879
  physical_id = device.unique_id
880
  return physical_id
881

    
882

    
883
def RemoveBlockDevice(disk):
884
  """Remove a block device.
885

886
  This is intended to be called recursively.
887

888
  """
889
  try:
890
    # since we are removing the device, allow a partial match
891
    # this allows removal of broken mirrors
892
    rdev = _RecursiveFindBD(disk, allow_partial=True)
893
  except errors.BlockDeviceError, err:
894
    # probably can't attach
895
    logging.info("Can't attach to device %s in remove", disk)
896
    rdev = None
897
  if rdev is not None:
898
    r_path = rdev.dev_path
899
    result = rdev.Remove()
900
    if result:
901
      DevCacheManager.RemoveCache(r_path)
902
  else:
903
    result = True
904
  if disk.children:
905
    for child in disk.children:
906
      result = result and RemoveBlockDevice(child)
907
  return result
908

    
909

    
910
def _RecursiveAssembleBD(disk, owner, as_primary):
911
  """Activate a block device for an instance.
912

913
  This is run on the primary and secondary nodes for an instance.
914

915
  This function is called recursively.
916

917
  Args:
918
    disk: a objects.Disk object
919
    as_primary: if we should make the block device read/write
920

921
  Returns:
922
    the assembled device or None (in case no device was assembled)
923

924
  If the assembly is not successful, an exception is raised.
925

926
  """
927
  children = []
928
  if disk.children:
929
    mcn = disk.ChildrenNeeded()
930
    if mcn == -1:
931
      mcn = 0 # max number of Nones allowed
932
    else:
933
      mcn = len(disk.children) - mcn # max number of Nones
934
    for chld_disk in disk.children:
935
      try:
936
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
937
      except errors.BlockDeviceError, err:
938
        if children.count(None) >= mcn:
939
          raise
940
        cdev = None
941
        logging.debug("Error in child activation: %s", str(err))
942
      children.append(cdev)
943

    
944
  if as_primary or disk.AssembleOnSecondary():
945
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
946
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
947
    result = r_dev
948
    if as_primary or disk.OpenOnSecondary():
949
      r_dev.Open()
950
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
951
                                as_primary, disk.iv_name)
952

    
953
  else:
954
    result = True
955
  return result
956

    
957

    
958
def AssembleBlockDevice(disk, owner, as_primary):
959
  """Activate a block device for an instance.
960

961
  This is a wrapper over _RecursiveAssembleBD.
962

963
  Returns:
964
    a /dev path for primary nodes
965
    True for secondary nodes
966

967
  """
968
  result = _RecursiveAssembleBD(disk, owner, as_primary)
969
  if isinstance(result, bdev.BlockDev):
970
    result = result.dev_path
971
  return result
972

    
973

    
974
def ShutdownBlockDevice(disk):
975
  """Shut down a block device.
976

977
  First, if the device is assembled (can `Attach()`), then the device
978
  is shutdown. Then the children of the device are shutdown.
979

980
  This function is called recursively. Note that we don't cache the
981
  children or such, as oppossed to assemble, shutdown of different
982
  devices doesn't require that the upper device was active.
983

984
  """
985
  r_dev = _RecursiveFindBD(disk)
986
  if r_dev is not None:
987
    r_path = r_dev.dev_path
988
    result = r_dev.Shutdown()
989
    if result:
990
      DevCacheManager.RemoveCache(r_path)
991
  else:
992
    result = True
993
  if disk.children:
994
    for child in disk.children:
995
      result = result and ShutdownBlockDevice(child)
996
  return result
997

    
998

    
999
def MirrorAddChildren(parent_cdev, new_cdevs):
1000
  """Extend a mirrored block device.
1001

1002
  """
1003
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
1004
  if parent_bdev is None:
1005
    logging.error("Can't find parent device")
1006
    return False
1007
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1008
  if new_bdevs.count(None) > 0:
1009
    logging.error("Can't find new device(s) to add: %s:%s",
1010
                  new_bdevs, new_cdevs)
1011
    return False
1012
  parent_bdev.AddChildren(new_bdevs)
1013
  return True
1014

    
1015

    
1016
def MirrorRemoveChildren(parent_cdev, new_cdevs):
1017
  """Shrink a mirrored block device.
1018

1019
  """
1020
  parent_bdev = _RecursiveFindBD(parent_cdev)
1021
  if parent_bdev is None:
1022
    logging.error("Can't find parent in remove children: %s", parent_cdev)
1023
    return False
1024
  devs = []
1025
  for disk in new_cdevs:
1026
    rpath = disk.StaticDevPath()
1027
    if rpath is None:
1028
      bd = _RecursiveFindBD(disk)
1029
      if bd is None:
1030
        logging.error("Can't find dynamic device %s while removing children",
1031
                      disk)
1032
        return False
1033
      else:
1034
        devs.append(bd.dev_path)
1035
    else:
1036
      devs.append(rpath)
1037
  parent_bdev.RemoveChildren(devs)
1038
  return True
1039

    
1040

    
1041
def GetMirrorStatus(disks):
1042
  """Get the mirroring status of a list of devices.
1043

1044
  Args:
1045
    disks: list of `objects.Disk`
1046

1047
  Returns:
1048
    list of (mirror_done, estimated_time) tuples, which
1049
    are the result of bdev.BlockDevice.CombinedSyncStatus()
1050

1051
  """
1052
  stats = []
1053
  for dsk in disks:
1054
    rbd = _RecursiveFindBD(dsk)
1055
    if rbd is None:
1056
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1057
    stats.append(rbd.CombinedSyncStatus())
1058
  return stats
1059

    
1060

    
1061
def _RecursiveFindBD(disk, allow_partial=False):
1062
  """Check if a device is activated.
1063

1064
  If so, return informations about the real device.
1065

1066
  Args:
1067
    disk: the objects.Disk instance
1068
    allow_partial: don't abort the find if a child of the
1069
                   device can't be found; this is intended to be
1070
                   used when repairing mirrors
1071

1072
  Returns:
1073
    None if the device can't be found
1074
    otherwise the device instance
1075

1076
  """
1077
  children = []
1078
  if disk.children:
1079
    for chdisk in disk.children:
1080
      children.append(_RecursiveFindBD(chdisk))
1081

    
1082
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1083

    
1084

    
1085
def FindBlockDevice(disk):
1086
  """Check if a device is activated.
1087

1088
  If so, return informations about the real device.
1089

1090
  Args:
1091
    disk: the objects.Disk instance
1092
  Returns:
1093
    None if the device can't be found
1094
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1095

1096
  """
1097
  rbd = _RecursiveFindBD(disk)
1098
  if rbd is None:
1099
    return rbd
1100
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1101

    
1102

    
1103
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1104
  """Write a file to the filesystem.
1105

1106
  This allows the master to overwrite(!) a file. It will only perform
1107
  the operation if the file belongs to a list of configuration files.
1108

1109
  """
1110
  if not os.path.isabs(file_name):
1111
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
1112
                  file_name)
1113
    return False
1114

    
1115
  allowed_files = [
1116
    constants.CLUSTER_CONF_FILE,
1117
    constants.ETC_HOSTS,
1118
    constants.SSH_KNOWN_HOSTS_FILE,
1119
    constants.VNC_PASSWORD_FILE,
1120
    ]
1121

    
1122
  if file_name not in allowed_files:
1123
    logging.error("Filename passed to UploadFile not in allowed"
1124
                 " upload targets: '%s'", file_name)
1125
    return False
1126

    
1127
  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1128
                  atime=atime, mtime=mtime)
1129
  return True
1130

    
1131

    
1132
def _ErrnoOrStr(err):
1133
  """Format an EnvironmentError exception.
1134

1135
  If the `err` argument has an errno attribute, it will be looked up
1136
  and converted into a textual EXXXX description. Otherwise the string
1137
  representation of the error will be returned.
1138

1139
  """
1140
  if hasattr(err, 'errno'):
1141
    detail = errno.errorcode[err.errno]
1142
  else:
1143
    detail = str(err)
1144
  return detail
1145

    
1146

    
1147
def _OSOndiskVersion(name, os_dir):
1148
  """Compute and return the API version of a given OS.
1149

1150
  This function will try to read the API version of the os given by
1151
  the 'name' parameter and residing in the 'os_dir' directory.
1152

1153
  Return value will be either an integer denoting the version or None in the
1154
  case when this is not a valid OS name.
1155

1156
  """
1157
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1158

    
1159
  try:
1160
    st = os.stat(api_file)
1161
  except EnvironmentError, err:
1162
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1163
                           " found (%s)" % _ErrnoOrStr(err))
1164

    
1165
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1166
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1167
                           " a regular file")
1168

    
1169
  try:
1170
    f = open(api_file)
1171
    try:
1172
      api_version = f.read(256)
1173
    finally:
1174
      f.close()
1175
  except EnvironmentError, err:
1176
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1177
                           " API version (%s)" % _ErrnoOrStr(err))
1178

    
1179
  api_version = api_version.strip()
1180
  try:
1181
    api_version = int(api_version)
1182
  except (TypeError, ValueError), err:
1183
    raise errors.InvalidOS(name, os_dir,
1184
                           "API version is not integer (%s)" % str(err))
1185

    
1186
  return api_version
1187

    
1188

    
1189
def DiagnoseOS(top_dirs=None):
1190
  """Compute the validity for all OSes.
1191

1192
  Returns an OS object for each name in all the given top directories
1193
  (if not given defaults to constants.OS_SEARCH_PATH)
1194

1195
  Returns:
1196
    list of OS objects
1197

1198
  """
1199
  if top_dirs is None:
1200
    top_dirs = constants.OS_SEARCH_PATH
1201

    
1202
  result = []
1203
  for dir_name in top_dirs:
1204
    if os.path.isdir(dir_name):
1205
      try:
1206
        f_names = utils.ListVisibleFiles(dir_name)
1207
      except EnvironmentError, err:
1208
        logging.exception("Can't list the OS directory %s", dir_name)
1209
        break
1210
      for name in f_names:
1211
        try:
1212
          os_inst = OSFromDisk(name, base_dir=dir_name)
1213
          result.append(os_inst)
1214
        except errors.InvalidOS, err:
1215
          result.append(objects.OS.FromInvalidOS(err))
1216

    
1217
  return result
1218

    
1219

    
1220
def OSFromDisk(name, base_dir=None):
1221
  """Create an OS instance from disk.
1222

1223
  This function will return an OS instance if the given name is a
1224
  valid OS name. Otherwise, it will raise an appropriate
1225
  `errors.InvalidOS` exception, detailing why this is not a valid
1226
  OS.
1227

1228
  Args:
1229
    os_dir: Directory containing the OS scripts. Defaults to a search
1230
            in all the OS_SEARCH_PATH directories.
1231

1232
  """
1233

    
1234
  if base_dir is None:
1235
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1236
    if os_dir is None:
1237
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1238
  else:
1239
    os_dir = os.path.sep.join([base_dir, name])
1240

    
1241
  api_version = _OSOndiskVersion(name, os_dir)
1242

    
1243
  if api_version != constants.OS_API_VERSION:
1244
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1245
                           " (found %s want %s)"
1246
                           % (api_version, constants.OS_API_VERSION))
1247

    
1248
  # OS Scripts dictionary, we will populate it with the actual script names
1249
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1250

    
1251
  for script in os_scripts:
1252
    os_scripts[script] = os.path.sep.join([os_dir, script])
1253

    
1254
    try:
1255
      st = os.stat(os_scripts[script])
1256
    except EnvironmentError, err:
1257
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1258
                             (script, _ErrnoOrStr(err)))
1259

    
1260
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1261
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1262
                             script)
1263

    
1264
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1265
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1266
                             script)
1267

    
1268

    
1269
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1270
                    create_script=os_scripts['create'],
1271
                    export_script=os_scripts['export'],
1272
                    import_script=os_scripts['import'],
1273
                    rename_script=os_scripts['rename'],
1274
                    api_version=api_version)
1275

    
1276

    
1277
def GrowBlockDevice(disk, amount):
1278
  """Grow a stack of block devices.
1279

1280
  This function is called recursively, with the childrens being the
1281
  first one resize.
1282

1283
  Args:
1284
    disk: the disk to be grown
1285

1286
  Returns: a tuple of (status, result), with:
1287
    status: the result (true/false) of the operation
1288
    result: the error message if the operation failed, otherwise not used
1289

1290
  """
1291
  r_dev = _RecursiveFindBD(disk)
1292
  if r_dev is None:
1293
    return False, "Cannot find block device %s" % (disk,)
1294

    
1295
  try:
1296
    r_dev.Grow(amount)
1297
  except errors.BlockDeviceError, err:
1298
    return False, str(err)
1299

    
1300
  return True, None
1301

    
1302

    
1303
def SnapshotBlockDevice(disk):
1304
  """Create a snapshot copy of a block device.
1305

1306
  This function is called recursively, and the snapshot is actually created
1307
  just for the leaf lvm backend device.
1308

1309
  Args:
1310
    disk: the disk to be snapshotted
1311

1312
  Returns:
1313
    a config entry for the actual lvm device snapshotted.
1314

1315
  """
1316
  if disk.children:
1317
    if len(disk.children) == 1:
1318
      # only one child, let's recurse on it
1319
      return SnapshotBlockDevice(disk.children[0])
1320
    else:
1321
      # more than one child, choose one that matches
1322
      for child in disk.children:
1323
        if child.size == disk.size:
1324
          # return implies breaking the loop
1325
          return SnapshotBlockDevice(child)
1326
  elif disk.dev_type == constants.LD_LV:
1327
    r_dev = _RecursiveFindBD(disk)
1328
    if r_dev is not None:
1329
      # let's stay on the safe side and ask for the full size, for now
1330
      return r_dev.Snapshot(disk.size)
1331
    else:
1332
      return None
1333
  else:
1334
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1335
                                 " '%s' of type '%s'" %
1336
                                 (disk.unique_id, disk.dev_type))
1337

    
1338

    
1339
def ExportSnapshot(disk, dest_node, instance, cluster_name):
1340
  """Export a block device snapshot to a remote node.
1341

1342
  Args:
1343
    disk: the snapshot block device
1344
    dest_node: the node to send the image to
1345
    instance: instance being exported
1346

1347
  Returns:
1348
    True if successful, False otherwise.
1349

1350
  """
1351
  inst_os = OSFromDisk(instance.os)
1352
  export_script = inst_os.export_script
1353

    
1354
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1355
                                     instance.name, int(time.time()))
1356
  if not os.path.exists(constants.LOG_OS_DIR):
1357
    os.mkdir(constants.LOG_OS_DIR, 0750)
1358

    
1359
  real_os_dev = _RecursiveFindBD(disk)
1360
  if real_os_dev is None:
1361
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1362
                                  str(disk))
1363
  real_os_dev.Open()
1364

    
1365
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1366
  destfile = disk.physical_id[1]
1367

    
1368
  # the target command is built out of three individual commands,
1369
  # which are joined by pipes; we check each individual command for
1370
  # valid parameters
1371

    
1372
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1373
                               export_script, instance.name,
1374
                               real_os_dev.dev_path, logfile)
1375

    
1376
  comprcmd = "gzip"
1377

    
1378
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1379
                                destdir, destdir, destfile)
1380
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1381
                                                   constants.GANETI_RUNAS,
1382
                                                   destcmd)
1383

    
1384
  # all commands have been checked, so we're safe to combine them
1385
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1386

    
1387
  result = utils.RunCmd(command)
1388

    
1389
  if result.failed:
1390
    logging.error("os snapshot export command '%s' returned error: %s"
1391
                  " output: %s", command, result.fail_reason, result.output)
1392
    return False
1393

    
1394
  return True
1395

    
1396

    
1397
def FinalizeExport(instance, snap_disks):
1398
  """Write out the export configuration information.
1399

1400
  Args:
1401
    instance: instance configuration
1402
    snap_disks: snapshot block devices
1403

1404
  Returns:
1405
    False in case of error, True otherwise.
1406

1407
  """
1408
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1409
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1410

    
1411
  config = objects.SerializableConfigParser()
1412

    
1413
  config.add_section(constants.INISECT_EXP)
1414
  config.set(constants.INISECT_EXP, 'version', '0')
1415
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1416
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1417
  config.set(constants.INISECT_EXP, 'os', instance.os)
1418
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1419

    
1420
  config.add_section(constants.INISECT_INS)
1421
  config.set(constants.INISECT_INS, 'name', instance.name)
1422
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1423
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1424
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1425

    
1426
  nic_count = 0
1427
  for nic_count, nic in enumerate(instance.nics):
1428
    config.set(constants.INISECT_INS, 'nic%d_mac' %
1429
               nic_count, '%s' % nic.mac)
1430
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1431
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1432
               '%s' % nic.bridge)
1433
  # TODO: redundant: on load can read nics until it doesn't exist
1434
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)
1435

    
1436
  disk_count = 0
1437
  for disk_count, disk in enumerate(snap_disks):
1438
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1439
               ('%s' % disk.iv_name))
1440
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1441
               ('%s' % disk.physical_id[1]))
1442
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1443
               ('%d' % disk.size))
1444
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)
1445

    
1446
  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1447
  cfo = open(cff, 'w')
1448
  try:
1449
    config.write(cfo)
1450
  finally:
1451
    cfo.close()
1452

    
1453
  shutil.rmtree(finaldestdir, True)
1454
  shutil.move(destdir, finaldestdir)
1455

    
1456
  return True
1457

    
1458

    
1459
def ExportInfo(dest):
1460
  """Get export configuration information.
1461

1462
  Args:
1463
    dest: directory containing the export
1464

1465
  Returns:
1466
    A serializable config file containing the export info.
1467

1468
  """
1469
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1470

    
1471
  config = objects.SerializableConfigParser()
1472
  config.read(cff)
1473

    
1474
  if (not config.has_section(constants.INISECT_EXP) or
1475
      not config.has_section(constants.INISECT_INS)):
1476
    return None
1477

    
1478
  return config
1479

    
1480

    
1481
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1482
                         cluster_name):
1483
  """Import an os image into an instance.
1484

1485
  Args:
1486
    instance: the instance object
1487
    os_disk: the instance-visible name of the os device
1488
    swap_disk: the instance-visible name of the swap device
1489
    src_node: node holding the source image
1490
    src_image: path to the source image on src_node
1491

1492
  Returns:
1493
    False in case of error, True otherwise.
1494

1495
  """
1496
  inst_os = OSFromDisk(instance.os)
1497
  import_script = inst_os.import_script
1498

    
1499
  os_device = instance.FindDisk(os_disk)
1500
  if os_device is None:
1501
    logging.error("Can't find this device-visible name '%s'", os_disk)
1502
    return False
1503

    
1504
  swap_device = instance.FindDisk(swap_disk)
1505
  if swap_device is None:
1506
    logging.error("Can't find this device-visible name '%s'", swap_disk)
1507
    return False
1508

    
1509
  real_os_dev = _RecursiveFindBD(os_device)
1510
  if real_os_dev is None:
1511
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1512
                                  str(os_device))
1513
  real_os_dev.Open()
1514

    
1515
  real_swap_dev = _RecursiveFindBD(swap_device)
1516
  if real_swap_dev is None:
1517
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1518
                                  str(swap_device))
1519
  real_swap_dev.Open()
1520

    
1521
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1522
                                        instance.name, int(time.time()))
1523
  if not os.path.exists(constants.LOG_OS_DIR):
1524
    os.mkdir(constants.LOG_OS_DIR, 0750)
1525

    
1526
  destcmd = utils.BuildShellCmd('cat %s', src_image)
1527
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1528
                                                   constants.GANETI_RUNAS,
1529
                                                   destcmd)
1530

    
1531
  comprcmd = "gunzip"
1532
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1533
                               inst_os.path, import_script, instance.name,
1534
                               real_os_dev.dev_path, real_swap_dev.dev_path,
1535
                               logfile)
1536

    
1537
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1538
  env = {'HYPERVISOR': instance.hypervisor}
1539

    
1540
  result = utils.RunCmd(command, env=env)
1541

    
1542
  if result.failed:
1543
    logging.error("os import command '%s' returned error: %s"
1544
                  " output: %s", command, result.fail_reason, result.output)
1545
    return False
1546

    
1547
  return True
1548

    
1549

    
1550
def ListExports():
1551
  """Return a list of exports currently available on this machine.
1552

1553
  """
1554
  if os.path.isdir(constants.EXPORT_DIR):
1555
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
1556
  else:
1557
    return []
1558

    
1559

    
1560
def RemoveExport(export):
1561
  """Remove an existing export from the node.
1562

1563
  Args:
1564
    export: the name of the export to remove
1565

1566
  Returns:
1567
    False in case of error, True otherwise.
1568

1569
  """
1570
  target = os.path.join(constants.EXPORT_DIR, export)
1571

    
1572
  shutil.rmtree(target)
1573
  # TODO: catch some of the relevant exceptions and provide a pretty
1574
  # error message if rmtree fails.
1575

    
1576
  return True
1577

    
1578

    
1579
def RenameBlockDevices(devlist):
1580
  """Rename a list of block devices.
1581

1582
  The devlist argument is a list of tuples (disk, new_logical,
1583
  new_physical). The return value will be a combined boolean result
1584
  (True only if all renames succeeded).
1585

1586
  """
1587
  result = True
1588
  for disk, unique_id in devlist:
1589
    dev = _RecursiveFindBD(disk)
1590
    if dev is None:
1591
      result = False
1592
      continue
1593
    try:
1594
      old_rpath = dev.dev_path
1595
      dev.Rename(unique_id)
1596
      new_rpath = dev.dev_path
1597
      if old_rpath != new_rpath:
1598
        DevCacheManager.RemoveCache(old_rpath)
1599
        # FIXME: we should add the new cache information here, like:
1600
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1601
        # but we don't have the owner here - maybe parse from existing
1602
        # cache? for now, we only lose lvm data when we rename, which
1603
        # is less critical than DRBD or MD
1604
    except errors.BlockDeviceError, err:
1605
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1606
      result = False
1607
  return result


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether the given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in the cluster configuration. Only
  paths under that directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (os.path.commonprefix([file_storage_dir, base_file_storage_dir]) !=
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
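
# Note on _TransformFileStorageDir() above: os.path.commonprefix() compares
# strings character by character rather than path components, so a sibling
# directory such as "/srv/ganeti/file-storage-other" would pass the check
# against a base of "/srv/ganeti/file-storage" (paths here are illustrative).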


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = (True, )
  if not file_storage_dir:
    result = (False, )
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = (False, )
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = (False, )
  return result
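
# Note on the file storage helpers: per their docstrings they return a
# one-element tuple rather than a bare boolean, e.g. (True, ) on success and
# (False, ) on failure, so callers need to look at the first element.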


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = (True, )
  if not file_storage_dir:
    result = (False, )
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = (False, )
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = (False, )
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = (True, )
  if not old_file_storage_dir or not new_file_storage_dir:
    result = (False, )
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = (False, )
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = (False, )
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = (False, )
  return result
1733

    
1734

    
1735
def _IsJobQueueFile(file_name):
1736
  """Checks whether the given filename is in the queue directory.
1737

1738
  """
1739
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
1740
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
1741

    
1742
  if not result:
1743
    logging.error("'%s' is not a file in the queue directory",
1744
                  file_name)
1745

    
1746
  return result


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True


def JobQueueRename(old, new):
  """Renames a job queue file.

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True


def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and capture the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just ignore errors while closing the fd
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output
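
  # Note on ExecHook() above: only the first 4096 bytes of the hook's
  # combined stdout/stderr are captured; the pipe is closed after that read,
  # so longer output is not recorded.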

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will usually not be overridden by subclasses.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # must log
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
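
# Note on HooksRunner.RunHooks() above (directory names are illustrative):
# for an hpath of "instance-start" and the "pre" phase, the executable,
# correctly-named scripts under
#   <hooks_base_dir>/instance-start-pre.d/
# are run in lexical order (hence the "00name" recommendation), and each
# contributes one ("instance-start-pre.d/<script>", HKR_*, output) tuple to
# the returned list.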


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
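
# Note on IAllocatorRunner.Run() above: the request in 'idata' (a serialized
# document, typically JSON) is written to a temporary file whose name is
# passed as the only argument to the allocator script found on
# IALLOCATOR_SEARCH_PATH; e.g., hypothetically,
#   IAllocatorRunner().Run("my-allocator", request_data)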


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath
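
  # Example for _ConvertPath() above (the device name is illustrative):
  #   "/dev/xenvg/inst1-disk0" -> "<BDEV_CACHE_DIR>/bdev_xenvg_inst1-disk0"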

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s", dev_path)