Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ caad16e2

History | View | Annotate | Download (57.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetConfig():
  """Build and return a configuration reader.

  @rtype: L{ssconf.SimpleConfigReader}
  @return: a new instance of the simple config reader

  """
  return ssconf.SimpleConfigReader()
49

    
50

    
51
def _GetSshRunner(cluster_name):
  """Build and return an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster the runner will operate in

  """
  return ssh.SshRunner(cluster_name)
53

    
54

    
55
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left alone; only plain files are
  removed.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of file paths to be excluded; defaults to the
      empty list (the previous mutable default argument `exclude=[]`
      was replaced, since a shared default list could be mutated
      across calls)

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so the comparison below matches the
    # normalized names we build from the directory listing
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
74

    
75

    
76
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The job queue lock file is deliberately excluded from removal.

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82

    
83

    
84
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: (master_netdev, master_ip, master_name); all three elements
      are None if the configuration could not be read

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError:
    logging.exception("Cluster configuration incomplete")
    # Callers unpack three values from this function, so the error
    # return must also be a 3-tuple; the previous (None, None) would
    # have raised a ValueError at the caller's unpack site.
    return (None, None, None)
  return (master_netdev, master_ip, master_node)
103

    
104

    
105
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master daemons
  @rtype: boolean
  @return: True unless any of the steps failed

  """
  ok = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    # configuration incomplete; GetMasterInfo already logged the details
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    # the master IP is not reachable, so claim it on our netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # announce the new address so neighbours refresh their ARP caches
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
146

    
147

    
148
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
  @rtype: boolean
  @return: False only if the master information could not be read

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    # configuration incomplete; GetMasterInfo already logged the details
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
172

    
173

    
174
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @param dsa: the host DSA private key
  @param dsapub: the host DSA public key
  @param rsa: the host RSA private key
  @param rsapub: the host RSA public key
  @param sshkey: the user ssh private key
  @param sshpub: the user ssh public key
  @rtype: boolean
  @return: True unless writing the user ssh files failed

  """
  # host keys: private keys are written mode 0600, public keys 0644
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")
    return False

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True
205

    
206

    
207
def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  Removes this node's data files, purges the job queue and deletes the
  cluster ssh keys for the Ganeti user. On success, instead of
  returning, raises L{errors.QuitGanetiException} so the node daemon
  shuts down.

  @raise errors.QuitGanetiException: always, on the success path

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    # remove our own public key from authorized_keys
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231

    
232

    
233
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB
      - bootid is the kernel's boot id (changes on every reboot)

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  # memory_* keys come from the hypervisor (merged in via update)
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  # the boot id identifies the current boot of this node uniquely
  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray
267

    
268

    
269
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster name, used by the ssh verification
  @rtype: dict
  @return: per-check results, keyed by the same names as in L{what}

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # NOTE: this shuffles the caller-provided list in place
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are recorded; absence from the dict means OK
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the node list
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
340

    
341

    
342
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs should be listed
  @return: dictionary of all partitions (key) with a tuple of their
      size (in MiB, as a string), inactive and online status, e.g.::

        {'test1': ('20.06', True, True)}

      NOTE(review): on command failure this returns the raw lvs error
      output (a string) instead of a dict; callers must handle both.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  # raw string so the escaped pipes are explicit regex tokens instead of
  # relying on Python passing unknown string escapes through unchanged
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr: char 4 ('-' == not active) and char 5 ('o' == open)
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
374

    
375

    
376
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the size of the volume

  """
  # plain delegation to the shared utility implementation
  return utils.ListVolumeGroups()
384

    
385

    
386
def NodeVolumes():
  """List all logical volumes present on this node.

  @rtype: list of dicts
  @return: one dict per volume, with keys 'name', 'size', 'dev' (the
      backing physical device) and 'vg'; an empty dict on lvs failure

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def strip_extents(devices):
    # entries look like "/dev/sdX(extent)"; keep only the device path
    if '(' in devices:
      devices = devices.split('(')[0]
    return devices

  def to_volume(fields):
    lv_name, lv_size, devices, vg_name = fields[:4]
    return {
      'name': lv_name.strip(),
      'size': lv_size.strip(),
      'dev': strip_extents(devices.strip()),
      'vg': vg_name.strip(),
    }

  volumes = []
  for raw_line in result.stdout.splitlines():
    # require all four fields to be present before parsing
    if raw_line.count('|') >= 3:
      volumes.append(to_volume(raw_line.split('|')))
  return volumes
414

    
415

    
416
def BridgesExist(bridges_list):
  """Check whether all bridges in a list exist on the current node.

  @type bridges_list: list
  @param bridges_list: the names of the bridges to check
  @rtype: boolean
  @return: True if every bridge exists, False at the first missing one

  """
  for bridge_name in bridges_list:
    if utils.BridgeExists(bridge_name):
      continue
    # stop at the first missing bridge
    return False
  return True
428

    
429

    
430
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
             - instance1.example.com
             - instance2.example.com
  @raise errors.HypervisorError: if any hypervisor fails to enumerate
      its instances

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      # fixed "hypevisor" typo in the log message; the unused 'err'
      # binding was dropped (logging.exception records the traceback)
      logging.exception("Error enumerating instances for hypervisor %s",
                        hname)
      # FIXME: should we somehow not propagate this to the master?
      raise

  return results
453

    
454

    
455
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: empty dict if the instance is unknown, otherwise a dict with:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}
  # iinfo holds (per the fields used here) memory at index 2, state at
  # index 4 and cpu time at index 5
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
479

    
480

    
481
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise errors.HypervisorError: if the same instance is reported by
      more than one hypervisor

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    # the instance id field is unused here
    for name, _, memory, vcpus, state, times in iinfo:
      if name in output:
        # an instance must not appear under two hypervisors at once
        raise errors.HypervisorError("Instance %s running duplicate" % name)
      output[name] = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }

  return output
515

    
516

    
517
def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Runs the OS definition's create script against the instance's os and
  swap block devices, logging its output to a per-run file under
  LOG_OS_DIR.

  @param instance: the instance object
  @param os_disk: the instance-visible name of the os device
  @param swap_disk: the instance-visible name of the swap device
  @rtype: boolean
  @return: True on success, False if a disk could not be found or the
      create script failed
  @raise errors.BlockDeviceError: if a backing block device is not set up

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  # resolve the logical disks to actual (assembled) block devices
  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  # per-run log file, named after os, instance and timestamp
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True
571

    
572

    
573
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
  """Run the OS rename script for an instance.

  Runs the OS definition's rename script against the instance's os and
  swap block devices, logging its output to a per-run file under
  LOG_OS_DIR.

  @param instance: the instance object (already carrying the new name)
  @param old_name: the old name of the instance
  @param os_disk: the instance-visible name of the os device
  @param swap_disk: the instance-visible name of the swap device
  @rtype: boolean
  @return: True on success, False if a disk could not be found or the
      rename script failed
  @raise errors.BlockDeviceError: if a backing block device is not set up

  """
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  # resolve the logical disks to actual (assembled) block devices
  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  # per-run log file, named after os, old/new instance name and timestamp
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
                                inst_os.path, script, old_name, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os create command '%s' returned error: %s output: %s",
                  command, result.fail_reason, result.output)
    return False

  return True
628

    
629

    
630
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  @type vg_name: str
  @param vg_name: the volume group to query

  @rtype: dict
  @return: a dictionary of the form
      { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
      where
      vg_size is the total size of the volume group in MiB,
      vg_free is the free size of the volume group in MiB,
      pv_count are the number of physical disks in that vg.
      If an error occurs during gathering of data, the same dict is
      returned with all keys set to None.

  """
  # default return value: all keys present, all values None
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip a trailing separator before splitting into the three fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      # non-numeric output; fall through with the all-None dict
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
669

    
670

    
671
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @param instance: the instance whose disks should be gathered
  @rtype: list
  @return: list of (disk, device) pairs, one per instance disk
  @raise errors.BlockDeviceError: if any disk's device cannot be found

  """
  gathered = []
  for inst_disk in instance.disks:
    found_dev = _RecursiveFindBD(inst_disk)
    if found_dev is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(inst_disk))
    # make sure the device is usable before handing it out
    found_dev.Open()
    gathered.append((inst_disk, found_dev))
  return gathered
687

    
688

    
689
def StartInstance(instance, extra_args):
  """Start an instance.

  @type instance: instance object
  @param instance: the instance object
  @param extra_args: extra arguments handed to the hypervisor
  @rtype: boolean
  @return: whether the startup was successful or not

  """
  # nothing to do when the instance is already up
  if instance.name in GetInstanceList([instance.hypervisor]):
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor(instance.hypervisor)

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
    return True
  except errors.HypervisorError:
    logging.exception("Failed to start instance")
    return False
713

    
714

    
715
def ShutdownInstance(instance):
  """Shut an instance down.

  Asks the hypervisor for a clean stop, waits for the instance to
  disappear from the running list, and falls back to a forced stop
  (destroy) if it does not go away in time.

  @type instance: instance object
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  if instance.name not in running_instances:
    # already stopped, nothing to do
    return True

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10secs for 2min
  # NOTE(review): shutdown_ok is assigned here but never read afterwards
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # the for loop ran out without breaking: the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    # give the destroy a moment to take effect before re-checking
    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True
762

    
763

    
764
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  @param instance: the instance object to reboot
  @param reboot_type: the type of reboot; INSTANCE_REBOOT_SOFT asks the
      hypervisor to reboot, INSTANCE_REBOOT_HARD does a shutdown plus
      startup; any other value raises ParameterError
  @param extra_args: extra arguments passed to StartInstance on a hard
      reboot
  @rtype: boolean
  @return: whether the reboot was successful
  @raise errors.ParameterError: for an unknown reboot_type

  """
  running_instances = GetInstanceList([instance.hypervisor])

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      # hard reboot: full stop followed by a fresh start
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")

  return True
796

    
797

    
798
def MigrateInstance(instance, target, live):
799
  """Migrates an instance to another node.
800

801
  @type instance: C{objects.Instance}
802
  @param instance: the instance definition
803
  @type target: string
804
  @param target: the target node name
805
  @type live: boolean
806
  @param live: whether the migration should be done live or not (the
807
      interpretation of this parameter is left to the hypervisor)
808
  @rtype: tuple
809
  @return: a tuple of (success, msg) where:
810
      - succes is a boolean denoting the success/failure of the operation
811
      - msg is a string with details in case of failure
812

813
  """
814
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
815

    
816
  try:
817
    hyper.MigrateInstance(instance.name, target, live)
818
  except errors.HypervisorError, err:
819
    msg = "Failed to migrate instance: %s" % str(err)
820
    logging.error(msg)
821
    return (False, msg)
822
  return (True, "Migration successfull")
823

    
824

    
825
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the disk object describing the device to create
  @param size: the size of the physical underlying device
  @type owner: str
  @param owner: the name of the instance owning the device
  @type on_primary: boolean
  @param on_primary: whether this is the instance's primary node
  @param info: string that will be sent to the physical device creation

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None; on secondary nodes,
      it's not required to return anything
  @raise ValueError: if the device could not be created
  @raise errors.BlockDeviceError: if the device could not be assembled
      after creation

  """
  # first, recursively assemble all child devices
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    # remove a leftover device with the same id, if any
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    # deliberate best-effort: if no old device can even be attached to,
    # there is nothing to remove and we go straight to creation
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id
879

    
880

    
881
def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively; after the device itself,
  all of the disk's children are removed as well.

  @type disk: L{objects.Disk}
  @param disk: the disk whose backing device should be removed
  @rtype: boolean
  @return: whether removal of the device and all its children succeeded

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      # the path is only dropped from the cache on successful removal
      DevCacheManager.RemoveCache(r_path)
  else:
    # nothing attached, so there is nothing to remove here
    result = True
  if disk.children:
    for child in disk.children:
      # a single failing child makes the overall result False
      result = result and RemoveBlockDevice(child)
  return result
906

    
907

    
908
def _RecursiveAssembleBD(disk, owner, as_primary):
909
  """Activate a block device for an instance.
910

911
  This is run on the primary and secondary nodes for an instance.
912

913
  This function is called recursively.
914

915
  Args:
916
    disk: a objects.Disk object
917
    as_primary: if we should make the block device read/write
918

919
  Returns:
920
    the assembled device or None (in case no device was assembled)
921

922
  If the assembly is not successful, an exception is raised.
923

924
  """
925
  children = []
926
  if disk.children:
927
    mcn = disk.ChildrenNeeded()
928
    if mcn == -1:
929
      mcn = 0 # max number of Nones allowed
930
    else:
931
      mcn = len(disk.children) - mcn # max number of Nones
932
    for chld_disk in disk.children:
933
      try:
934
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
935
      except errors.BlockDeviceError, err:
936
        if children.count(None) >= mcn:
937
          raise
938
        cdev = None
939
        logging.debug("Error in child activation: %s", str(err))
940
      children.append(cdev)
941

    
942
  if as_primary or disk.AssembleOnSecondary():
943
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
944
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
945
    result = r_dev
946
    if as_primary or disk.OpenOnSecondary():
947
      r_dev.Open()
948
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
949
                                as_primary, disk.iv_name)
950

    
951
  else:
952
    result = True
953
  return result
954

    
955

    
956
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  device = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(device, bdev.BlockDev):
    return device.dev_path
  return device


def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    result = True
  else:
    dev_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(dev_path)
  if disk.children:
    for child in disk.children:
      # short-circuit: once a shutdown failed, skip the rest
      result = result and ShutdownBlockDevice(child)
  return result


def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  Returns True on success, False when either the parent or any of the
  new child devices cannot be found.

  """
  parent = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent.AddChildren(new_bdevs)
  return True


def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  Returns True on success, False when the parent or any child device
  cannot be resolved to a device path.

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  paths = []
  for disk in new_cdevs:
    static_path = disk.StaticDevPath()
    if static_path is not None:
      paths.append(static_path)
      continue
    # no static path: resolve the device dynamically
    found = _RecursiveFindBD(disk)
    if found is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    paths.append(found.dev_path)
  parent.RemoveChildren(paths)
  return True


def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  Raises errors.BlockDeviceError when any disk cannot be found.

  """
  status = []
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    status.append(dev.CombinedSyncStatus())
  return status


def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: intended to relax the lookup when repairing mirrors;
                   NOTE(review): not referenced in this body -
                   presumably honoured elsewhere, confirm

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(child) for child in disk.children]
  else:
    children = []
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)


def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    return None
  return (found.dev_path, found.major, found.minor) + found.GetSyncStatus()


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  Returns True on success, False when the name is not absolute or not
  one of the allowed upload targets.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # only cluster configuration files may be overwritten this way
  allowed_files = (
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    )

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True


def _ErrnoOrStr(err):
1131
  """Format an EnvironmentError exception.
1132

1133
  If the `err` argument has an errno attribute, it will be looked up
1134
  and converted into a textual EXXXX description. Otherwise the string
1135
  representation of the error will be returned.
1136

1137
  """
1138
  if hasattr(err, 'errno'):
1139
    detail = errno.errorcode[err.errno]
1140
  else:
1141
    detail = str(err)
1142
  return detail
1143

    
1144

    
1145
def _OSOndiskVersion(name, os_dir):
1146
  """Compute and return the API version of a given OS.
1147

1148
  This function will try to read the API version of the os given by
1149
  the 'name' parameter and residing in the 'os_dir' directory.
1150

1151
  Return value will be either an integer denoting the version or None in the
1152
  case when this is not a valid OS name.
1153

1154
  """
1155
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1156

    
1157
  try:
1158
    st = os.stat(api_file)
1159
  except EnvironmentError, err:
1160
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1161
                           " found (%s)" % _ErrnoOrStr(err))
1162

    
1163
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1164
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1165
                           " a regular file")
1166

    
1167
  try:
1168
    f = open(api_file)
1169
    try:
1170
      api_versions = f.readlines()
1171
    finally:
1172
      f.close()
1173
  except EnvironmentError, err:
1174
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1175
                           " API version (%s)" % _ErrnoOrStr(err))
1176

    
1177
  api_versions = [version.strip() for version in api_versions]
1178
  try:
1179
    api_versions = [int(version) for version in api_versions]
1180
  except (TypeError, ValueError), err:
1181
    raise errors.InvalidOS(name, os_dir,
1182
                           "API version is not integer (%s)" % str(err))
1183

    
1184
  return api_versions
1185

    
1186

    
1187
def DiagnoseOS(top_dirs=None):
1188
  """Compute the validity for all OSes.
1189

1190
  Returns an OS object for each name in all the given top directories
1191
  (if not given defaults to constants.OS_SEARCH_PATH)
1192

1193
  Returns:
1194
    list of OS objects
1195

1196
  """
1197
  if top_dirs is None:
1198
    top_dirs = constants.OS_SEARCH_PATH
1199

    
1200
  result = []
1201
  for dir_name in top_dirs:
1202
    if os.path.isdir(dir_name):
1203
      try:
1204
        f_names = utils.ListVisibleFiles(dir_name)
1205
      except EnvironmentError, err:
1206
        logging.exception("Can't list the OS directory %s", dir_name)
1207
        break
1208
      for name in f_names:
1209
        try:
1210
          os_inst = OSFromDisk(name, base_dir=dir_name)
1211
          result.append(os_inst)
1212
        except errors.InvalidOS, err:
1213
          result.append(objects.OS.FromInvalidOS(err))
1214

    
1215
  return result
1216

    
1217

    
1218
def OSFromDisk(name, base_dir=None):
1219
  """Create an OS instance from disk.
1220

1221
  This function will return an OS instance if the given name is a
1222
  valid OS name. Otherwise, it will raise an appropriate
1223
  `errors.InvalidOS` exception, detailing why this is not a valid
1224
  OS.
1225

1226
  Args:
1227
    os_dir: Directory containing the OS scripts. Defaults to a search
1228
            in all the OS_SEARCH_PATH directories.
1229

1230
  """
1231

    
1232
  if base_dir is None:
1233
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1234
    if os_dir is None:
1235
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1236
  else:
1237
    os_dir = os.path.sep.join([base_dir, name])
1238

    
1239
  api_versions = _OSOndiskVersion(name, os_dir)
1240

    
1241
  if constants.OS_API_VERSION not in api_versions:
1242
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1243
                           " (found %s want %s)"
1244
                           % (api_versions, constants.OS_API_VERSION))
1245

    
1246
  # OS Scripts dictionary, we will populate it with the actual script names
1247
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1248

    
1249
  for script in os_scripts:
1250
    os_scripts[script] = os.path.sep.join([os_dir, script])
1251

    
1252
    try:
1253
      st = os.stat(os_scripts[script])
1254
    except EnvironmentError, err:
1255
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1256
                             (script, _ErrnoOrStr(err)))
1257

    
1258
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1259
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1260
                             script)
1261

    
1262
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1263
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1264
                             script)
1265

    
1266

    
1267
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1268
                    create_script=os_scripts['create'],
1269
                    export_script=os_scripts['export'],
1270
                    import_script=os_scripts['import'],
1271
                    rename_script=os_scripts['rename'],
1272
                    api_versions=api_versions)
1273

    
1274

    
1275
def GrowBlockDevice(disk, amount):
1276
  """Grow a stack of block devices.
1277

1278
  This function is called recursively, with the childrens being the
1279
  first one resize.
1280

1281
  Args:
1282
    disk: the disk to be grown
1283

1284
  Returns: a tuple of (status, result), with:
1285
    status: the result (true/false) of the operation
1286
    result: the error message if the operation failed, otherwise not used
1287

1288
  """
1289
  r_dev = _RecursiveFindBD(disk)
1290
  if r_dev is None:
1291
    return False, "Cannot find block device %s" % (disk,)
1292

    
1293
  try:
1294
    r_dev.Grow(amount)
1295
  except errors.BlockDeviceError, err:
1296
    return False, str(err)
1297

    
1298
  return True, None
1299

    
1300

    
1301
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted, or None when
    no suitable device could be found.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, recurse into it directly
      return SnapshotBlockDevice(disk.children[0])
    # more than one child: recurse into the first one whose size matches
    for child in disk.children:
      if child.size == disk.size:
        return SnapshotBlockDevice(child)
    # no suitably-sized child found (implicit None in the original)
    return None
  if disk.dev_type == constants.LD_LV:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return dev.Snapshot(disk.size)
  raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                               " '%s' of type '%s'" %
                               (disk.unique_id, disk.dev_type))


def ExportSnapshot(disk, dest_node, instance, cluster_name):
1338
  """Export a block device snapshot to a remote node.
1339

1340
  Args:
1341
    disk: the snapshot block device
1342
    dest_node: the node to send the image to
1343
    instance: instance being exported
1344

1345
  Returns:
1346
    True if successful, False otherwise.
1347

1348
  """
1349
  inst_os = OSFromDisk(instance.os)
1350
  export_script = inst_os.export_script
1351

    
1352
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1353
                                     instance.name, int(time.time()))
1354
  if not os.path.exists(constants.LOG_OS_DIR):
1355
    os.mkdir(constants.LOG_OS_DIR, 0750)
1356

    
1357
  real_os_dev = _RecursiveFindBD(disk)
1358
  if real_os_dev is None:
1359
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1360
                                  str(disk))
1361
  real_os_dev.Open()
1362

    
1363
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1364
  destfile = disk.physical_id[1]
1365

    
1366
  # the target command is built out of three individual commands,
1367
  # which are joined by pipes; we check each individual command for
1368
  # valid parameters
1369

    
1370
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1371
                               export_script, instance.name,
1372
                               real_os_dev.dev_path, logfile)
1373

    
1374
  comprcmd = "gzip"
1375

    
1376
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1377
                                destdir, destdir, destfile)
1378
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1379
                                                   constants.GANETI_RUNAS,
1380
                                                   destcmd)
1381

    
1382
  # all commands have been checked, so we're safe to combine them
1383
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1384

    
1385
  result = utils.RunCmd(command)
1386

    
1387
  if result.failed:
1388
    logging.error("os snapshot export command '%s' returned error: %s"
1389
                  " output: %s", command, result.fail_reason, result.output)
1390
    return False
1391

    
1392
  return True
1393

    
1394

    
1395
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  workdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  final_dir = os.path.join(constants.EXPORT_DIR, instance.name)

  cfg = objects.SerializableConfigParser()

  cfg.add_section(constants.INISECT_EXP)
  cfg.set(constants.INISECT_EXP, 'version', '0')
  cfg.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  cfg.set(constants.INISECT_EXP, 'source', instance.primary_node)
  cfg.set(constants.INISECT_EXP, 'os', instance.os)
  cfg.set(constants.INISECT_EXP, 'compression', 'gzip')

  cfg.add_section(constants.INISECT_INS)
  cfg.set(constants.INISECT_INS, 'name', instance.name)
  cfg.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  cfg.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  cfg.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    cfg.set(constants.INISECT_INS, 'nic%d_mac' %
            nic_count, '%s' % nic.mac)
    cfg.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    cfg.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
            '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  # NOTE(review): this stores the last *index*, i.e. one less than the
  # real count for non-empty lists - presumably matched by the import
  # side; confirm before changing
  cfg.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    cfg.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
            ('%s' % disk.iv_name))
    cfg.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
            ('%s' % disk.physical_id[1]))
    cfg.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
            ('%d' % disk.size))
  # NOTE(review): same last-index convention as nic_count above
  cfg.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  conf_path = os.path.join(workdir, constants.EXPORT_CONF_FILE)
  conf_file = open(conf_path, 'w')
  try:
    cfg.write(conf_file)
  finally:
    conf_file.close()

  # atomically (as far as rename allows) replace any previous export
  shutil.rmtree(final_dir, True)
  shutil.move(workdir, final_dir)

  return True


def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info, or None if
    the expected sections are missing.

  """
  conf_path = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(conf_path)

  if not (config.has_section(constants.INISECT_EXP) and
          config.has_section(constants.INISECT_INS)):
    return None

  return config


def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1480
                         cluster_name):
1481
  """Import an os image into an instance.
1482

1483
  Args:
1484
    instance: the instance object
1485
    os_disk: the instance-visible name of the os device
1486
    swap_disk: the instance-visible name of the swap device
1487
    src_node: node holding the source image
1488
    src_image: path to the source image on src_node
1489

1490
  Returns:
1491
    False in case of error, True otherwise.
1492

1493
  """
1494
  inst_os = OSFromDisk(instance.os)
1495
  import_script = inst_os.import_script
1496

    
1497
  os_device = instance.FindDisk(os_disk)
1498
  if os_device is None:
1499
    logging.error("Can't find this device-visible name '%s'", os_disk)
1500
    return False
1501

    
1502
  swap_device = instance.FindDisk(swap_disk)
1503
  if swap_device is None:
1504
    logging.error("Can't find this device-visible name '%s'", swap_disk)
1505
    return False
1506

    
1507
  real_os_dev = _RecursiveFindBD(os_device)
1508
  if real_os_dev is None:
1509
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1510
                                  str(os_device))
1511
  real_os_dev.Open()
1512

    
1513
  real_swap_dev = _RecursiveFindBD(swap_device)
1514
  if real_swap_dev is None:
1515
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1516
                                  str(swap_device))
1517
  real_swap_dev.Open()
1518

    
1519
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1520
                                        instance.name, int(time.time()))
1521
  if not os.path.exists(constants.LOG_OS_DIR):
1522
    os.mkdir(constants.LOG_OS_DIR, 0750)
1523

    
1524
  destcmd = utils.BuildShellCmd('cat %s', src_image)
1525
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1526
                                                   constants.GANETI_RUNAS,
1527
                                                   destcmd)
1528

    
1529
  comprcmd = "gunzip"
1530
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1531
                               inst_os.path, import_script, instance.name,
1532
                               real_os_dev.dev_path, real_swap_dev.dev_path,
1533
                               logfile)
1534

    
1535
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1536
  env = {'HYPERVISOR': instance.hypervisor}
1537

    
1538
  result = utils.RunCmd(command, env=env)
1539

    
1540
  if result.failed:
1541
    logging.error("os import command '%s' returned error: %s"
1542
                  " output: %s", command, result.fail_reason, result.output)
1543
    return False
1544

    
1545
  return True
1546

    
1547

    
1548
def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)


def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(target)

  return True


def RenameBlockDevices(devlist):
1578
  """Rename a list of block devices.
1579

1580
  The devlist argument is a list of tuples (disk, new_logical,
1581
  new_physical). The return value will be a combined boolean result
1582
  (True only if all renames succeeded).
1583

1584
  """
1585
  result = True
1586
  for disk, unique_id in devlist:
1587
    dev = _RecursiveFindBD(disk)
1588
    if dev is None:
1589
      result = False
1590
      continue
1591
    try:
1592
      old_rpath = dev.dev_path
1593
      dev.Rename(unique_id)
1594
      new_rpath = dev.dev_path
1595
      if old_rpath != new_rpath:
1596
        DevCacheManager.RemoveCache(old_rpath)
1597
        # FIXME: we should add the new cache information here, like:
1598
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1599
        # but we don't have the owner here - maybe parse from existing
1600
        # cache? for now, we only lose lvm data when we rename, which
1601
        # is less critical than DRBD or MD
1602
    except errors.BlockDeviceError, err:
1603
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1604
      result = False
1605
  return result
1606

    
1607

    
1608
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # Fix: os.path.commonprefix compares character-by-character, so a
  # sibling directory such as '<base>-evil' wrongly passed the old
  # check; accept only the base dir itself or a true subpath of it.
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir


def CreateFileStorageDir(file_storage_dir):
1635
  """Create file storage directory.
1636

1637
  Args:
1638
    file_storage_dir: string containing the path
1639

1640
  Returns:
1641
    tuple with first element a boolean indicating wheter dir
1642
    creation was successful or not
1643

1644
  """
1645
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1646
  result = True,
1647
  if not file_storage_dir:
1648
    result = False,
1649
  else:
1650
    if os.path.exists(file_storage_dir):
1651
      if not os.path.isdir(file_storage_dir):
1652
        logging.error("'%s' is not a directory", file_storage_dir)
1653
        result = False,
1654
    else:
1655
      try:
1656
        os.makedirs(file_storage_dir, 0750)
1657
      except OSError, err:
1658
        logging.error("Cannot create file storage directory '%s': %s",
1659
                      file_storage_dir, err)
1660
        result = False,
1661
  return result
1662

    
1663

    
1664
def RemoveFileStorageDir(file_storage_dir):
1665
  """Remove file storage directory.
1666

1667
  Remove it only if it's empty. If not log an error and return.
1668

1669
  Args:
1670
    file_storage_dir: string containing the path
1671

1672
  Returns:
1673
    tuple with first element a boolean indicating wheter dir
1674
    removal was successful or not
1675

1676
  """
1677
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1678
  result = True,
1679
  if not file_storage_dir:
1680
    result = False,
1681
  else:
1682
    if os.path.exists(file_storage_dir):
1683
      if not os.path.isdir(file_storage_dir):
1684
        logging.error("'%s' is not a directory", file_storage_dir)
1685
        result = False,
1686
      # deletes dir only if empty, otherwise we want to return False
1687
      try:
1688
        os.rmdir(file_storage_dir)
1689
      except OSError, err:
1690
        logging.exception("Cannot remove file storage directory '%s'",
1691
                          file_storage_dir)
1692
        result = False,
1693
  return result
1694

    
1695

    
1696
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1697
  """Rename the file storage directory.
1698

1699
  Args:
1700
    old_file_storage_dir: string containing the old path
1701
    new_file_storage_dir: string containing the new path
1702

1703
  Returns:
1704
    tuple with first element a boolean indicating wheter dir
1705
    rename was successful or not
1706

1707
  """
1708
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1709
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1710
  result = True,
1711
  if not old_file_storage_dir or not new_file_storage_dir:
1712
    result = False,
1713
  else:
1714
    if not os.path.exists(new_file_storage_dir):
1715
      if os.path.isdir(old_file_storage_dir):
1716
        try:
1717
          os.rename(old_file_storage_dir, new_file_storage_dir)
1718
        except OSError, err:
1719
          logging.exception("Cannot rename '%s' to '%s'",
1720
                            old_file_storage_dir, new_file_storage_dir)
1721
          result =  False,
1722
      else:
1723
        logging.error("'%s' is not a directory", old_file_storage_dir)
1724
        result = False,
1725
    else:
1726
      if os.path.exists(old_file_storage_dir):
1727
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1728
                      old_file_storage_dir, new_file_storage_dir)
1729
        result = False,
1730
  return result
1731

    
1732

    
1733
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  Fix: os.path.commonprefix compares character-by-character, so a file
  in a sibling directory sharing the queue dir as a string prefix
  (e.g. '<queue>2/job') wrongly passed the old check; we now normalize
  the candidate and require it to be strictly inside the queue dir.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  norm_name = os.path.normpath(file_name)
  result = norm_name.startswith(queue_dir + os.sep)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  Returns False when the name is not a queue file, True otherwise.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)
  return True


def JobQueueRename(old, new):
  """Renames a job queue file.

  Both the source and the destination must be queue files.

  """
  if _IsJobQueueFile(old) and _IsJobQueueFile(new):
    os.rename(old, new)
    return True
  return False


def CloseBlockDevices(disks):
1773
  """Closes the given block devices.
1774

1775
  This means they will be switched to secondary mode (in case of DRBD).
1776

1777
  """
1778
  bdevs = []
1779
  for cf in disks:
1780
    rd = _RecursiveFindBD(cf)
1781
    if rd is None:
1782
      return (False, "Can't find device %s" % cf)
1783
    bdevs.append(rd)
1784

    
1785
  msg = []
1786
  for rd in bdevs:
1787
    try:
1788
      rd.Close()
1789
    except errors.BlockDeviceError, err:
1790
      msg.append(str(err))
1791
  if msg:
1792
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
1793
  else:
1794
    return (True, "All devices secondary")
1795

    
1796

    
1797
class HooksRunner(object):
1798
  """Hook runner.
1799

1800
  This class is instantiated on the node side (ganeti-noded) and not on
1801
  the master side.
1802

1803
  """
1804
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1805

    
1806
  def __init__(self, hooks_base_dir=None):
1807
    """Constructor for hooks runner.
1808

1809
    Args:
1810
      - hooks_base_dir: if not None, this overrides the
1811
        constants.HOOKS_BASE_DIR (useful for unittests)
1812

1813
    """
1814
    if hooks_base_dir is None:
1815
      hooks_base_dir = constants.HOOKS_BASE_DIR
1816
    self._BASE_DIR = hooks_base_dir
1817

    
1818
  @staticmethod
1819
  def ExecHook(script, env):
1820
    """Exec one hook script.
1821

1822
    Args:
1823
     - script: the full path to the script
1824
     - env: the environment with which to exec the script
1825

1826
    """
1827
    # exec the process using subprocess and log the output
1828
    fdstdin = None
1829
    try:
1830
      fdstdin = open("/dev/null", "r")
1831
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1832
                               stderr=subprocess.STDOUT, close_fds=True,
1833
                               shell=False, cwd="/", env=env)
1834
      output = ""
1835
      try:
1836
        output = child.stdout.read(4096)
1837
        child.stdout.close()
1838
      except EnvironmentError, err:
1839
        output += "Hook script error: %s" % str(err)
1840

    
1841
      while True:
1842
        try:
1843
          result = child.wait()
1844
          break
1845
        except EnvironmentError, err:
1846
          if err.errno == errno.EINTR:
1847
            continue
1848
          raise
1849
    finally:
1850
      # try not to leak fds
1851
      for fd in (fdstdin, ):
1852
        if fd is not None:
1853
          try:
1854
            fd.close()
1855
          except EnvironmentError, err:
1856
            # just log the error
1857
            #logging.exception("Error while closing fd %s", fd)
1858
            pass
1859

    
1860
    return result == 0, output
1861

    
1862
  def RunHooks(self, hpath, phase, env):
1863
    """Run the scripts in the hooks directory.
1864

1865
    This method will not be usually overriden by child opcodes.
1866

1867
    """
1868
    if phase == constants.HOOKS_PHASE_PRE:
1869
      suffix = "pre"
1870
    elif phase == constants.HOOKS_PHASE_POST:
1871
      suffix = "post"
1872
    else:
1873
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1874
    rr = []
1875

    
1876
    subdir = "%s-%s.d" % (hpath, suffix)
1877
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1878
    try:
1879
      dir_contents = utils.ListVisibleFiles(dir_name)
1880
    except OSError, err:
1881
      # must log
1882
      return rr
1883

    
1884
    # we use the standard python sort order,
1885
    # so 00name is the recommended naming scheme
1886
    dir_contents.sort()
1887
    for relname in dir_contents:
1888
      fname = os.path.join(dir_name, relname)
1889
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1890
          self.RE_MASK.match(relname) is not None):
1891
        rrval = constants.HKR_SKIP
1892
        output = ""
1893
      else:
1894
        result, output = self.ExecHook(fname, env)
1895
        if not result:
1896
          rrval = constants.HKR_FAIL
1897
        else:
1898
          rrval = constants.HKR_SUCCESS
1899
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
1900

    
1901
    return rr
1902

    
1903

    
1904
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    # locate the allocator script on the configured search path
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # dump the input data into a temporary file which is passed to the
    # script as its only argument, and always clean it up afterwards
    tmp_fd, tmp_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(tmp_fd, idata)
      os.close(tmp_fd)
      run_result = utils.RunCmd([alloc_script, tmp_name])
    finally:
      os.unlink(tmp_name)

    if run_result.failed:
      return (constants.IARUN_FAILURE, run_result.stdout, run_result.stderr,
              run_result.fail_reason)

    return (constants.IARUN_SUCCESS, run_result.stdout, run_result.stderr,
            None)
1938

    
1939

    
1940
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache lives as one small text file per device under
  constants.BDEV_CACHE_DIR; all methods are best-effort and only log
  on failure.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    The cache entry is a single line of "owner state iv_name"; write
    failures are logged but not propagated.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    Removal failures are logged but not propagated.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # fixed log message: this is a removal, not an update
      logging.exception("Can't remove the bdev cache file for %s", dev_path)