Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ ff38b6c0

History | View | Annotate | Download (59.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetConfig():
  """Build and return a cluster configuration reader.

  @rtype: L{ssconf.SimpleConfigReader}
  @return: a fresh reader over the cluster configuration

  """
  return ssconf.SimpleConfigReader()
49

    
50

    
51
def _GetSshRunner(cluster_name):
  """Build an SSH runner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, passed through to L{ssh.SshRunner}
  @rtype: L{ssh.SshRunner}

  """
  return ssh.SshRunner(cluster_name)
53

    
54

    
55
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left in place; only plain files that
  are not in the exclusion list are removed.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults to the empty list
      (FIX: was a mutable default argument ``exclude=[]``, which is shared
      across calls and can be accidentally mutated)

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so the comparison below works
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
74

    
75

    
76
def JobQueuePurge():
  """Removes job queue files and archived jobs

  Cleans the queue directory (keeping the queue lock file in place via
  the exclude list) and then the job archive directory.

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82

    
83

    
84
def GetMasterInfo():
85
  """Returns master information.
86

87
  This is an utility function to compute master information, either
88
  for consumption here or from the node daemon.
89

90
  @rtype: tuple
91
  @return: (master_netdev, master_ip, master_name)
92

93
  """
94
  try:
95
    cfg = _GetConfig()
96
    master_netdev = cfg.GetMasterNetdev()
97
    master_ip = cfg.GetMasterIP()
98
    master_node = cfg.GetMasterNode()
99
  except errors.ConfigurationError, err:
100
    logging.exception("Cluster configuration incomplete")
101
    return (None, None)
102
  return (master_netdev, master_ip, master_node)
103

    
104

    
105
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (unless someone else already owns it). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master daemons
  @rtype: boolean
  @return: True if all steps succeeded, False if any step failed
      (failures are logged but do not abort the remaining steps)

  """
  ok = True
  # NOTE(review): if GetMasterInfo fails it may not return a 3-tuple,
  # in which case this unpacking raises ValueError -- confirm
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    # the master IP is not reachable: claim it on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    # advertise the new address ("-U" sends unsolicited/gratuitous ARP
    # so neighbors update their caches)
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok
146

    
147

    
148
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
  @rtype: boolean
  @return: False if the master information could not be read,
      True otherwise (IP removal failures are only logged)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  # release the master IP from the master network device
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True
172

    
173

    
174
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
175
  """Joins this node to the cluster.
176

177
  This does the following:
178
      - updates the hostkeys of the machine (rsa and dsa)
179
      - adds the ssh private key to the user
180
      - adds the ssh public key to the users' authorized_keys file
181

182
  """
183
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
184
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
185
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
186
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
187
  for name, content, mode in sshd_keys:
188
    utils.WriteFile(name, data=content, mode=mode)
189

    
190
  try:
191
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
192
                                                    mkdir=True)
193
  except errors.OpExecError, err:
194
    logging.exception("Error while processing user ssh files")
195
    return False
196

    
197
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
198
    utils.WriteFile(name, data=content, mode=0600)
199

    
200
  utils.AddAuthorizedKey(auth_keys, sshpub)
201

    
202
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
203

    
204
  return True
205

    
206

    
207
def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  Removes the node's data files, purges the job queue, deletes the
  cluster-wide user SSH key pair and de-authorizes it, then signals the
  node daemon to shut down via L{errors.QuitGanetiException}.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  # read our own public key so it can be removed from authorized_keys
  pub_fd = open(pub_key, 'r')
  try:
    pub_data = pub_fd.read(8192)
  finally:
    pub_fd.close()

  utils.RemoveAuthorizedKey(auth_keys, pub_data)
  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
231

    
232

    
233
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different informations about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
  }

  # merge in whatever the hypervisor reports (memory figures etc.)
  hyp_info = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  # the boot id changes on every reboot, letting callers detect restarts
  boot_fd = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_fd.read(128).rstrip("\n")
  finally:
    boot_fd.close()

  return node_info
267

    
268

    
269
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: str
  @param cluster_name: the cluster name, used by the ssh runner
  @rtype: dict
  @return: one entry per requested check; only failing nodes appear in
      the 'nodelist' and 'node-net-test' sub-dicts

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = my_dict = {}
    for hv_name in what['hypervisor']:
      my_dict[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # NOTE: shuffles the caller's list in place; presumably done so
    # concurrent verifies don't all contact nodes in the same order
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own (primary, secondary) IPs in the (name, pip, sip) list
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      # ping every node's daemon port from our own addresses; the
      # secondary IP is only tested when it differs from the primary
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
340

    
341

    
342
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose logical volumes to list
  @return: dictionary mapping LV name to a (size in MiB, inactive,
      online) tuple, e.g. {'test1': ('20.06', True, True)}; if the
      `lvs` command fails, its output (a string) is returned instead

  """
  field_sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % field_sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  lvs = {}
  line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for raw_line in result.stdout.splitlines():
    stripped = raw_line.strip()
    match = line_re.match(stripped)
    if match is None:
      logging.error("Invalid line returned from lvs output: '%s'", stripped)
      continue
    name, size, attr = match.groups()
    # lv_attr positions: [4] is the state flag ('-' means inactive),
    # [5] is the device-open flag ('o' means open/online)
    lvs[name] = (size, attr[4] == '-', attr[5] == 'o')

  return lvs
374

    
375

    
376
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin wrapper over L{utils.ListVolumeGroups}.

  @rtype: dict
  @return: dictionary with keys volume name and values the size of the volume

  """
  return utils.ListVolumeGroups()
384

    
385

    
386
def NodeVolumes():
  """List all logical volumes on this node.

  @return: a list of dictionaries, one per volume, with keys 'name',
      'size', 'dev' (first backing physical device) and 'vg'; an empty
      dict is returned if the `lvs` command fails

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  volumes = []
  for line in result.stdout.splitlines():
    # a well-formed line has at least four '|'-separated fields
    if line.count('|') < 3:
      continue
    name, size, devices, vg = [f.strip() for f in line.split('|')[:4]]
    # lvs reports devices as "dev(extent)"; keep only the device path
    if '(' in devices:
      devices = devices.split('(')[0]
    volumes.append({
      'name': name,
      'size': size,
      'dev': devices,
      'vg': vg,
      })

  return volumes
414

    
415

    
416
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: names of the bridges to check for
  @rtype: boolean
  @return: True if all of them exist, False otherwise

  """
  # stop at the first missing bridge
  for bridge_name in bridges_list:
    if not utils.BridgeExists(bridge_name):
      return False
  return True
428

    
429

    
430
def GetInstanceList(hypervisor_list):
  """Provides a list of instances.

  @type hypervisor_list: list
  @param hypervisor_list: the list of hypervisors to query information

  @rtype: list
  @return: a list of all running instances on the current node
             - instance1.example.com
             - instance2.example.com
  @raise errors.HypervisorError: re-raised if any hypervisor fails to
      enumerate its instances

  """
  results = []
  for hname in hypervisor_list:
    try:
      names = hypervisor.GetHypervisor(hname).ListInstances()
      results.extend(names)
    except errors.HypervisorError:
      # FIX: log message misspelled "hypervisor" as "hypevisor";
      # also dropped the unused ", err" binding
      logging.exception("Error enumerating instances for hypervisor %s", hname)
      # FIXME: should we somehow not propagate this to the master?
      raise

  return results
453

    
454

    
455
def GetInstanceInfo(instance, hname):
  """Gives back the informations about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: empty dict if the hypervisor has no data for the instance,
      otherwise a dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}

  # hypervisor info tuple: index 2 = memory, 4 = state, 5 = cpu time
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
479

    
480

    
481
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict of dicts
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise errors.HypervisorError: if the same instance is reported by two
      hypervisors with conflicting data

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, inst_id, memory, vcpus, state, times in iinfo:
      data = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      # an instance seen twice must at least report identical data
      if name in output and output[name] != data:
        raise errors.HypervisorError("Instance %s running duplicate"
                                     " with different parameters" % name)
      output[name] = data

  return output
517

    
518

    
519
def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Runs the OS definition's create script against the instance's OS and
  swap block devices, logging the script output to a per-run file under
  C{constants.LOG_OS_DIR}.

  @param instance: the instance object
  @param os_disk: the instance-visible name of the os device
  @param swap_disk: the instance-visible name of the swap device
  @rtype: boolean
  @return: True on success, False if a disk can't be found or the
      create script fails
  @raise errors.BlockDeviceError: if a disk's block device is not set up

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  # resolve the instance-visible disk names to disk objects
  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  # the block devices must already be assembled; open them read/write
  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  # per-run log file, timestamped to keep old runs around
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # NOTE: "&>" redirection requires the command to run under bash
  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True
573

    
574

    
575
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
576
  """Run the OS rename script for an instance.
577

578
  Args:
579
    instance: the instance object
580
    old_name: the old name of the instance
581
    os_disk: the instance-visible name of the os device
582
    swap_disk: the instance-visible name of the swap device
583

584
  """
585
  inst_os = OSFromDisk(instance.os)
586

    
587
  script = inst_os.rename_script
588
  rename_env = OSEnvironment(instance)
589
  rename_env['OLD_INSTANCE_NAME'] = old_name
590

    
591
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
592
                                           old_name,
593
                                           instance.name, int(time.time()))
594
  if not os.path.exists(constants.LOG_OS_DIR):
595
    os.mkdir(constants.LOG_OS_DIR, 0750)
596

    
597
  command = utils.BuildShellCmd("cd %s && %s &>%s",
598
                                inst_os.path, script, logfile)
599

    
600
  result = utils.RunCmd(command, env=rename_env)
601

    
602
  if result.failed:
603
    logging.error("os create command '%s' returned error: %s output: %s",
604
                  command, result.fail_reason, result.output)
605
    return False
606

    
607
  return True
608

    
609

    
610
def _GetVGInfo(vg_name):
611
  """Get informations about the volume group.
612

613
  Args:
614
    vg_name: the volume group
615

616
  Returns:
617
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
618
    where
619
    vg_size is the total size of the volume group in MiB
620
    vg_free is the free size of the volume group in MiB
621
    pv_count are the number of physical disks in that vg
622

623
  If an error occurs during gathering of data, we return the same dict
624
  with keys all set to None.
625

626
  """
627
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
628

    
629
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
630
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
631

    
632
  if retval.failed:
633
    logging.error("volume group %s not present", vg_name)
634
    return retdic
635
  valarr = retval.stdout.strip().rstrip(':').split(':')
636
  if len(valarr) == 3:
637
    try:
638
      retdic = {
639
        "vg_size": int(round(float(valarr[0]), 0)),
640
        "vg_free": int(round(float(valarr[1]), 0)),
641
        "pv_count": int(valarr[2]),
642
        }
643
    except ValueError, err:
644
      logging.exception("Fail to parse vgs output")
645
  else:
646
    logging.error("vgs output has the wrong number of fields (expected"
647
                  " three): %s", str(valarr))
648
  return retdic
649

    
650

    
651
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @param instance: the instance whose disks to look up
  @return: list of (disk, device) pairs, with each device opened
  @raise errors.BlockDeviceError: if any disk's device can't be found

  """
  gathered = []
  for disk in instance.disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    found.Open()
    gathered.append((disk, found))
  return gathered
667

    
668

    
669
def StartInstance(instance, extra_args):
670
  """Start an instance.
671

672
  @type instance: instance object
673
  @param instance: the instance object
674
  @rtype: boolean
675
  @return: whether the startup was successful or not
676

677
  """
678
  running_instances = GetInstanceList([instance.hypervisor])
679

    
680
  if instance.name in running_instances:
681
    return True
682

    
683
  block_devices = _GatherBlockDevs(instance)
684
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
685

    
686
  try:
687
    hyper.StartInstance(instance, block_devices, extra_args)
688
  except errors.HypervisorError, err:
689
    logging.exception("Failed to start instance")
690
    return False
691

    
692
  return True
693

    
694

    
695
def ShutdownInstance(instance):
  """Shut an instance down.

  Asks the hypervisor to stop the instance, then polls until it
  disappears from the running list; if it is still running after the
  polling window, a forced destroy is attempted.

  @type instance: instance object
  @param instance: the instance object
  @rtype: boolean
  @return: whether the shutdown was successful or not

  """
  hv_name = instance.hypervisor
  running_instances = GetInstanceList([hv_name])

  # idempotent: an already-stopped instance is a success
  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor(hv_name)
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # poll every 10 seconds, 11 times (~110s total wait)
  # NOTE(review): shutdown_ok is never used below -- candidate for removal
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList([hv_name]):
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    # give the destroy a moment to take effect before re-checking
    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True
742

    
743

    
744
def RebootInstance(instance, reboot_type, extra_args):
745
  """Reboot an instance.
746

747
  Args:
748
    instance    - name of instance to reboot
749
    reboot_type - how to reboot [soft,hard,full]
750

751
  """
752
  running_instances = GetInstanceList([instance.hypervisor])
753

    
754
  if instance.name not in running_instances:
755
    logging.error("Cannot reboot instance that is not running")
756
    return False
757

    
758
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
759
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
760
    try:
761
      hyper.RebootInstance(instance)
762
    except errors.HypervisorError, err:
763
      logging.exception("Failed to soft reboot instance")
764
      return False
765
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
766
    try:
767
      ShutdownInstance(instance)
768
      StartInstance(instance, extra_args)
769
    except errors.HypervisorError, err:
770
      logging.exception("Failed to hard reboot instance")
771
      return False
772
  else:
773
    raise errors.ParameterError("reboot_type invalid")
774

    
775
  return True
776

    
777

    
778
def MigrateInstance(instance, target, live):
779
  """Migrates an instance to another node.
780

781
  @type instance: C{objects.Instance}
782
  @param instance: the instance definition
783
  @type target: string
784
  @param target: the target node name
785
  @type live: boolean
786
  @param live: whether the migration should be done live or not (the
787
      interpretation of this parameter is left to the hypervisor)
788
  @rtype: tuple
789
  @return: a tuple of (success, msg) where:
790
      - succes is a boolean denoting the success/failure of the operation
791
      - msg is a string with details in case of failure
792

793
  """
794
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
795

    
796
  try:
797
    hyper.MigrateInstance(instance.name, target, live)
798
  except errors.HypervisorError, err:
799
    msg = "Failed to migrate instance: %s" % str(err)
800
    logging.error(msg)
801
    return (False, msg)
802
  return (True, "Migration successfull")
803

    
804

    
805
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Assembles the disk's children first, removes any pre-existing device
  with the same identity, creates the new device, and (on the primary
  node, or when the disk type requires it) assembles and opens it.

  @param disk: a ganeti.objects.Disk object
  @param size: the size of the physical underlying device
  @param owner: a string with the name of the instance
  @param on_primary: a boolean indicating if it is the primary node or not
  @param info: string that will be sent to the physical device creation
  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None; on secondary nodes,
      it's not required to return anything
  @raise ValueError: if the device creation itself fails

  """
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    # remove any stale device with the same identity before creating
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    # deliberate best-effort: a lookup failure here just means there is
    # nothing to remove
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    # record the assembled device in the node's device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id
859

    
860

    
861
def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  @param disk: the objects.Disk to remove, together with its children
  @rtype: boolean
  @return: True if the device (and its children) were removed

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    # nothing found means nothing to remove: treat as success
    result = True
  if disk.children:
    for child in disk.children:
      # NOTE(review): "and" short-circuits -- once one removal fails the
      # remaining children are not even attempted; confirm this is intended
      result = result and RemoveBlockDevice(child)
  return result
886

    
887

    
888
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  @param disk: a objects.Disk object
  @param owner: the name of the instance owning the disk
  @param as_primary: if we should make the block device read/write
  @return: the assembled device (when it was assembled) or True (on a
      secondary node where assembly isn't needed)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    # compute how many children are allowed to fail assembly; -1 from
    # ChildrenNeeded() means all children are required
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # tolerate the failure by recording None, unless the quota of
        # failed children has already been used up
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    # record the assembled device in the node's device cache
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
934

    
935

    
936
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a thin wrapper over _RecursiveAssembleBD which converts an
  assembled device object into its /dev path.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(assembled, bdev.BlockDev):
    return assembled.dev_path
  return assembled
950

    
951

    
952
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    # not assembled on this node: nothing to tear down at this level
    success = True
  else:
    dev_path = r_dev.dev_path
    success = r_dev.Shutdown()
    if success:
      DevCacheManager.RemoveCache(dev_path)
  # recurse into the children even if this level was not active
  for child in (disk.children or []):
    success = success and ShutdownBlockDevice(child)
  return success
975

    
976

    
977
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  Looks up the parent (tolerating missing children, since we are about
  to repair it) and all the new child devices, then attaches them.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(cdev) for cdev in new_cdevs]
  if None in new_bdevs:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
992

    
993

    
994
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  Resolves each child to its device path (static path if available,
  otherwise via a dynamic lookup) and detaches them from the parent.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is not None:
      devs.append(rpath)
      continue
    # no static path: the device must be found dynamically
    bd = _RecursiveFindBD(disk)
    if bd is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    devs.append(bd.dev_path)
  parent_bdev.RemoveChildren(devs)
  return True
1017

    
1018

    
1019
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  Raises:
    errors.BlockDeviceError: if any of the devices cannot be found

  """
  status_list = []
  for current in disks:
    found = _RecursiveFindBD(current)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(current))
    status_list.append(found.CombinedSyncStatus())
  return status_list
1037

    
1038

    
1039
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors
                   (NOTE(review): currently not consulted in the body;
                   missing children simply become None entries — verify
                   against bdev.FindDevice's expectations)

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1061

    
1062

    
1063
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return None
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1079

    
1080

    
1081
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. The operation is only
  performed when the target path is absolute and belongs to the fixed
  whitelist of cluster configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # only cluster-wide configuration files may be overwritten remotely
  whitelist = (
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    )

  if file_name not in whitelist:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1108

    
1109

    
1110
def _ErrnoOrStr(err):
1111
  """Format an EnvironmentError exception.
1112

1113
  If the `err` argument has an errno attribute, it will be looked up
1114
  and converted into a textual EXXXX description. Otherwise the string
1115
  representation of the error will be returned.
1116

1117
  """
1118
  if hasattr(err, 'errno'):
1119
    detail = errno.errorcode[err.errno]
1120
  else:
1121
    detail = str(err)
1122
  return detail
1123

    
1124

    
1125
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API versions of a given OS.

  This function will try to read the API versions of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Args:
    name: the OS name (used only for error reporting)
    os_dir: directory holding the OS installation

  Returns:
    list of integers, the API versions declared in the OS's
    'ganeti_api_version' file (one per line)

  Raises:
    errors.InvalidOS: if the version file is missing, not a regular
      file, unreadable, or contains non-integer content

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  # refuse symlink targets that are not plain files (devices, dirs, ...)
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_versions = f.readlines()
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  # one version number per line; strip whitespace before converting
  api_versions = [version.strip() for version in api_versions]
  try:
    api_versions = [int(version) for version in api_versions]
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_versions
1165

    
1166

    
1167
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  Returns:
    list of OS objects; invalid OS definitions are represented via
    objects.OS.FromInvalidOS rather than raising

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        # NOTE(review): 'break' aborts the scan of ALL remaining
        # directories on the first unreadable one; 'continue' would
        # only skip the failing directory — confirm which is intended
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          # an invalid OS is still reported, as a degraded OS object
          result.append(objects.OS.FromInvalidOS(err))

  return result
1196

    
1197

    
1198
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  @type name: string
  @param name: the OS name to load
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @raise errors.InvalidOS: if the OS directory, its API version file or
                           any of its scripts is missing or invalid

  """

  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_versions = _OSOndiskVersion(name, os_dir)

  # the OS must support exactly the API version this node speaks
  if constants.OS_API_VERSION not in api_versions:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_versions, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    # each script must exist, be a regular file and be executable
    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)


  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
                    api_versions=api_versions)
1253

    
1254
def OSEnvironment(instance, debug=0):
  """Calculate the environment for an os script.

  @type instance: instance object
  @param instance: target instance for the os script run
  @type debug: integer
  @param debug: debug level (0 or 1, for os api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if a disk of the instance is not
                                  assembled on this node

  """
  result = {}
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
  result['INSTANCE_NAME'] = instance.name
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  # per-disk variables: path, access mode and front/backend types
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    # FIXME: When disks will have read-only mode, populate this
    result['DISK_%d_ACCESS' % idx] = 'W'
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  # per-NIC variables: MAC, IP (only if set) and bridge
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  return result
1299

    
1300
def GrowBlockDevice(disk, amount):
1301
  """Grow a stack of block devices.
1302

1303
  This function is called recursively, with the childrens being the
1304
  first one resize.
1305

1306
  Args:
1307
    disk: the disk to be grown
1308

1309
  Returns: a tuple of (status, result), with:
1310
    status: the result (true/false) of the operation
1311
    result: the error message if the operation failed, otherwise not used
1312

1313
  """
1314
  r_dev = _RecursiveFindBD(disk)
1315
  if r_dev is None:
1316
    return False, "Cannot find block device %s" % (disk,)
1317

    
1318
  try:
1319
    r_dev.Grow(amount)
1320
  except errors.BlockDeviceError, err:
1321
    return False, str(err)
1322

    
1323
  return True, None
1324

    
1325

    
1326
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted, or None if the
    leaf device cannot be found
    (NOTE(review): when a disk has multiple children and none matches
    the parent's size, the loop falls through and the function
    implicitly returns None without an error — confirm this is wanted)

  Raises:
    errors.ProgrammerError: if asked to snapshot a leaf device that is
      not LVM-based

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1360

    
1361

    
1362
def ExportSnapshot(disk, dest_node, instance, cluster_name):
  """Export a block device snapshot to a remote node.

  The export runs as a three-stage shell pipeline: the OS export script
  dumps the device, gzip compresses the stream, and an ssh command on
  the destination node writes it into the export directory.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported
    cluster_name: cluster name, needed to build the ssh command

  Returns:
    True if successful, False otherwise.

  Raises:
    errors.BlockDeviceError: if the snapshot device is not assembled

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  # exports are staged in a ".new" directory and renamed on finalize
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters

  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
1418

    
1419

    
1420
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Serializes the instance's configuration (export metadata, instance
  parameters, NICs and disks) into the staging ".new" export directory
  and atomically moves it over the final export directory.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  # NOTE(review): after the loop nic_count holds the LAST enumerate
  # index (i.e. count - 1), not the number of NICs; verify the importer
  # compensates for this before changing it, as it is part of the
  # on-disk export format
  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  # NOTE(review): same off-by-one concern as nic_count above
  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  # replace any previous export of the same instance atomically-ish
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
1482

    
1483

    
1484
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info, or None if
    the mandatory sections are missing.

  """
  parser = objects.SerializableConfigParser()
  parser.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  # both the export and the instance sections are mandatory
  for section in (constants.INISECT_EXP, constants.INISECT_INS):
    if not parser.has_section(section):
      return None

  return parser
1504

    
1505

    
1506
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
                         cluster_name):
  """Import an os image into an instance.

  The import runs as a shell pipeline: an ssh command streams the image
  from the source node, gunzip decompresses it, and the OS import
  script writes it onto the instance's devices.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node
    cluster_name: cluster name, needed to build the ssh command

  Returns:
    False in case of error, True otherwise.

  Raises:
    errors.BlockDeviceError: if the os or swap device is not assembled

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # fetch the (compressed) image from the source node over ssh
  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
  env = {'HYPERVISOR': instance.hypervisor}

  result = utils.RunCmd(command, env=env)

  if result.failed:
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True
1573

    
1574

    
1575
def ListExports():
  """Return a list of exports currently available on this machine.

  An empty list is returned when the export directory does not exist.

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1583

    
1584

    
1585
def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(os.path.join(constants.EXPORT_DIR, export))
  return True
1602

    
1603

    
1604
def RenameBlockDevices(devlist):
1605
  """Rename a list of block devices.
1606

1607
  The devlist argument is a list of tuples (disk, new_logical,
1608
  new_physical). The return value will be a combined boolean result
1609
  (True only if all renames succeeded).
1610

1611
  """
1612
  result = True
1613
  for disk, unique_id in devlist:
1614
    dev = _RecursiveFindBD(disk)
1615
    if dev is None:
1616
      result = False
1617
      continue
1618
    try:
1619
      old_rpath = dev.dev_path
1620
      dev.Rename(unique_id)
1621
      new_rpath = dev.dev_path
1622
      if old_rpath != new_rpath:
1623
        DevCacheManager.RemoveCache(old_rpath)
1624
        # FIXME: we should add the new cache information here, like:
1625
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1626
        # but we don't have the owner here - maybe parse from existing
1627
        # cache? for now, we only lose lvm data when we rename, which
1628
        # is less critical than DRBD or MD
1629
    except errors.BlockDeviceError, err:
1630
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1631
      result = False
1632
  return result
1633

    
1634

    
1635
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks wheter the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base = cfg.GetFileStorageDir()
  # the common prefix must be exactly the base directory
  if os.path.commonprefix([file_storage_dir, base]) != base:
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base)
    return None
  return file_storage_dir
1659

    
1660

    
1661
def CreateFileStorageDir(file_storage_dir):
1662
  """Create file storage directory.
1663

1664
  Args:
1665
    file_storage_dir: string containing the path
1666

1667
  Returns:
1668
    tuple with first element a boolean indicating wheter dir
1669
    creation was successful or not
1670

1671
  """
1672
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1673
  result = True,
1674
  if not file_storage_dir:
1675
    result = False,
1676
  else:
1677
    if os.path.exists(file_storage_dir):
1678
      if not os.path.isdir(file_storage_dir):
1679
        logging.error("'%s' is not a directory", file_storage_dir)
1680
        result = False,
1681
    else:
1682
      try:
1683
        os.makedirs(file_storage_dir, 0750)
1684
      except OSError, err:
1685
        logging.error("Cannot create file storage directory '%s': %s",
1686
                      file_storage_dir, err)
1687
        result = False,
1688
  return result
1689

    
1690

    
1691
def RemoveFileStorageDir(file_storage_dir):
1692
  """Remove file storage directory.
1693

1694
  Remove it only if it's empty. If not log an error and return.
1695

1696
  Args:
1697
    file_storage_dir: string containing the path
1698

1699
  Returns:
1700
    tuple with first element a boolean indicating wheter dir
1701
    removal was successful or not
1702

1703
  """
1704
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1705
  result = True,
1706
  if not file_storage_dir:
1707
    result = False,
1708
  else:
1709
    if os.path.exists(file_storage_dir):
1710
      if not os.path.isdir(file_storage_dir):
1711
        logging.error("'%s' is not a directory", file_storage_dir)
1712
        result = False,
1713
      # deletes dir only if empty, otherwise we want to return False
1714
      try:
1715
        os.rmdir(file_storage_dir)
1716
      except OSError, err:
1717
        logging.exception("Cannot remove file storage directory '%s'",
1718
                          file_storage_dir)
1719
        result = False,
1720
  return result
1721

    
1722

    
1723
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Both paths are validated against the cluster-wide base file storage
  directory, and the rename only happens when the source is an existing
  directory and the destination does not exist yet.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating wheter dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result =  False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      # destination exists; note this also fails when the SOURCE is
      # already gone but the destination is present
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result
1758

    
1759

    
1760
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  Returns True iff file_name lies under constants.QUEUE_DIR; a
  rejection is also logged.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  in_queue = os.path.commonprefix([queue_dir, file_name]) == queue_dir

  if not in_queue:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return in_queue
1772

    
1773

    
1774
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  Refuses paths outside the queue directory.

  """
  if _IsJobQueueFile(file_name):
    # Write and replace the file atomically
    utils.WriteFile(file_name, data=content)
    return True
  return False
1785

    
1786

    
1787
def JobQueueRename(old, new):
  """Renames a job queue file.

  Both the source and the destination must live inside the queue
  directory.

  """
  if not _IsJobQueueFile(old):
    return False
  if not _IsJobQueueFile(new):
    return False

  os.rename(old, new)

  return True
1797

    
1798

    
1799
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: bool
  @param drain_flag: if True, will set the drain flag, otherwise reset it.

  """
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)

  return True
1814

    
1815

    
1816
def CloseBlockDevices(disks):
1817
  """Closes the given block devices.
1818

1819
  This means they will be switched to secondary mode (in case of DRBD).
1820

1821
  """
1822
  bdevs = []
1823
  for cf in disks:
1824
    rd = _RecursiveFindBD(cf)
1825
    if rd is None:
1826
      return (False, "Can't find device %s" % cf)
1827
    bdevs.append(rd)
1828

    
1829
  msg = []
1830
  for rd in bdevs:
1831
    try:
1832
      rd.Close()
1833
    except errors.BlockDeviceError, err:
1834
      msg.append(str(err))
1835
  if msg:
1836
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
1837
  else:
1838
    return (True, "All devices secondary")
1839

    
1840

    
1841
def ValidateHVParams(hvname, hvparams):
1842
  """Validates the given hypervisor parameters.
1843

1844
  @type hvname: string
1845
  @param hvname: the hypervisor name
1846
  @type hvparams: dict
1847
  @param hvparams: the hypervisor parameters to be validated
1848
  @rtype: tuple (bool, str)
1849
  @return: tuple of (success, message)
1850

1851
  """
1852
  try:
1853
    hv_type = hypervisor.GetHypervisor(hvname)
1854
    hv_type.ValidateParameters(hvparams)
1855
    return (True, "Validation passed")
1856
  except errors.HypervisorError, err:
1857
    return (False, str(err))
1858

    
1859

    
1860
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # only scripts whose names match this mask are executed; anything
  # else (e.g. editor backup files) is reported as skipped
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    Returns a (success, output) tuple: whether the script exited with
    code zero, and (at most the first 4kB of) its combined
    stdout/stderr output.

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      # hooks get /dev/null as stdin so they cannot block on reads
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # NOTE: only the first 4096 bytes of output are captured;
        # stdout is closed right after, so a chattier script may see
        # a broken pipe on further writes
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          # retry wait() if it was interrupted by a signal
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    The scripts are looked up in the directory
    "<base_dir>/<hpath>-<pre|post>.d" and run in lexical order; the
    result is a list of (relative script path, HKR_* status, output)
    tuples.  Raises errors.ProgrammerError for an unknown phase.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # listing errors (e.g. a missing hooks directory) are swallowed
      # and an empty result is returned
      # must log
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # non-files, non-executable entries and names outside RE_MASK
      # are reported as skipped rather than executed
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
1965

    
1966

    
1967
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the name of the allocator script, looked up in
        constants.IALLOCATOR_SEARCH_PATH
    @type idata: str
    @param idata: the serialized allocator input data

    @return: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                            os.path.isfile)
    if script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the allocator reads its input from a file passed as argument
    tmp_fd, tmp_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(tmp_fd, idata)
      os.close(tmp_fd)
      runresult = utils.RunCmd([script, tmp_name])
      if runresult.failed:
        status = (constants.IARUN_FAILURE, runresult.stdout,
                  runresult.stderr, runresult.fail_reason)
      else:
        status = (constants.IARUN_SUCCESS, runresult.stdout,
                  runresult.stderr, None)
    finally:
      # the temporary input file is removed in all cases
      os.unlink(tmp_name)

    return status
2001

    
2002

    
2003
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache lives as small text files under constants.BDEV_CACHE_DIR,
  one per device, each holding the owner, the primary/secondary state
  and the instance-visible name of the device.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the device path (usually below /dev)
    @rtype: str
    @return: the full path of the cache file for this device

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the device path; None is logged and ignored
    @param owner: the owner of the device (converted via str())
    @type on_primary: bool
    @param on_primary: whether the device is in primary mode
    @param iv_name: the instance-visible name of the device; None is
        recorded as "not_visible"

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # the cache is best-effort; a write failure must not propagate
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    @type dev_path: str
    @param dev_path: the device path; None is logged and ignored

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # Bug fix: the message previously said "Can't update bdev
      # cache", copy-pasted from UpdateCache; this is a removal failure
      logging.exception("Can't remove the devcache entry for %s", dev_path)