Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 9f0e6b37

History | View | Annotate | Download (56.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetConfig():
  """Build and return a configuration reader.

  @rtype: L{ssconf.SimpleConfigReader}
  @return: a reader for this node's view of the cluster configuration

  """
  return ssconf.SimpleConfigReader()
49

    
50

    
51
def _GetSshRunner(cluster_name):
  """Build and return an ssh runner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster the runner will operate in
  @rtype: L{ssh.SshRunner}
  @return: a fresh ssh runner instance

  """
  return ssh.SshRunner(cluster_name)
53

    
54

    
55
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left alone; only plain files are
  removed.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded; defaults to the empty
      list (the previous mutable default argument `exclude=[]` was a
      shared-state hazard)

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths so they compare equal to the
    # normalized names we build below
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
74

    
75

    
76
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  The queue lock file is excluded from removal, so the queue stays
  lockable after the purge.

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82

    
83

    
84
def GetMasterInfo():
85
  """Returns master information.
86

87
  This is an utility function to compute master information, either
88
  for consumption here or from the node daemon.
89

90
  @rtype: tuple
91
  @return: (master_netdev, master_ip, master_name)
92

93
  """
94
  try:
95
    cfg = _GetConfig()
96
    master_netdev = cfg.GetMasterNetdev()
97
    master_ip = cfg.GetMasterIP()
98
    master_node = cfg.GetMasterNode()
99
  except errors.ConfigurationError, err:
100
    logging.exception("Cluster configuration incomplete")
101
    return (None, None)
102
  return (master_netdev, master_ip, master_node)
103

    
104

    
105
def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the
  master (unless someone else already holds it). If the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master daemons
  @rtype: boolean
  @return: True unless one of the steps failed

  """
  success = True
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      success = False
  else:
    ip_result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                              "dev", master_netdev, "label",
                              "%s:0" % master_netdev])
    if ip_result.failed:
      logging.error("Can't activate master IP: %s", ip_result.output)
      success = False

    # gratuitous ARP to update the neighbours; we'll ignore the exit
    # code of arping
    utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                  "-s", master_ip, master_ip])

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in ("ganeti-masterd", "ganeti-rapi"):
      run_result = utils.RunCmd([daemon])
      if run_result.failed:
        logging.error("Can't start daemon %s: %s", daemon, run_result.output)
        success = False
  return success
147

    
148

    
149
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to remove the IP address of the
  master. If the stop_daemons parameter is True, it will also stop
  the master daemons (ganeti-masterd and ganeti-rapi).

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
  @rtype: boolean
  @return: False if the master information is unavailable, True
      otherwise (IP removal failures are logged but ignored)

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  ip_result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                            "dev", master_netdev])
  if ip_result.failed:
    logging.error("Can't remove the master IP, error: %s", ip_result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for pid_name in (constants.RAPI_PID, constants.MASTERD_PID):
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(pid_name)))

  return True
173

    
174

    
175
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
176
  """Joins this node to the cluster.
177

178
  This does the following:
179
      - updates the hostkeys of the machine (rsa and dsa)
180
      - adds the ssh private key to the user
181
      - adds the ssh public key to the users' authorized_keys file
182

183
  """
184
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
185
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
186
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
187
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
188
  for name, content, mode in sshd_keys:
189
    utils.WriteFile(name, data=content, mode=mode)
190

    
191
  try:
192
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
193
                                                    mkdir=True)
194
  except errors.OpExecError, err:
195
    logging.exception("Error while processing user ssh files")
196
    return False
197

    
198
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
199
    utils.WriteFile(name, data=content, mode=0600)
200

    
201
  utils.AddAuthorizedKey(auth_keys, sshpub)
202

    
203
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
204

    
205
  return True
206

    
207

    
208
def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  Removes this node's data directory, purges the job queue, deletes
  the cluster ssh keys and finally asks the node daemon to shut down
  by raising L{errors.QuitGanetiException}.

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  pub_file = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, pub_file.read(8192))
  finally:
    pub_file.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
232

    
233

    
234
def GetNodeInfo(vgname):
  """Gives back a hash with different informations about the node.

  @type vgname: str
  @param vgname: name of the volume group to query
  @rtype: dict
  @return: dictionary with the keys:
      - vg_size: size of the configured volume group in MiB
      - vg_free: free space of the volume group in MiB
      - bootid: the node's boot id
      plus whatever keys the hypervisor reports (e.g. memory_dom0,
      memory_free, memory_total)

  """
  node_info = {}
  vginfo = _GetVGInfo(vgname)
  node_info['vg_size'] = vginfo['vg_size']
  node_info['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor(_GetConfig())
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_id_file.read(128).rstrip("\n")
  finally:
    boot_id_file.close()

  return node_info
265

    
266

    
267
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Args:
    what - a dictionary of things to check:
      'filelist' : list of files for which to compute checksums
      'nodelist' : list of nodes we should check communication with
      'hypervisor': run the hypervisor-specific verify
      'node-net-test': list of (name, primary_ip, secondary_ip)
        tuples for a cluster-wide tcp connectivity test
    cluster_name - the cluster name, passed to the ssh runner

  Requested files on local node are checksummed and the result returned.

  The nodelist is traversed, with the following checks being made
  for each node:
  - known_hosts key correct
  - correct resolving of node name (target node returns its own hostname
    by ssh-execution of 'hostname', result compared against name in list.

  Returns a dictionary keyed by the names of the performed checks.

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = hypervisor.GetHypervisor(_GetConfig()).Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # NOTE: this shuffles the caller-supplied list in place
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are recorded; absence of a key means success
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own entry so we know which source IPs to ping from
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = utils.GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # only test the secondary IP when it differs from the primary
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
328

    
329

    
330
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group to list
  @return: dictionary mapping each lv name to a (size in MiB as a
      string, inactive flag, online flag) tuple, e.g.
      {'test1': ('20.06', True, True)}; if the lvs command fails, the
      raw command output (a string) is returned instead

  """
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  lvs = {}
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for raw_line in result.stdout.splitlines():
    stripped = raw_line.strip()
    match = valid_line_re.match(stripped)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", stripped)
      continue
    name, size, attr = match.groups()
    # lv_attr is a six-character flag string; position 4 is the state
    # flag and position 5 the open flag
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs
362

    
363

    
364
def ListVolumeGroups():
  """List the volume groups and their size.

  @rtype: dict
  @return: dictionary with keys volume name and values the size of
      the volume

  """
  return utils.ListVolumeGroups()
372

    
373

    
374
def NodeVolumes():
  """List all volumes on this node.

  @return: list of dictionaries with keys 'name', 'size', 'dev' and
      'vg' (one per logical volume); an empty dict if the lvs command
      fails

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
    # drop the "(extent)" suffix lvs appends to the device column
    if '(' in dev:
      return dev.split('(')[0]
    return dev

  def map_line(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  # only lines with at least four fields are real volume entries
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
402

    
403

    
404
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  The check stops at the first missing bridge.

  @type bridges_list: list
  @param bridges_list: the bridge names to look for
  @rtype: boolean
  @return: True if all of them exist, False otherwise

  """
  for bridge_name in bridges_list:
    if not utils.BridgeExists(bridge_name):
      return False
  return True
416

    
417

    
418
def GetInstanceList():
419
  """Provides a list of instances.
420

421
  Returns:
422
    A list of all running instances on the current node
423
    - instance1.example.com
424
    - instance2.example.com
425

426
  """
427
  try:
428
    names = hypervisor.GetHypervisor(_GetConfig()).ListInstances()
429
  except errors.HypervisorError, err:
430
    logging.exception("Error enumerating instances")
431
    raise
432

    
433
  return names
434

    
435

    
436
def GetInstanceInfo(instance):
  """Gives back the informations about an instance as a dictionary.

  @type instance: str
  @param instance: name of the instance (ex. instance1.example.com)
  @rtype: dict
  @return: dictionary with keys 'memory' (int, in MiB), 'state'
      (string, the hypervisor run state) and 'time' (float, cpu
      time); empty if the hypervisor does not know the instance

  """
  iinfo = hypervisor.GetHypervisor(_GetConfig()).GetInstanceInfo(instance)
  if iinfo is None:
    return {}
  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
459

    
460

    
461
def GetAllInstancesInfo():
  """Gather data about all instances.

  This is the equivalent of GetInstanceInfo(), except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @rtype: dict
  @return: dictionary of dictionaries, keyed by instance name, each
      with keys 'memory' (int, MiB), 'vcpus' (int), 'state' (string)
      and 'time' (float, cpu time)

  """
  output = {}
  iinfo = hypervisor.GetHypervisor(_GetConfig()).GetAllInstancesInfo()
  if iinfo:
    # the instance id (second field) is not needed here
    for name, _, memory, vcpus, state, times in iinfo:
      output[name] = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
  return output
491

    
492

    
493
def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Runs the OS definition's create script against the instance's os
  and swap block devices, capturing its output in a logfile under
  constants.LOG_OS_DIR.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  Returns:
    True on success, False if a device cannot be found or the create
    script fails.

  Raises:
    errors.BlockDeviceError: if a disk is known but not set up

  """
  cfg = _GetConfig()
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  # resolve the logical disks to the block devices backing them
  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  # per-run logfile named after the os, instance and timestamp
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # both stdout and stderr of the create script go to the logfile
  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)
  env = {'HYPERVISOR': cfg.GetHypervisorType()}

  result = utils.RunCmd(command, env=env)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True
548

    
549

    
550
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
551
  """Run the OS rename script for an instance.
552

553
  Args:
554
    instance: the instance object
555
    old_name: the old name of the instance
556
    os_disk: the instance-visible name of the os device
557
    swap_disk: the instance-visible name of the swap device
558

559
  """
560
  inst_os = OSFromDisk(instance.os)
561

    
562
  script = inst_os.rename_script
563

    
564
  os_device = instance.FindDisk(os_disk)
565
  if os_device is None:
566
    logging.error("Can't find this device-visible name '%s'", os_disk)
567
    return False
568

    
569
  swap_device = instance.FindDisk(swap_disk)
570
  if swap_device is None:
571
    logging.error("Can't find this device-visible name '%s'", swap_disk)
572
    return False
573

    
574
  real_os_dev = _RecursiveFindBD(os_device)
575
  if real_os_dev is None:
576
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
577
                                  str(os_device))
578
  real_os_dev.Open()
579

    
580
  real_swap_dev = _RecursiveFindBD(swap_device)
581
  if real_swap_dev is None:
582
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
583
                                  str(swap_device))
584
  real_swap_dev.Open()
585

    
586
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
587
                                           old_name,
588
                                           instance.name, int(time.time()))
589
  if not os.path.exists(constants.LOG_OS_DIR):
590
    os.mkdir(constants.LOG_OS_DIR, 0750)
591

    
592
  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
593
                                inst_os.path, script, old_name, instance.name,
594
                                real_os_dev.dev_path, real_swap_dev.dev_path,
595
                                logfile)
596

    
597
  result = utils.RunCmd(command)
598

    
599
  if result.failed:
600
    logging.error("os create command '%s' returned error: %s output: %s",
601
                  command, result.fail_reason, result.output)
602
    return False
603

    
604
  return True
605

    
606

    
607
def _GetVGInfo(vg_name):
  """Get informations about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count are the number of physical disks in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  # all keys preset to None; this is also the error-path result
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # strip whitespace and a possible trailing separator before splitting
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      # vgs reports sizes as floats (in MiB); round to whole MiB
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      # malformed numbers: log and fall through to the all-None dict
      logging.exception("Fail to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
646

    
647

    
648
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  @param instance: the instance whose disks to process
  @rtype: list
  @return: list of (disk, device) pairs, one per instance disk
  @raise errors.BlockDeviceError: if a disk cannot be found

  """
  gathered = []
  for disk in instance.disks:
    found_dev = _RecursiveFindBD(disk)
    if found_dev is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    found_dev.Open()
    gathered.append((disk, found_dev))
  return gathered
664

    
665

    
666
def StartInstance(instance, extra_args):
667
  """Start an instance.
668

669
  Args:
670
    instance - name of instance to start.
671

672
  """
673
  running_instances = GetInstanceList()
674

    
675
  if instance.name in running_instances:
676
    return True
677

    
678
  block_devices = _GatherBlockDevs(instance)
679
  hyper = hypervisor.GetHypervisor(_GetConfig())
680

    
681
  try:
682
    hyper.StartInstance(instance, block_devices, extra_args)
683
  except errors.HypervisorError, err:
684
    logging.exception("Failed to start instance")
685
    return False
686

    
687
  return True
688

    
689

    
690
def ShutdownInstance(instance):
691
  """Shut an instance down.
692

693
  Args:
694
    instance - name of instance to shutdown.
695

696
  """
697
  running_instances = GetInstanceList()
698

    
699
  if instance.name not in running_instances:
700
    return True
701

    
702
  hyper = hypervisor.GetHypervisor(_GetConfig())
703
  try:
704
    hyper.StopInstance(instance)
705
  except errors.HypervisorError, err:
706
    logging.error("Failed to stop instance")
707
    return False
708

    
709
  # test every 10secs for 2min
710
  shutdown_ok = False
711

    
712
  time.sleep(1)
713
  for dummy in range(11):
714
    if instance.name not in GetInstanceList():
715
      break
716
    time.sleep(10)
717
  else:
718
    # the shutdown did not succeed
719
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
720

    
721
    try:
722
      hyper.StopInstance(instance, force=True)
723
    except errors.HypervisorError, err:
724
      logging.exception("Failed to stop instance")
725
      return False
726

    
727
    time.sleep(1)
728
    if instance.name in GetInstanceList():
729
      logging.error("could not shutdown instance '%s' even by destroy",
730
                    instance.name)
731
      return False
732

    
733
  return True
734

    
735

    
736
def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance    - instance object to reboot
    reboot_type - how to reboot [soft,hard,full]
    extra_args  - extra arguments passed to StartInstance for a hard
                  reboot

  Returns:
    True on success, False on failure.

  A soft reboot is delegated to the hypervisor; a hard reboot is a
  shutdown followed by a fresh start. NOTE(review): 'full' is listed
  above but not handled here and falls into the ParameterError branch
  below — presumably implemented by the caller; confirm.

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor(_GetConfig())
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")


  return True
769

    
770

    
771
def MigrateInstance(instance, target, live):
772
  """Migrates an instance to another node.
773

774
  @type instance: C{objects.Instance}
775
  @param instance: the instance definition
776
  @type target: string
777
  @param target: the target node name
778
  @type live: boolean
779
  @param live: whether the migration should be done live or not (the
780
      interpretation of this parameter is left to the hypervisor)
781
  @rtype: tuple
782
  @return: a tuple of (success, msg) where:
783
      - succes is a boolean denoting the success/failure of the operation
784
      - msg is a string with details in case of failure
785

786
  """
787
  hyper = hypervisor.GetHypervisor(_GetConfig())
788

    
789
  try:
790
    hyper.MigrateInstance(instance.name, target, live)
791
  except errors.HypervisorError, err:
792
    msg = "Failed to migrate instance: %s" % str(err)
793
    logging.error(msg)
794
    return (False, msg)
795
  return (True, "Migration successfull")
796

    
797

    
798
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Args:
   disk: a ganeti.objects.Disk object
   size: the size of the physical underlying device
   owner: a string with the name of the instance
   on_primary: a boolean indicating if it is the primary node or not
   info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometime be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  Raises:
    ValueError: if the device could not be created
    errors.BlockDeviceError: if the created device cannot be assembled

  """
  clist = []
  if disk.children:
    # children must be assembled before the device itself
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    # remove any leftover device with the same id before re-creating
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    # deliberate best-effort cleanup: a lookup failure only means
    # there is nothing to remove
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      # open read/write only where the device will actually be used
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id
852

    
853

    
854
def RemoveBlockDevice(disk):
855
  """Remove a block device.
856

857
  This is intended to be called recursively.
858

859
  """
860
  try:
861
    # since we are removing the device, allow a partial match
862
    # this allows removal of broken mirrors
863
    rdev = _RecursiveFindBD(disk, allow_partial=True)
864
  except errors.BlockDeviceError, err:
865
    # probably can't attach
866
    logging.info("Can't attach to device %s in remove", disk)
867
    rdev = None
868
  if rdev is not None:
869
    r_path = rdev.dev_path
870
    result = rdev.Remove()
871
    if result:
872
      DevCacheManager.RemoveCache(r_path)
873
  else:
874
    result = True
875
  if disk.children:
876
    for child in disk.children:
877
      result = result and RemoveBlockDevice(child)
878
  return result
879

    
880

    
881
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: a objects.Disk object
    owner: the name of the owning instance, used for the device cache
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or True (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    # compute how many children may fail (be None) before the device
    # as a whole cannot work anymore
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # tolerate failed children up to the limit computed above;
        # past that, re-raise since the device cannot function
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
927

    
928

    
929
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  @return: a /dev path for primary nodes, True for secondary nodes

  """
  assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(assembled, bdev.BlockDev):
    return assembled.dev_path
  return assembled
943

    
944

    
945
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  """
  dev = _RecursiveFindBD(disk)
  if dev is None:
    # device not assembled, nothing to tear down at this level
    success = True
  else:
    dev_path = dev.dev_path
    success = dev.Shutdown()
    if success:
      DevCacheManager.RemoveCache(dev_path)
  # recurse into children regardless of the outcome at this level
  if disk.children:
    for child in disk.children:
      success = success and ShutdownBlockDevice(child)
  return success
970
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  Returns True on success, False when either the parent or any of the
  new child devices cannot be found.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True
987
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  Returns True on success, False when the parent or a dynamic child
  device cannot be located.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  dev_paths = []
  for disk in new_cdevs:
    static_path = disk.StaticDevPath()
    if static_path is not None:
      # device has a fixed path, no lookup needed
      dev_paths.append(static_path)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    dev_paths.append(bd.dev_path)
  parent_bdev.RemoveChildren(dev_paths)
  return True
1012
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  Raises:
    errors.BlockDeviceError: when a disk cannot be found

  """
  status_list = []
  for disk in disks:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    status_list.append(dev.CombinedSyncStatus())
  return status_list
1032
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  if disk.children:
    child_devs = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    child_devs = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, child_devs)
1056
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance

  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  dev = _RecursiveFindBD(disk)
  if dev is None:
    return None
  return (dev.dev_path, dev.major, dev.minor) + dev.GetSyncStatus()
1074
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  Returns True on success, False when the name is rejected.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of files the master is allowed to push to this node
  allowed_files = (
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    )

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                  " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1103
def _ErrnoOrStr(err):
1104
  """Format an EnvironmentError exception.
1105

1106
  If the `err` argument has an errno attribute, it will be looked up
1107
  and converted into a textual EXXXX description. Otherwise the string
1108
  representation of the error will be returned.
1109

1110
  """
1111
  if hasattr(err, 'errno'):
1112
    detail = errno.errorcode[err.errno]
1113
  else:
1114
    detail = str(err)
1115
  return detail
1116

    
1117

    
1118
def _OSOndiskVersion(name, os_dir):
1119
  """Compute and return the API version of a given OS.
1120

1121
  This function will try to read the API version of the os given by
1122
  the 'name' parameter and residing in the 'os_dir' directory.
1123

1124
  Return value will be either an integer denoting the version or None in the
1125
  case when this is not a valid OS name.
1126

1127
  """
1128
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1129

    
1130
  try:
1131
    st = os.stat(api_file)
1132
  except EnvironmentError, err:
1133
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1134
                           " found (%s)" % _ErrnoOrStr(err))
1135

    
1136
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1137
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1138
                           " a regular file")
1139

    
1140
  try:
1141
    f = open(api_file)
1142
    try:
1143
      api_version = f.read(256)
1144
    finally:
1145
      f.close()
1146
  except EnvironmentError, err:
1147
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1148
                           " API version (%s)" % _ErrnoOrStr(err))
1149

    
1150
  api_version = api_version.strip()
1151
  try:
1152
    api_version = int(api_version)
1153
  except (TypeError, ValueError), err:
1154
    raise errors.InvalidOS(name, os_dir,
1155
                           "API version is not integer (%s)" % str(err))
1156

    
1157
  return api_version
1158

    
1159

    
1160
def DiagnoseOS(top_dirs=None):
1161
  """Compute the validity for all OSes.
1162

1163
  Returns an OS object for each name in all the given top directories
1164
  (if not given defaults to constants.OS_SEARCH_PATH)
1165

1166
  Returns:
1167
    list of OS objects
1168

1169
  """
1170
  if top_dirs is None:
1171
    top_dirs = constants.OS_SEARCH_PATH
1172

    
1173
  result = []
1174
  for dir_name in top_dirs:
1175
    if os.path.isdir(dir_name):
1176
      try:
1177
        f_names = utils.ListVisibleFiles(dir_name)
1178
      except EnvironmentError, err:
1179
        logging.exception("Can't list the OS directory %s", dir_name)
1180
        break
1181
      for name in f_names:
1182
        try:
1183
          os_inst = OSFromDisk(name, base_dir=dir_name)
1184
          result.append(os_inst)
1185
        except errors.InvalidOS, err:
1186
          result.append(objects.OS.FromInvalidOS(err))
1187

    
1188
  return result
1189

    
1190

    
1191
def OSFromDisk(name, base_dir=None):
1192
  """Create an OS instance from disk.
1193

1194
  This function will return an OS instance if the given name is a
1195
  valid OS name. Otherwise, it will raise an appropriate
1196
  `errors.InvalidOS` exception, detailing why this is not a valid
1197
  OS.
1198

1199
  Args:
1200
    os_dir: Directory containing the OS scripts. Defaults to a search
1201
            in all the OS_SEARCH_PATH directories.
1202

1203
  """
1204

    
1205
  if base_dir is None:
1206
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1207
    if os_dir is None:
1208
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1209
  else:
1210
    os_dir = os.path.sep.join([base_dir, name])
1211

    
1212
  api_version = _OSOndiskVersion(name, os_dir)
1213

    
1214
  if api_version != constants.OS_API_VERSION:
1215
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1216
                           " (found %s want %s)"
1217
                           % (api_version, constants.OS_API_VERSION))
1218

    
1219
  # OS Scripts dictionary, we will populate it with the actual script names
1220
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1221

    
1222
  for script in os_scripts:
1223
    os_scripts[script] = os.path.sep.join([os_dir, script])
1224

    
1225
    try:
1226
      st = os.stat(os_scripts[script])
1227
    except EnvironmentError, err:
1228
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1229
                             (script, _ErrnoOrStr(err)))
1230

    
1231
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1232
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1233
                             script)
1234

    
1235
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1236
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1237
                             script)
1238

    
1239

    
1240
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1241
                    create_script=os_scripts['create'],
1242
                    export_script=os_scripts['export'],
1243
                    import_script=os_scripts['import'],
1244
                    rename_script=os_scripts['rename'],
1245
                    api_version=api_version)
1246

    
1247

    
1248
def GrowBlockDevice(disk, amount):
1249
  """Grow a stack of block devices.
1250

1251
  This function is called recursively, with the childrens being the
1252
  first one resize.
1253

1254
  Args:
1255
    disk: the disk to be grown
1256

1257
  Returns: a tuple of (status, result), with:
1258
    status: the result (true/false) of the operation
1259
    result: the error message if the operation failed, otherwise not used
1260

1261
  """
1262
  r_dev = _RecursiveFindBD(disk)
1263
  if r_dev is None:
1264
    return False, "Cannot find block device %s" % (disk,)
1265

    
1266
  try:
1267
    r_dev.Grow(amount)
1268
  except errors.BlockDeviceError, err:
1269
    return False, str(err)
1270

    
1271
  return True, None
1272

    
1273

    
1274
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted, or None when
    the leaf device is not assembled

  Raises:
    errors.ProgrammerError: for non-lvm leaf devices

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    # more than one child, choose one that matches
    for child in disk.children:
      if child.size == disk.size:
        # return implies breaking the loop
        return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return dev.Snapshot(disk.size)
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))
1310
def ExportSnapshot(disk, dest_node, instance, cluster_name):
1311
  """Export a block device snapshot to a remote node.
1312

1313
  Args:
1314
    disk: the snapshot block device
1315
    dest_node: the node to send the image to
1316
    instance: instance being exported
1317

1318
  Returns:
1319
    True if successful, False otherwise.
1320

1321
  """
1322
  inst_os = OSFromDisk(instance.os)
1323
  export_script = inst_os.export_script
1324

    
1325
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1326
                                     instance.name, int(time.time()))
1327
  if not os.path.exists(constants.LOG_OS_DIR):
1328
    os.mkdir(constants.LOG_OS_DIR, 0750)
1329

    
1330
  real_os_dev = _RecursiveFindBD(disk)
1331
  if real_os_dev is None:
1332
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1333
                                  str(disk))
1334
  real_os_dev.Open()
1335

    
1336
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1337
  destfile = disk.physical_id[1]
1338

    
1339
  # the target command is built out of three individual commands,
1340
  # which are joined by pipes; we check each individual command for
1341
  # valid parameters
1342

    
1343
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1344
                               export_script, instance.name,
1345
                               real_os_dev.dev_path, logfile)
1346

    
1347
  comprcmd = "gzip"
1348

    
1349
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1350
                                destdir, destdir, destfile)
1351
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1352
                                                   constants.GANETI_RUNAS,
1353
                                                   destcmd)
1354

    
1355
  # all commands have been checked, so we're safe to combine them
1356
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1357

    
1358
  result = utils.RunCmd(command)
1359

    
1360
  if result.failed:
1361
    logging.error("os snapshot export command '%s' returned error: %s"
1362
                  " output: %s", command, result.fail_reason, result.output)
1363
    return False
1364

    
1365
  return True
1366

    
1367

    
1368
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  # NOTE(review): this stores the *last index* (len-1), not the number of
  # nics, and 0 when there are none -- looks off-by-one; confirm against
  # the import-side reader before changing it
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_count)

  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  # NOTE(review): same last-index-vs-count concern as nic_count above
  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  # atomically (well, as atomically as rmtree+move can be) replace any
  # previous export with the freshly written one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
1430
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info, or None when
    the mandatory sections are missing.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both sections must be present for a usable export
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config
1452
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image,
1453
                         cluster_name):
1454
  """Import an os image into an instance.
1455

1456
  Args:
1457
    instance: the instance object
1458
    os_disk: the instance-visible name of the os device
1459
    swap_disk: the instance-visible name of the swap device
1460
    src_node: node holding the source image
1461
    src_image: path to the source image on src_node
1462

1463
  Returns:
1464
    False in case of error, True otherwise.
1465

1466
  """
1467
  cfg = _GetConfig()
1468
  inst_os = OSFromDisk(instance.os)
1469
  import_script = inst_os.import_script
1470

    
1471
  os_device = instance.FindDisk(os_disk)
1472
  if os_device is None:
1473
    logging.error("Can't find this device-visible name '%s'", os_disk)
1474
    return False
1475

    
1476
  swap_device = instance.FindDisk(swap_disk)
1477
  if swap_device is None:
1478
    logging.error("Can't find this device-visible name '%s'", swap_disk)
1479
    return False
1480

    
1481
  real_os_dev = _RecursiveFindBD(os_device)
1482
  if real_os_dev is None:
1483
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1484
                                  str(os_device))
1485
  real_os_dev.Open()
1486

    
1487
  real_swap_dev = _RecursiveFindBD(swap_device)
1488
  if real_swap_dev is None:
1489
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1490
                                  str(swap_device))
1491
  real_swap_dev.Open()
1492

    
1493
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1494
                                        instance.name, int(time.time()))
1495
  if not os.path.exists(constants.LOG_OS_DIR):
1496
    os.mkdir(constants.LOG_OS_DIR, 0750)
1497

    
1498
  destcmd = utils.BuildShellCmd('cat %s', src_image)
1499
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
1500
                                                   constants.GANETI_RUNAS,
1501
                                                   destcmd)
1502

    
1503
  comprcmd = "gunzip"
1504
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1505
                               inst_os.path, import_script, instance.name,
1506
                               real_os_dev.dev_path, real_swap_dev.dev_path,
1507
                               logfile)
1508

    
1509
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1510
  env = {'HYPERVISOR': cfg.GetHypervisorType()}
1511

    
1512
  result = utils.RunCmd(command, env=env)
1513

    
1514
  if result.failed:
1515
    logging.error("os import command '%s' returned error: %s"
1516
                  " output: %s", command, result.fail_reason, result.output)
1517
    return False
1518

    
1519
  return True
1520

    
1521

    
1522
def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    # no export directory means no exports
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1532
def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(target)

  return True
1551
def RenameBlockDevices(devlist):
1552
  """Rename a list of block devices.
1553

1554
  The devlist argument is a list of tuples (disk, new_logical,
1555
  new_physical). The return value will be a combined boolean result
1556
  (True only if all renames succeeded).
1557

1558
  """
1559
  result = True
1560
  for disk, unique_id in devlist:
1561
    dev = _RecursiveFindBD(disk)
1562
    if dev is None:
1563
      result = False
1564
      continue
1565
    try:
1566
      old_rpath = dev.dev_path
1567
      dev.Rename(unique_id)
1568
      new_rpath = dev.dev_path
1569
      if old_rpath != new_rpath:
1570
        DevCacheManager.RemoveCache(old_rpath)
1571
        # FIXME: we should add the new cache information here, like:
1572
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1573
        # but we don't have the owner here - maybe parse from existing
1574
        # cache? for now, we only lose lvm data when we rename, which
1575
        # is less critical than DRBD or MD
1576
    except errors.BlockDeviceError, err:
1577
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1578
      result = False
1579
  return result
1580

    
1581

    
1582
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = os.path.normpath(cfg.GetFileStorageDir())
  # the old os.path.commonprefix check compared character-by-character,
  # so e.g. '/srv/storage-evil' was accepted as being under
  # '/srv/storage'; compare on a path-component boundary instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
1608
def CreateFileStorageDir(file_storage_dir):
1609
  """Create file storage directory.
1610

1611
  Args:
1612
    file_storage_dir: string containing the path
1613

1614
  Returns:
1615
    tuple with first element a boolean indicating wheter dir
1616
    creation was successful or not
1617

1618
  """
1619
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1620
  result = True,
1621
  if not file_storage_dir:
1622
    result = False,
1623
  else:
1624
    if os.path.exists(file_storage_dir):
1625
      if not os.path.isdir(file_storage_dir):
1626
        logging.error("'%s' is not a directory", file_storage_dir)
1627
        result = False,
1628
    else:
1629
      try:
1630
        os.makedirs(file_storage_dir, 0750)
1631
      except OSError, err:
1632
        logging.error("Cannot create file storage directory '%s': %s",
1633
                      file_storage_dir, err)
1634
        result = False,
1635
  return result
1636

    
1637

    
1638
def RemoveFileStorageDir(file_storage_dir):
1639
  """Remove file storage directory.
1640

1641
  Remove it only if it's empty. If not log an error and return.
1642

1643
  Args:
1644
    file_storage_dir: string containing the path
1645

1646
  Returns:
1647
    tuple with first element a boolean indicating wheter dir
1648
    removal was successful or not
1649

1650
  """
1651
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1652
  result = True,
1653
  if not file_storage_dir:
1654
    result = False,
1655
  else:
1656
    if os.path.exists(file_storage_dir):
1657
      if not os.path.isdir(file_storage_dir):
1658
        logging.error("'%s' is not a directory", file_storage_dir)
1659
        result = False,
1660
      # deletes dir only if empty, otherwise we want to return False
1661
      try:
1662
        os.rmdir(file_storage_dir)
1663
      except OSError, err:
1664
        logging.exception("Cannot remove file storage directory '%s'",
1665
                          file_storage_dir)
1666
        result = False,
1667
  return result
1668

    
1669

    
1670
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1671
  """Rename the file storage directory.
1672

1673
  Args:
1674
    old_file_storage_dir: string containing the old path
1675
    new_file_storage_dir: string containing the new path
1676

1677
  Returns:
1678
    tuple with first element a boolean indicating wheter dir
1679
    rename was successful or not
1680

1681
  """
1682
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1683
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1684
  result = True,
1685
  if not old_file_storage_dir or not new_file_storage_dir:
1686
    result = False,
1687
  else:
1688
    if not os.path.exists(new_file_storage_dir):
1689
      if os.path.isdir(old_file_storage_dir):
1690
        try:
1691
          os.rename(old_file_storage_dir, new_file_storage_dir)
1692
        except OSError, err:
1693
          logging.exception("Cannot rename '%s' to '%s'",
1694
                            old_file_storage_dir, new_file_storage_dir)
1695
          result =  False,
1696
      else:
1697
        logging.error("'%s' is not a directory", old_file_storage_dir)
1698
        result = False,
1699
    else:
1700
      if os.path.exists(old_file_storage_dir):
1701
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1702
                      old_file_storage_dir, new_file_storage_dir)
1703
        result = False,
1704
  return result
1705

    
1706

    
1707
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  Returns True when file_name lies inside (or is) the queue directory,
  False otherwise; a rejection is also logged.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  file_name = os.path.normpath(file_name)
  # the old os.path.commonprefix check compared character-by-character,
  # so e.g. '<queue_dir>-evil/x' was accepted; compare on a path
  # component boundary instead
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.sep))

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
1721
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  Returns False when the filename is outside the queue directory,
  True otherwise.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
1734
def JobQueueRename(old, new):
  """Renames a job queue file.

  Both the old and the new name must live inside the queue directory;
  returns False otherwise.

  """
  if not _IsJobQueueFile(old):
    return False
  if not _IsJobQueueFile(new):
    return False

  os.rename(old, new)

  return True
1746
def CloseBlockDevices(disks):
1747
  """Closes the given block devices.
1748

1749
  This means they will be switched to secondary mode (in case of DRBD).
1750

1751
  """
1752
  bdevs = []
1753
  for cf in disks:
1754
    rd = _RecursiveFindBD(cf)
1755
    if rd is None:
1756
      return (False, "Can't find device %s" % cf)
1757
    bdevs.append(rd)
1758

    
1759
  msg = []
1760
  for rd in bdevs:
1761
    try:
1762
      rd.Close()
1763
    except errors.BlockDeviceError, err:
1764
      msg.append(str(err))
1765
  if msg:
1766
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
1767
  else:
1768
    return (True, "All devices secondary")
1769

    
1770

    
1771
class HooksRunner(object):
1772
  """Hook runner.
1773

1774
  This class is instantiated on the node side (ganeti-noded) and not on
1775
  the master side.
1776

1777
  """
1778
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
1779

    
1780
  def __init__(self, hooks_base_dir=None):
1781
    """Constructor for hooks runner.
1782

1783
    Args:
1784
      - hooks_base_dir: if not None, this overrides the
1785
        constants.HOOKS_BASE_DIR (useful for unittests)
1786

1787
    """
1788
    if hooks_base_dir is None:
1789
      hooks_base_dir = constants.HOOKS_BASE_DIR
1790
    self._BASE_DIR = hooks_base_dir
1791

    
1792
  @staticmethod
1793
  def ExecHook(script, env):
1794
    """Exec one hook script.
1795

1796
    Args:
1797
     - script: the full path to the script
1798
     - env: the environment with which to exec the script
1799

1800
    """
1801
    # exec the process using subprocess and log the output
1802
    fdstdin = None
1803
    try:
1804
      fdstdin = open("/dev/null", "r")
1805
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
1806
                               stderr=subprocess.STDOUT, close_fds=True,
1807
                               shell=False, cwd="/", env=env)
1808
      output = ""
1809
      try:
1810
        output = child.stdout.read(4096)
1811
        child.stdout.close()
1812
      except EnvironmentError, err:
1813
        output += "Hook script error: %s" % str(err)
1814

    
1815
      while True:
1816
        try:
1817
          result = child.wait()
1818
          break
1819
        except EnvironmentError, err:
1820
          if err.errno == errno.EINTR:
1821
            continue
1822
          raise
1823
    finally:
1824
      # try not to leak fds
1825
      for fd in (fdstdin, ):
1826
        if fd is not None:
1827
          try:
1828
            fd.close()
1829
          except EnvironmentError, err:
1830
            # just log the error
1831
            #logging.exception("Error while closing fd %s", fd)
1832
            pass
1833

    
1834
    return result == 0, output
1835

    
1836
  def RunHooks(self, hpath, phase, env):
1837
    """Run the scripts in the hooks directory.
1838

1839
    This method will not be usually overriden by child opcodes.
1840

1841
    """
1842
    if phase == constants.HOOKS_PHASE_PRE:
1843
      suffix = "pre"
1844
    elif phase == constants.HOOKS_PHASE_POST:
1845
      suffix = "post"
1846
    else:
1847
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
1848
    rr = []
1849

    
1850
    subdir = "%s-%s.d" % (hpath, suffix)
1851
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
1852
    try:
1853
      dir_contents = utils.ListVisibleFiles(dir_name)
1854
    except OSError, err:
1855
      # must log
1856
      return rr
1857

    
1858
    # we use the standard python sort order,
1859
    # so 00name is the recommended naming scheme
1860
    dir_contents.sort()
1861
    for relname in dir_contents:
1862
      fname = os.path.join(dir_name, relname)
1863
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
1864
          self.RE_MASK.match(relname) is not None):
1865
        rrval = constants.HKR_SKIP
1866
        output = ""
1867
      else:
1868
        result, output = self.ExecHook(fname, env)
1869
        if not result:
1870
          rrval = constants.HKR_FAIL
1871
        else:
1872
          rrval = constants.HKR_SUCCESS
1873
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
1874

    
1875
    return rr
1876

    
1877

    
1878
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    # locate the allocator script on the configured search path; a
    # missing script is reported via the status, not as an exception
    script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                            os.path.isfile)
    if script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    # the request data is handed to the script through a temporary
    # file, which is always cleaned up afterwards
    tmp_fd, tmp_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(tmp_fd, idata)
      os.close(tmp_fd)
      runresult = utils.RunCmd([script, tmp_name])
      if runresult.failed:
        return (constants.IARUN_FAILURE, runresult.stdout, runresult.stderr,
                runresult.fail_reason)
    finally:
      os.unlink(tmp_name)

    return (constants.IARUN_SUCCESS, runresult.stdout, runresult.stderr, None)
1912

    
1913

    
1914
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache consists of one small status file per block device under
  constants.BDEV_CACHE_DIR, holding "<owner> <primary|secondary>
  <iv_name>".  All operations are best-effort: cache I/O failures are
  logged but never propagated to the caller.

  """
  # prefix stripped from device paths when building cache file names
  _DEV_PREFIX = "/dev/"
  # directory holding the per-device cache files
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @param dev_path: the /dev/ path of the device
    @param owner: the owner (instance name) of the device
    @param on_primary: whether this is the primary node for the device
    @param iv_name: the instance-visible name of the device, or None
        if the device is not visible to the instance

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # best-effort cache: log the failure, never propagate it
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    @param dev_path: the /dev/ path of the device whose cache file
        should be removed

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # best-effort cache: log the failure, never propagate it
      # (message fixed: it previously said "update", copy-pasted from
      # UpdateCache, which made log entries misleading)
      logging.exception("Can't remove the bdev cache for %s", dev_path)