root / lib / backend.py @ 24fc781f

#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon"""


import os
import os.path
import shutil
import time
import stat
import errno
import re
import subprocess
import random
import logging
import tempfile

from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf


def _GetSshRunner():
  return ssh.SshRunner()


def _CleanDirectory(path, exclude=[]):
  """Removes all regular files in a directory.

  @param exclude: List of files to be excluded.
  @type exclude: list

  """
  if not os.path.isdir(path):
    return

  # Normalize excluded paths
  exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)


def _JobQueuePurge(keep_lock):
  """Removes job queue files and archived jobs

  """
  if keep_lock:
    exclude = [constants.JOB_QUEUE_LOCK_FILE]
  else:
    exclude = []

  _CleanDirectory(constants.QUEUE_DIR, exclude=exclude)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)


def _GetMasterInfo():
  """Return the master ip and netdev.

  """
  try:
    ss = ssconf.SimpleStore()
    master_netdev = ss.GetMasterNetdev()
    master_ip = ss.GetMasterIP()
  except errors.ConfigurationError, err:
    logging.exception("Cluster configuration incomplete")
    return (None, None)
  return (master_netdev, master_ip)


def StartMaster(start_daemons):
  """Activate local node as master node.

  The function will always try to activate the IP address of the master
  (if someone else has it, then it won't). Then, if the start_daemons
  parameter is True, it will also start the master daemons
  (ganeti-masterd and ganeti-rapi).

  """
  ok = True
  master_netdev, master_ip = _GetMasterInfo()
  if not master_netdev:
    return False

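  # Check whether the master IP already answers; if it does, a second probe
  # sourced from localhost tells us whether the address is configured on this
  # node or is held by another machine.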
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      ok = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      ok = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
      result = utils.RunCmd([daemon])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon, result.output)
        ok = False
  return ok


def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master. Then, if the stop_daemons parameter is True, it will also
  stop the master daemons (ganeti-masterd and ganeti-rapi).

  """
  master_netdev, master_ip = _GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))

  return True


def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the user's authorized_keys file

  """
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logging.exception("Error while processing user ssh files")
    return False

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True


def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  _CleanDirectory(constants.DATA_DIR)

  # The lock can be removed because we're going to quit anyway.
  _JobQueuePurge(keep_lock=False)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')


def GetNodeInfo(vgname):
  """Gives back a dict with information about the node.

  Returns:
    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_dom0': xxx,
      'memory_free' : xxx, 'memory_total' : xxx }
    where
    vg_size is the size of the configured volume group in MiB
    vg_free is the free size of the volume group in MiB
    memory_dom0 is the memory allocated for domain0 in MiB
    memory_free is the currently available (free) ram in MiB
    memory_total is the total amount of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor()
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray


def VerifyNode(what):
  """Verify the status of the local node.

  Args:
    what - a dictionary of things to check:
      'filelist' : list of files for which to compute checksums
      'nodelist' : list of nodes we should check communication with
      'hypervisor': run the hypervisor-specific verify

  Requested files on local node are checksummed and the result returned.

  The nodelist is traversed, with the following checks being made
  for each node:
  - known_hosts key correct
  - correct resolving of node name (target node returns its own hostname
    by ssh-execution of 'hostname', result compared against name in list).

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = hypervisor.GetHypervisor().Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner().VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = ssconf.SimpleStore().GetNodeDaemonPort()
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result


def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  Returns:
    dictionary of all partitions (key) with their size (in MiB), inactive
    and online status:
    {'test1': ('20.06', True, True)}

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

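  # Each output line has the form "name|size|attrs"; lv_attr is a
  # six-character status string where index 4 is the LV state ('-' when
  # inactive) and index 5 is 'o' when the device is open.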
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    lvs[name] = (size, inactive, online)

  return lvs


def ListVolumeGroups():
  """List the volume groups and their size.

  Returns:
    Dictionary with keys volume name and values the size of the volume

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def parse_dev(dev):
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]


def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  Returns:
    True if all of them exist, false otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True


def GetInstanceList():
  """Provides a list of instances.

  Returns:
    A list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  try:
    names = hypervisor.GetHypervisor().ListInstances()
  except errors.HypervisorError, err:
    logging.exception("Error enumerating instances")
    raise

  return names


def GetInstanceInfo(instance):
  """Gives back the information about an instance as a dictionary.

  Args:
    instance: name of the instance (ex. instance1.example.com)

  Returns:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetAllInstancesInfo():
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  Returns: a dictionary of dictionaries, keys being the instance name,
    and with values:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)
    vcpus: the number of cpus

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
  if iinfo:
    for name, inst_id, memory, vcpus, state, times in iinfo:
      output[name] = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }

  return output


def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True


def RunRenameInstance(instance, old_name, os_disk, swap_disk):
  """Run the OS rename script for an instance.

  Args:
    instance: the instance object
    old_name: the old name of the instance
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
                                inst_os.path, script, old_name, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os rename command '%s' returned error: %s output: %s",
                  command, result.fail_reason, result.output)
    return False

  return True


def _GetVGInfo(vg_name):
  """Get information about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count is the number of physical disks in that vg

  If an error occurs during gathering of data, we return the same dict
  with keys all set to None.

  """
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
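  # vgs may leave a trailing separator on the line, so strip it before
  # splitting into the three expected fields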
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Failed to parse vgs output")
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic


def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    block_devices.append((disk, device))
  return block_devices


def StartInstance(instance, extra_args):
  """Start an instance.

  Args:
    instance - name of instance to start.

  """
  running_instances = GetInstanceList()

  if instance.name in running_instances:
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logging.exception("Failed to start instance")
    return False

  return True


def ShutdownInstance(instance):
  """Shut an instance down.

  Args:
    instance - name of instance to shutdown.

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor()
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logging.error("Failed to stop instance")
    return False

  # test every 10secs for 2min
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList():
      break
    time.sleep(10)
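  # this "else" belongs to the "for" loop above and only runs when the loop
  # finished without hitting "break", i.e. the instance is still listed after
  # all the polls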
  else:
    # the shutdown did not succeed
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logging.exception("Failed to stop instance")
      return False

    time.sleep(1)
    if instance.name in GetInstanceList():
      logging.error("could not shutdown instance '%s' even by destroy",
                    instance.name)
      return False

  return True


def RebootInstance(instance, reboot_type, extra_args):
  """Reboot an instance.

  Args:
    instance    - name of instance to reboot
    reboot_type - how to reboot [soft,hard,full]

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    logging.error("Cannot reboot instance that is not running")
    return False

  hyper = hypervisor.GetHypervisor()
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
    try:
      hyper.RebootInstance(instance)
    except errors.HypervisorError, err:
      logging.exception("Failed to soft reboot instance")
      return False
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
    try:
      ShutdownInstance(instance)
      StartInstance(instance, extra_args)
    except errors.HypervisorError, err:
      logging.exception("Failed to hard reboot instance")
      return False
  else:
    raise errors.ParameterError("reboot_type invalid")


  return True


def MigrateInstance(instance, target, live):
  """Migrates an instance to another node.

  """
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.MigrateInstance(instance, target, live)
  except errors.HypervisorError, err:
    msg = "Failed to migrate instance: %s" % str(err)
    logging.error(msg)
    return (False, msg)
  return (True, "Migration successful")


def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Args:
   disk: a ganeti.objects.Disk object
   size: the size of the physical underlying device
   owner: a string with the name of the instance
   on_primary: a boolean indicating if it is the primary node or not
   info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometimes be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id


def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    result = rdev.Remove()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and RemoveBlockDevice(child)
  return result


def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: an objects.Disk object
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
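    # mcn is now the number of children that may fail to assemble (i.e. end
    # up as None) before we give up on the whole device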
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result


def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result


def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that, as opposed to
  assemble, we don't cache the children: shutting down a device
  doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result


def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent_bdev is None:
    logging.error("Can't find parent device")
    return False
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if new_bdevs.count(None) > 0:
    logging.error("Can't find new device(s) to add: %s:%s",
                  new_bdevs, new_cdevs)
    return False
  parent_bdev.AddChildren(new_bdevs)
  return True


def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is None:
      bd = _RecursiveFindBD(disk)
      if bd is None:
        logging.error("Can't find dynamic device %s while removing children",
                      disk)
        return False
      else:
        devs.append(bd.dev_path)
    else:
      devs.append(rpath)
  parent_bdev.RemoveChildren(devs)
  return True


def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats


def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)


def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  allowed_files = [
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ]
  allowed_files.extend(ssconf.SimpleStore().GetFileList())

  if file_name not in allowed_files:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the `err` argument has an errno attribute, it will be looked up
  and converted into a textual EXXXX description. Otherwise the string
  representation of the error will be returned.

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail


def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Return value will be an integer denoting the version. If the given
  directory does not contain a valid API version file, an
  errors.InvalidOS exception is raised.

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_version = f.read(256)
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_version = api_version.strip()
  try:
    api_version = int(api_version)
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))

  return api_version


def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  Returns:
    list of OS objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s", dir_name)
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS, err:
          result.append(objects.OS.FromInvalidOS(err))

  return result


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    base_dir: Directory containing the OS scripts. Defaults to a search
              in all the OS_SEARCH_PATH directories.

  """

  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_version = _OSOndiskVersion(name, os_dir)

  if api_version != constants.OS_API_VERSION:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_version, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)


  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_version=api_version)


def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the children being the
  first ones resized.

  Args:
    disk: the disk to be grown

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    r_dev.Grow(amount)
  except errors.BlockDeviceError, err:
    return False, str(err)

  return True, None


def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))


def ExportSnapshot(disk, dest_node, instance):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported

  Returns:
    True if successful, False otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters

  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
                                       destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_count)

  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()
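  # replace any previous export for this instance: drop the old directory
  # and move the freshly written one into its place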
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True


def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config


def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
                                       destcmd)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])

  result = utils.RunCmd(command)

  if result.failed:
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True


def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_unique_id). The
  return value will be a combined boolean result (True only if all
  renames succeeded).

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether a given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
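  # both paths are normalized above, so a common prefix equal to the base
  # directory means file_storage_dir is intended to live underneath it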
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir


def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
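  # note the trailing commas below: the result is a one-element tuple, with
  # the boolean status as its first element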
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating whether dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
1648
  """Rename the file storage directory.
1649

1650
  Args:
1651
    old_file_storage_dir: string containing the old path
1652
    new_file_storage_dir: string containing the new path
1653

1654
  Returns:
1655
    tuple with first element a boolean indicating wheter dir
1656
    rename was successful or not
1657

1658
  """
1659
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
1660
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
1661
  result = True,
1662
  if not old_file_storage_dir or not new_file_storage_dir:
1663
    result = False,
1664
  else:
1665
    if not os.path.exists(new_file_storage_dir):
1666
      if os.path.isdir(old_file_storage_dir):
1667
        try:
1668
          os.rename(old_file_storage_dir, new_file_storage_dir)
1669
        except OSError, err:
1670
          logging.exception("Cannot rename '%s' to '%s'",
1671
                            old_file_storage_dir, new_file_storage_dir)
1672
          result =  False,
1673
      else:
1674
        logging.error("'%s' is not a directory", old_file_storage_dir)
1675
        result = False,
1676
    else:
1677
      if os.path.exists(old_file_storage_dir):
1678
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
1679
                      old_file_storage_dir, new_file_storage_dir)
1680
        result = False,
1681
  return result
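
# Illustrative semantics (paths assumed, not cluster defaults): the rename
# only happens when the old path is a directory and the new path does not
# exist yet, e.g.
#   RenameFileStorageDir("/srv/ganeti/file-storage/a",
#                        "/srv/ganeti/file-storage/b")   # -> (True,)
# If both locations already exist, the result is (False,) and an error is
# logged.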


def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
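
# Example (illustrative; the queue path below is an assumption, the real value
# is constants.QUEUE_DIR):
#   with QUEUE_DIR = "/var/lib/ganeti/queue":
#     _IsJobQueueFile("/var/lib/ganeti/queue/job-42")  -> True
#     _IsJobQueueFile("/etc/passwd")                   -> False, error logged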


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True
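
# Illustrative call (file name assumed): a caller, typically the master node,
# pushes serialized job data with something like
#   JobQueueUpdate("/var/lib/ganeti/queue/job-42", serialized_contents)
# which is refused (returns False) for any path outside the queue directory.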


def JobQueuePurge():
  """Removes job queue files and archived jobs

  """
  # The lock must not be removed, otherwise another process could create
  # it again.
  return _JobQueuePurge(keep_lock=True)


def JobQueueRename(old, new):
  """Renames a job queue file.

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True


def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not usually be overridden by child opcodes.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # must log
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
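
    # Illustrative hooks layout (hook path assumed for the example): with
    # hpath="instance-start" and the "pre" phase, RunHooks scans
    #   <hooks_base_dir>/instance-start-pre.d/
    # in sorted order and returns one ("instance-start-pre.d/<script>", HKR_*,
    # output) tuple per entry; files that are not executable or whose names do
    # not match RE_MASK are reported as HKR_SKIP.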


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
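
    # Illustrative outcomes (run status is one of the IARUN_ constants):
    #   (IARUN_NOTFOUND, None, None, None)       - no script by that name on the path
    #   (IARUN_FAILURE, stdout, stderr, reason)  - the script exited with an error
    #   (IARUN_SUCCESS, stdout, stderr, None)    - success
    # The input data (idata) is handed to the script via a temporary file.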


class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath
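
    # Example conversion (illustrative device names):
    #   "/dev/xenvg/disk0"  ->  "<BDEV_CACHE_DIR>/bdev_xenvg_disk0"
    #   "/dev/drbd0"        ->  "<BDEV_CACHE_DIR>/bdev_drbd0"
    # i.e. the /dev/ prefix is dropped and remaining slashes become underscores.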

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s", dev_path)