Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ bd1e4562

History | View | Annotate | Download (55.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetSshRunner():
  """Build and return a fresh SshRunner instance."""
  runner = ssh.SshRunner()
  return runner
49

    
50

    
51
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded; defaults to the empty list

  """
  if not os.path.isdir(path):
    return
  # Fixed: the previous signature used a mutable default argument
  # (exclude=[]), which is shared between calls and was mutated below.
  if exclude is None:
    exclude = []
  else:
    # Normalize excluded paths
    exclude = [os.path.normpath(i) for i in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in exclude:
      continue
    # only remove regular files, never directories or symlinks
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
70

    
71

    
72
def _JobQueuePurge(keep_lock):
  """Remove job queue files and archived jobs.

  @param keep_lock: if True, the queue lock file is preserved

  """
  exclude = []
  if keep_lock:
    exclude.append(constants.JOB_QUEUE_LOCK_FILE)

  _CleanDirectory(constants.QUEUE_DIR, exclude=exclude)
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
83

    
84

    
85
def GetMasterInfo():
86
  """Returns master information.
87

88
  This is an utility function to compute master information, either
89
  for consumption here or from the node daemon.
90

91
  @rtype: tuple
92
  @return: (master_netdev, master_ip, master_name)
93

94
  """
95
  try:
96
    ss = ssconf.SimpleStore()
97
    master_netdev = ss.GetMasterNetdev()
98
    master_ip = ss.GetMasterIP()
99
    master_node = ss.GetMasterNode()
100
  except errors.ConfigurationError, err:
101
    logging.exception("Cluster configuration incomplete")
102
    return (None, None)
103
  return (master_netdev, master_ip, master_node)
104

    
105

    
106
def StartMaster(start_daemons):
  """Activate local node as master node.

  The master IP is always (re)claimed if nobody else answers on it;
  when start_daemons is True the master daemons (ganeti-masterd and
  ganeti-rapi) are launched as well.

  @return: True if every step succeeded, False otherwise

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  no_errors = True
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    # something answers on the master IP; check whether it is ourselves
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
                     source=constants.LOCALHOST_IP_ADDRESS):
      # we already have the ip:
      logging.debug("Already started")
    else:
      logging.error("Someone else has the master ip, not activating")
      no_errors = False
  else:
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      logging.error("Can't activate master IP: %s", result.output)
      no_errors = False

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    for daemon_name in ('ganeti-masterd', 'ganeti-rapi'):
      result = utils.RunCmd([daemon_name])
      if result.failed:
        logging.error("Can't start daemon %s: %s", daemon_name,
                      result.output)
        no_errors = False
  return no_errors
148

    
149

    
150
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The master IP is always released; when stop_daemons is True the
  master daemons (ganeti-masterd and ganeti-rapi) are killed as well.

  @return: False if the master info can't be read, True otherwise

  """
  master_netdev, master_ip, _ = GetMasterInfo()
  if not master_netdev:
    return False

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in (constants.RAPI_PID, constants.MASTERD_PID):
      pid = utils.ReadPidFile(utils.DaemonPidFileName(daemon))
      utils.KillProcess(pid)

  return True
174

    
175

    
176
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
177
  """Joins this node to the cluster.
178

179
  This does the following:
180
      - updates the hostkeys of the machine (rsa and dsa)
181
      - adds the ssh private key to the user
182
      - adds the ssh public key to the users' authorized_keys file
183

184
  """
185
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
186
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
187
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
188
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
189
  for name, content, mode in sshd_keys:
190
    utils.WriteFile(name, data=content, mode=mode)
191

    
192
  try:
193
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
194
                                                    mkdir=True)
195
  except errors.OpExecError, err:
196
    logging.exception("Error while processing user ssh files")
197
    return False
198

    
199
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
200
    utils.WriteFile(name, data=content, mode=0600)
201

    
202
  utils.AddAuthorizedKey(auth_keys, sshpub)
203

    
204
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
205

    
206
  return True
207

    
208

    
209
def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  _CleanDirectory(constants.DATA_DIR)

  # The lock can be removed because we're going to quit anyway.
  _JobQueuePurge(keep_lock=False)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")
    return

  pub_file = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, pub_file.read(8192))
  finally:
    pub_file.close()

  for key_path in (priv_key, pub_key):
    utils.RemoveFile(key_path)

  # Return a reassuring string to the caller, and quit
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
235

    
236

    
237
def GetNodeInfo(vgname):
  """Gives back a hash with different informations about the node.

  Returns:
    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
      'memory_free' : xxx, 'memory_total' : xxx }
    where
    vg_size is the size of the configured volume group in MiB
    vg_free is the free size of the volume group in MiB
    memory_dom0 is the memory allocated for domain0 in MiB
    memory_free is the currently available (free) ram in MiB
    memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  # merge in the hypervisor-reported memory data, if available
  hyp_info = hypervisor.GetHypervisor().GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  # boot id lets callers detect node reboots between queries
  boot_id_file = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    node_info["bootid"] = boot_id_file.read(128).rstrip("\n")
  finally:
    boot_id_file.close()

  return node_info
268

    
269

    
270
def VerifyNode(what):
  """Verify the status of the local node.

  Args:
    what - a dictionary of things to check:
      'filelist' : list of files for which to compute checksums
      'nodelist' : list of nodes we should check communication with
      'hypervisor': run the hypervisor-specific verify
      'node-net-test': list of (name, primary_ip, secondary_ip) tuples;
        connectivity to every node is tested from this node's own IPs

  Requested files on local node are checksummed and the result returned.

  The nodelist is traversed, with the following checks being made
  for each node:
  - known_hosts key correct
  - correct resolving of node name (target node returns its own hostname
    by ssh-execution of 'hostname', result compared against name in list.

  Returns a dict with one entry per requested check; per-node sub-dicts
  only contain entries for nodes that FAILED the check.

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = hypervisor.GetHypervisor().Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    # NOTE: this shuffles the caller's list in place, presumably to
    # spread the ssh load when many nodes verify simultaneously
    random.shuffle(what['nodelist'])
    for node in what['nodelist']:
      success, message = _GetSshRunner().VerifyNodeHostname(node)
      if not success:
        # only failures are recorded
        result['nodelist'][node] = message
  if 'node-net-test' in what:
    result['node-net-test'] = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # locate our own primary/secondary IPs in the submitted list
    for name, pip, sip in what['node-net-test']:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      result['node-net-test'][my_name] = ("Can't find my own"
                                          " primary/secondary IP"
                                          " in the node list")
    else:
      port = ssconf.SimpleStore().GetNodeDaemonPort()
      # ping every node's daemon port from our matching interface
      for name, pip, sip in what['node-net-test']:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # a separate secondary test only makes sense if the IPs differ
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          result['node-net-test'][name] = ("failure using the %s"
                                           " interface(s)" %
                                           " and ".join(fail))

  return result
331

    
332

    
333
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  Returns:
    dictionary of all partions (key) with their size (in MiB), inactive
    and online status:
    {'test1': ('20.06', True, True)}

  On 'lvs' failure the raw command output (a string) is returned
  instead, matching the historical contract.

  """
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return result.output

  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  lvs = {}
  for raw_line in result.stdout.splitlines():
    stripped = raw_line.strip()
    match = valid_line_re.match(stripped)
    if match is None:
      logging.error("Invalid line returned from lvs output: '%s'", stripped)
      continue
    name, size, attr = match.groups()
    # lv_attr char 4 ('-') marks an inactive volume, char 5 ('o') an
    # open one
    lvs[name] = (size, attr[4] == '-', attr[5] == 'o')

  return lvs
365

    
366

    
367
def ListVolumeGroups():
  """List the volume groups and their size.

  Returns:
    Dictionary with keys volume name and values the size of the volume

  """
  # thin wrapper kept for RPC symmetry with the other node functions
  vgs = utils.ListVolumeGroups()
  return vgs
375

    
376

    
377
def NodeVolumes():
  """List all volumes on this node.

  Returns a list of dicts with keys 'name', 'size', 'dev' and 'vg',
  or an empty dict if the 'lvs' command fails.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logging.error("Failed to list logical volumes, lvs output: %s",
                  result.output)
    return {}

  def _strip_extent(dev):
    # lvs prints devices as "/dev/xxx(extent)"; keep only the path part
    if '(' in dev:
      return dev.split('(')[0]
    return dev

  volumes = []
  for raw_line in result.stdout.splitlines():
    # skip lines that don't carry all four fields
    if raw_line.count('|') < 3:
      continue
    fields = raw_line.split('|')
    volumes.append({
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': _strip_extent(fields[2].strip()),
      'vg': fields[3].strip(),
      })
  return volumes
405

    
406

    
407
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  Returns:
    True if all of them exist, false otherwise

  """
  for bridge_name in bridges_list:
    if not utils.BridgeExists(bridge_name):
      # one missing bridge is enough to fail the whole check
      return False
  return True
419

    
420

    
421
def GetInstanceList():
422
  """Provides a list of instances.
423

424
  Returns:
425
    A list of all running instances on the current node
426
    - instance1.example.com
427
    - instance2.example.com
428

429
  """
430
  try:
431
    names = hypervisor.GetHypervisor().ListInstances()
432
  except errors.HypervisorError, err:
433
    logging.exception("Error enumerating instances")
434
    raise
435

    
436
  return names
437

    
438

    
439
def GetInstanceInfo(instance):
  """Gives back the informations about an instance as a dictionary.

  Args:
    instance: name of the instance (ex. instance1.example.com)

  Returns:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
  if iinfo is not None:
    # pick the interesting fields out of the hypervisor's info tuple
    for key, idx in (('memory', 2), ('state', 4), ('time', 5)):
      output[key] = iinfo[idx]

  return output
462

    
463

    
464
def GetAllInstancesInfo():
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  Returns: a dictionary of dictionaries, keys being the instance name,
    and with values:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)
    vcpus: the number of cpus

  """
  output = {}

  all_info = hypervisor.GetHypervisor().GetAllInstancesInfo()
  if all_info:
    # each entry is (name, id, memory, vcpus, state, times); the id is
    # not exposed to the caller
    for name, _, memory, vcpus, state, times in all_info:
      output[name] = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }

  return output
494

    
495

    
496
def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  Returns True on success, False if a disk can't be found or the OS
  create script fails; raises BlockDeviceError if a device is not
  assembled.

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  # translate the instance-visible disk names into config objects
  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  # the devices must already be assembled; we only attach and open them
  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  # per-run log file, timestamped so reruns don't overwrite each other
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  # NOTE(review): "&>" is a bash-ism — presumably RunCmd executes via a
  # shell that supports it; confirm before changing shells
  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)
  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}

  result = utils.RunCmd(command, env=env)
  if result.failed:
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
                  " output: %s", command, result.fail_reason, logfile,
                  result.output)
    return False

  return True
550

    
551

    
552
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
553
  """Run the OS rename script for an instance.
554

555
  Args:
556
    instance: the instance object
557
    old_name: the old name of the instance
558
    os_disk: the instance-visible name of the os device
559
    swap_disk: the instance-visible name of the swap device
560

561
  """
562
  inst_os = OSFromDisk(instance.os)
563

    
564
  script = inst_os.rename_script
565

    
566
  os_device = instance.FindDisk(os_disk)
567
  if os_device is None:
568
    logging.error("Can't find this device-visible name '%s'", os_disk)
569
    return False
570

    
571
  swap_device = instance.FindDisk(swap_disk)
572
  if swap_device is None:
573
    logging.error("Can't find this device-visible name '%s'", swap_disk)
574
    return False
575

    
576
  real_os_dev = _RecursiveFindBD(os_device)
577
  if real_os_dev is None:
578
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
579
                                  str(os_device))
580
  real_os_dev.Open()
581

    
582
  real_swap_dev = _RecursiveFindBD(swap_device)
583
  if real_swap_dev is None:
584
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
585
                                  str(swap_device))
586
  real_swap_dev.Open()
587

    
588
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
589
                                           old_name,
590
                                           instance.name, int(time.time()))
591
  if not os.path.exists(constants.LOG_OS_DIR):
592
    os.mkdir(constants.LOG_OS_DIR, 0750)
593

    
594
  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
595
                                inst_os.path, script, old_name, instance.name,
596
                                real_os_dev.dev_path, real_swap_dev.dev_path,
597
                                logfile)
598

    
599
  result = utils.RunCmd(command)
600

    
601
  if result.failed:
602
    logging.error("os create command '%s' returned error: %s output: %s",
603
                  command, result.fail_reason, result.output)
604
    return False
605

    
606
  return True
607

    
608

    
609
def _GetVGInfo(vg_name):
610
  """Get informations about the volume group.
611

612
  Args:
613
    vg_name: the volume group
614

615
  Returns:
616
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
617
    where
618
    vg_size is the total size of the volume group in MiB
619
    vg_free is the free size of the volume group in MiB
620
    pv_count are the number of physical disks in that vg
621

622
  If an error occurs during gathering of data, we return the same dict
623
  with keys all set to None.
624

625
  """
626
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
627

    
628
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
629
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
630

    
631
  if retval.failed:
632
    logging.error("volume group %s not present", vg_name)
633
    return retdic
634
  valarr = retval.stdout.strip().rstrip(':').split(':')
635
  if len(valarr) == 3:
636
    try:
637
      retdic = {
638
        "vg_size": int(round(float(valarr[0]), 0)),
639
        "vg_free": int(round(float(valarr[1]), 0)),
640
        "pv_count": int(valarr[2]),
641
        }
642
    except ValueError, err:
643
      logging.exception("Fail to parse vgs output")
644
  else:
645
    logging.error("vgs output has the wrong number of fields (expected"
646
                  " three): %s", str(valarr))
647
  return retdic
648

    
649

    
650
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  Returns a list of (disk, attached_device) pairs; raises
  BlockDeviceError if any disk can't be found.

  """
  gathered = []
  for disk in instance.disks:
    attached = _RecursiveFindBD(disk)
    if attached is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    attached.Open()
    gathered.append((disk, attached))
  return gathered
666

    
667

    
668
def StartInstance(instance, extra_args):
669
  """Start an instance.
670

671
  Args:
672
    instance - name of instance to start.
673

674
  """
675
  running_instances = GetInstanceList()
676

    
677
  if instance.name in running_instances:
678
    return True
679

    
680
  block_devices = _GatherBlockDevs(instance)
681
  hyper = hypervisor.GetHypervisor()
682

    
683
  try:
684
    hyper.StartInstance(instance, block_devices, extra_args)
685
  except errors.HypervisorError, err:
686
    logging.exception("Failed to start instance")
687
    return False
688

    
689
  return True
690

    
691

    
692
def ShutdownInstance(instance):
693
  """Shut an instance down.
694

695
  Args:
696
    instance - name of instance to shutdown.
697

698
  """
699
  running_instances = GetInstanceList()
700

    
701
  if instance.name not in running_instances:
702
    return True
703

    
704
  hyper = hypervisor.GetHypervisor()
705
  try:
706
    hyper.StopInstance(instance)
707
  except errors.HypervisorError, err:
708
    logging.error("Failed to stop instance")
709
    return False
710

    
711
  # test every 10secs for 2min
712
  shutdown_ok = False
713

    
714
  time.sleep(1)
715
  for dummy in range(11):
716
    if instance.name not in GetInstanceList():
717
      break
718
    time.sleep(10)
719
  else:
720
    # the shutdown did not succeed
721
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
722

    
723
    try:
724
      hyper.StopInstance(instance, force=True)
725
    except errors.HypervisorError, err:
726
      logging.exception("Failed to stop instance")
727
      return False
728

    
729
    time.sleep(1)
730
    if instance.name in GetInstanceList():
731
      logging.error("could not shutdown instance '%s' even by destroy",
732
                    instance.name)
733
      return False
734

    
735
  return True
736

    
737

    
738
def RebootInstance(instance, reboot_type, extra_args):
739
  """Reboot an instance.
740

741
  Args:
742
    instance    - name of instance to reboot
743
    reboot_type - how to reboot [soft,hard,full]
744

745
  """
746
  running_instances = GetInstanceList()
747

    
748
  if instance.name not in running_instances:
749
    logging.error("Cannot reboot instance that is not running")
750
    return False
751

    
752
  hyper = hypervisor.GetHypervisor()
753
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
754
    try:
755
      hyper.RebootInstance(instance)
756
    except errors.HypervisorError, err:
757
      logging.exception("Failed to soft reboot instance")
758
      return False
759
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
760
    try:
761
      ShutdownInstance(instance)
762
      StartInstance(instance, extra_args)
763
    except errors.HypervisorError, err:
764
      logging.exception("Failed to hard reboot instance")
765
      return False
766
  else:
767
    raise errors.ParameterError("reboot_type invalid")
768

    
769

    
770
  return True
771

    
772

    
773
def MigrateInstance(instance, target, live):
774
  """Migrates an instance to another node.
775

776
  """
777
  hyper = hypervisor.GetHypervisor()
778

    
779
  try:
780
    hyper.MigrateInstance(instance, target, live)
781
  except errors.HypervisorError, err:
782
    msg = "Failed to migrate instance: %s" % str(err)
783
    logging.error(msg)
784
    return (False, msg)
785
  return (True, "Migration successfull")
786

    
787

    
788
def CreateBlockDevice(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  Args:
   disk: a ganeti.objects.Disk object
   size: the size of the physical underlying device
   owner: a string with the name of the instance
   on_primary: a boolean indicating if it is the primary node or not
   info: string that will be sent to the physical device creation

  Returns:
    the new unique_id of the device (this can sometime be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
  # assemble the children first, so a compound device (e.g. a mirror)
  # can be created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      clist.append(crdev)
  try:
    # best-effort cleanup: if a leftover device with the same id
    # exists, remove it before creating the new one
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logging.info("removing existing device %s", disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    # deliberately ignored: failure to find/remove a stale device must
    # not prevent creation of the new one
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    if not device.Assemble():
      errorstring = "Can't assemble device after creation"
      logging.error(errorstring)
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
                                    " daemon logs" % errorstring)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      # primary (or open-on-secondary) devices are opened read/write
      device.Open(force=True)
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  device.SetInfo(info)

  # the unique_id may only be known after actual creation
  physical_id = device.unique_id
  return physical_id
842

    
843

    
844
def RemoveBlockDevice(disk):
845
  """Remove a block device.
846

847
  This is intended to be called recursively.
848

849
  """
850
  try:
851
    # since we are removing the device, allow a partial match
852
    # this allows removal of broken mirrors
853
    rdev = _RecursiveFindBD(disk, allow_partial=True)
854
  except errors.BlockDeviceError, err:
855
    # probably can't attach
856
    logging.info("Can't attach to device %s in remove", disk)
857
    rdev = None
858
  if rdev is not None:
859
    r_path = rdev.dev_path
860
    result = rdev.Remove()
861
    if result:
862
      DevCacheManager.RemoveCache(r_path)
863
  else:
864
    result = True
865
  if disk.children:
866
    for child in disk.children:
867
      result = result and RemoveBlockDevice(child)
868
  return result
869

    
870

    
871
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: a objects.Disk object
    owner: name of the instance owning the device (used for the cache)
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    # ChildrenNeeded() tells us the minimum number of working children;
    # from that we compute how many failed (None) children we tolerate
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # too many children already failed: give up on the whole device
        if children.count(None) >= mcn:
          raise
        # otherwise record the failure as a None placeholder and go on
        cdev = None
        logging.debug("Error in child activation: %s", str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    # remember the device so later operations can find it quickly
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node; report success without a device
    result = True
  return result
917

    
918

    
919
def AssembleBlockDevice(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  assembled = _RecursiveAssembleBD(disk, owner, as_primary)
  if isinstance(assembled, bdev.BlockDev):
    # translate the device object into its /dev path for the caller
    return assembled.dev_path
  return assembled
933

    
934

    
935
def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as oppossed to assemble, shutdown of different
  devices doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is None:
    # not assembled: nothing to shut down at this level
    result = True
  else:
    r_path = r_dev.dev_path
    result = r_dev.Shutdown()
    if result:
      DevCacheManager.RemoveCache(r_path)

  # recurse into the children, if any
  for child in (disk.children or []):
    result = result and ShutdownBlockDevice(child)
  return result
958

    
959

    
960
def MirrorAddChildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  """
  parent = _RecursiveFindBD(parent_cdev, allow_partial=True)
  if parent is None:
    logging.error("Can't find parent device")
    return False
  children = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in children:
    logging.error("Can't find new device(s) to add: %s:%s",
                  children, new_cdevs)
    return False
  parent.AddChildren(children)
  return True
975

    
976

    
977
def MirrorRemoveChildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  """
  parent = _RecursiveFindBD(parent_cdev)
  if parent is None:
    logging.error("Can't find parent in remove children: %s", parent_cdev)
    return False
  child_paths = []
  for disk in new_cdevs:
    static_path = disk.StaticDevPath()
    if static_path is not None:
      # device has a fixed path, no lookup needed
      child_paths.append(static_path)
      continue
    found = _RecursiveFindBD(disk)
    if found is None:
      logging.error("Can't find dynamic device %s while removing children",
                    disk)
      return False
    child_paths.append(found.dev_path)
  parent.RemoveChildren(child_paths)
  return True
1000

    
1001

    
1002
def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  Raises:
    errors.BlockDeviceError: if any of the devices cannot be found

  """
  status = []
  for disk in disks:
    found = _RecursiveFindBD(disk)
    if found is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(disk))
    status.append(found.CombinedSyncStatus())
  return status
1020

    
1021

    
1022
def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  # NOTE(review): allow_partial is part of the signature used by callers
  # such as MirrorAddChildren, but the child lookups below do not
  # currently propagate it -- confirm whether that is intended.
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1044

    
1045

    
1046
def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return informations about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    return None
  return (found.dev_path, found.major, found.minor) + found.GetSyncStatus()
1062

    
1063

    
1064
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
                  file_name)
    return False

  # whitelist of files the master is allowed to push to this node
  permitted = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    ])
  permitted.update(ssconf.SimpleStore().GetFileList())

  if file_name not in permitted:
    logging.error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'", file_name)
    return False

  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
                  atime=atime, mtime=mtime)
  return True
1092

    
1093

    
1094
def _ErrnoOrStr(err):
1095
  """Format an EnvironmentError exception.
1096

1097
  If the `err` argument has an errno attribute, it will be looked up
1098
  and converted into a textual EXXXX description. Otherwise the string
1099
  representation of the error will be returned.
1100

1101
  """
1102
  if hasattr(err, 'errno'):
1103
    detail = errno.errorcode[err.errno]
1104
  else:
1105
    detail = str(err)
1106
  return detail
1107

    
1108

    
1109
def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Return value will be either an integer denoting the version or None in the
  case when this is not a valid OS name.

  Raises:
    errors.InvalidOS: if the version file is missing, unreadable,
      not a regular file, or does not contain an integer

  """
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError as err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    # the version file is expected to be tiny; 256 bytes is plenty
    version_fd = open(api_file)
    try:
      raw_version = version_fd.read(256)
    finally:
      version_fd.close()
  except EnvironmentError as err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  try:
    return int(raw_version.strip())
  except (TypeError, ValueError) as err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not integer (%s)" % str(err))
1149

    
1150

    
1151
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  Returns an OS object for each name in all the given top directories
  (if not given defaults to constants.OS_SEARCH_PATH)

  Returns:
    list of OS objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError:
        logging.exception("Can't list the OS directory %s", dir_name)
        # keep scanning the remaining search directories; the previous
        # 'break' here aborted the whole search on the first
        # unreadable directory
        continue
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir_name)
          result.append(os_inst)
        except errors.InvalidOS as err:
          # record the broken OS too, so it shows up in diagnostics
          result.append(objects.OS.FromInvalidOS(err))

  return result
1180

    
1181

    
1182
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    name: the OS name
    base_dir: directory containing the OS scripts; defaults to a
              search in all the OS_SEARCH_PATH directories

  """
  if base_dir is None:
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
  else:
    os_dir = os.path.sep.join([base_dir, name])

  api_version = _OSOndiskVersion(name, os_dir)

  if api_version != constants.OS_API_VERSION:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_version, constants.OS_API_VERSION))

  # OS Scripts dictionary, we will populate it with the actual script names
  os_scripts = {}
  for script in ('create', 'export', 'import', 'rename'):
    script_path = os.path.sep.join([os_dir, script])
    os_scripts[script] = script_path

    try:
      st = os.stat(script_path)
    except EnvironmentError as err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)

  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_version=api_version)
1237

    
1238

    
1239
def GrowBlockDevice(disk, amount):
  """Grow a stack of block devices.

  This function is called recursively, with the childrens being the
  first one resize.

  Args:
    disk: the disk to be grown
    amount: how much to grow the device by

  Returns: a tuple of (status, result), with:
    status: the result (true/false) of the operation
    result: the error message if the operation failed, otherwise not used

  """
  found = _RecursiveFindBD(disk)
  if found is None:
    return False, "Cannot find block device %s" % (disk,)

  try:
    found.Grow(amount)
  except errors.BlockDeviceError as err:
    return False, str(err)

  return True, None
1263

    
1264

    
1265
def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted, or None when
    no suitable device could be found

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, recurse directly into it
      return SnapshotBlockDevice(disk.children[0])
    # more than one child: take the first one whose size matches the
    # parent's; if none matches, None is returned (as before, where the
    # function simply fell off the end)
    for child in disk.children:
      if child.size == disk.size:
        return SnapshotBlockDevice(child)
    return None
  if disk.dev_type == constants.LD_LV:
    found = _RecursiveFindBD(disk)
    if found is None:
      return None
    # let's stay on the safe side and ask for the full size, for now
    return found.Snapshot(disk.size)
  raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                               " '%s' of type '%s'" %
                               (disk.unique_id, disk.dev_type))
1299

    
1300

    
1301
def ExportSnapshot(disk, dest_node, instance):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported

  Returns:
    True if successful, False otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0o750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)
  comprcmd = "gzip"
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
                                       destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)
  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
  return not result.failed
1356

    
1357

    
1358
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  for key, value in [('version', '0'),
                     ('timestamp', '%d' % int(time.time())),
                     ('source', instance.primary_node),
                     ('os', instance.os),
                     ('compression', 'gzip')]:
    config.set(constants.INISECT_EXP, key, value)

  config.add_section(constants.INISECT_INS)
  for key, value in [('name', instance.name),
                     ('memory', '%d' % instance.memory),
                     ('vcpus', '%d' % instance.vcpus),
                     ('disk_template', instance.disk_template)]:
    config.set(constants.INISECT_INS, key, value)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' % nic_count,
               '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  # NOTE(review): for a non-empty NIC list this stores the last *index*
  # (len - 1), not the count; the importer apparently compensates by
  # reading keys until one is missing -- confirm before changing.
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_count)

  disk_count = 0
  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               '%s' % disk.iv_name)
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               '%s' % disk.physical_id[1])
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               '%d' % disk.size)
  # NOTE(review): same last-index-vs-count caveat as for nic_count
  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  # replace any previous export of this instance with the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True
1418

    
1419

    
1420
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both sections must be present for a well-formed export
  if not (config.has_section(constants.INISECT_EXP) and
          config.has_section(constants.INISECT_INS)):
    return None

  return config
1440

    
1441

    
1442
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  def _OpenRealBD(device):
    # resolve an instance disk to the real assembled block device
    # and open it; raises BlockDeviceError when not assembled
    real_dev = _RecursiveFindBD(device)
    if real_dev is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(device))
    real_dev.Open()
    return real_dev

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logging.error("Can't find this device-visible name '%s'", os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logging.error("Can't find this device-visible name '%s'", swap_disk)
    return False

  real_os_dev = _OpenRealBD(os_device)
  real_swap_dev = _OpenRealBD(swap_device)

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0o750)

  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
                                       destcmd)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
  env = {'HYPERVISOR': ssconf.SimpleStore().GetHypervisorType()}

  result = utils.RunCmd(command, env=env)
  if result.failed:
    logging.error("os import command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
  return not result.failed
1507

    
1508

    
1509
def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if not os.path.isdir(constants.EXPORT_DIR):
    return []
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
1517

    
1518

    
1519
def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.
  shutil.rmtree(target)

  return True
1536

    
1537

    
1538
def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  The devlist argument is a list of tuples (disk, new_logical,
  new_physical). The return value will be a combined boolean result
  (True only if all renames succeeded).

  """
  all_ok = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      all_ok = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      if old_rpath != dev.dev_path:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      all_ok = False
  return all_ok
1567

    
1568

    
1569
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  Args:
    file_storage_dir: string with path

  Returns:
    normalized file_storage_dir (string) if valid, None otherwise

  """
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = ssconf.SimpleStore().GetFileStorageDir()
  # os.path.commonprefix works character by character, so it used to
  # accept e.g. '/srv/ganeti-evil' under a base of '/srv/ganeti';
  # require the base itself or a real path-component prefix instead
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.sep)):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir
1592

    
1593

    
1594
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating wheter dir
    creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  if not file_storage_dir:
    return (False,)
  if os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
      return (False,)
    # already present as a directory: nothing to do
    return (True,)
  try:
    os.makedirs(file_storage_dir, 0o750)
  except OSError as err:
    logging.error("Cannot create file storage directory '%s': %s",
                  file_storage_dir, err)
    return (False,)
  return (True,)
1622

    
1623

    
1624
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not log an error and return.

  Args:
    file_storage_dir: string containing the path

  Returns:
    tuple with first element a boolean indicating wheter dir
    removal was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = (True,)
  if not file_storage_dir:
    result = (False,)
  elif os.path.exists(file_storage_dir):
    if not os.path.isdir(file_storage_dir):
      logging.error("'%s' is not a directory", file_storage_dir)
      result = (False,)
    # deletes dir only if empty, otherwise we want to return False;
    # note: the rmdir is attempted (and fails, logging a second error)
    # even when the path is not a directory, matching the historical
    # behaviour
    try:
      os.rmdir(file_storage_dir)
    except OSError:
      logging.exception("Cannot remove file storage directory '%s'",
                        file_storage_dir)
      result = (False,)
  return result
1654

    
1655

    
1656
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating wheter dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  if not old_file_storage_dir or not new_file_storage_dir:
    return (False,)
  if os.path.exists(new_file_storage_dir):
    if os.path.exists(old_file_storage_dir):
      logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                    old_file_storage_dir, new_file_storage_dir)
      return (False,)
    # only the new path exists: the rename evidently already happened,
    # so report success (same as the original code path)
    return (True,)
  if not os.path.isdir(old_file_storage_dir):
    logging.error("'%s' is not a directory", old_file_storage_dir)
    return (False,)
  try:
    os.rename(old_file_storage_dir, new_file_storage_dir)
  except OSError:
    logging.exception("Cannot rename '%s' to '%s'",
                      old_file_storage_dir, new_file_storage_dir)
    return (False,)
  return (True,)
1691

    
1692

    
1693
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  Returns:
    True if file_name lies inside (or is) the queue directory,
    False otherwise (an error is logged in that case).

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # os.path.commonprefix compares characters, not path components, so
  # it used to accept siblings such as '<queue_dir>-evil/job'; require
  # the directory itself or a real path-component prefix
  file_name = os.path.normpath(file_name)
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.sep))

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result
1705

    
1706

    
1707
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)
  return True
1718

    
1719

    
1720
def JobQueuePurge():
  """Removes job queue files and archived jobs

  """
  # The lock must not be removed, otherwise another process could create
  # it again.
  return _JobQueuePurge(keep_lock=True)
1727

    
1728

    
1729
def JobQueueRename(old, new):
  """Renames a job queue file.

  """
  # both ends of the rename must live inside the queue directory;
  # short-circuit as before: 'new' is not checked when 'old' fails
  if not _IsJobQueueFile(old):
    return False
  if not _IsJobQueueFile(new):
    return False

  os.rename(old, new)
  return True
1739

    
1740

    
1741
def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  found_devs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    found_devs.append(rd)

  failures = []
  for rd in found_devs:
    try:
      rd.Close()
    except errors.BlockDeviceError as err:
      failures.append(str(err))

  if failures:
    return (False, "Can't make devices secondary: %s" % ",".join(failures))
  return (True, "All devices secondary")
1764

    
1765

    
1766
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  # scripts whose basename doesn't match this are skipped, not run
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    Returns:
      tuple of (success boolean, at most the first 4 kB of output)

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # deliberately capture only the first 4 kB of output
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError as err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError as err:
          if err.errno == errno.EINTR:
            # wait() interrupted by a signal, retry
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError:
            # fixed: log the close failure instead of silently
            # swallowing it (the logging call was commented out)
            logging.exception("Error while closing fd %s", fd)

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not be usually overriden by child opcodes.

    Args:
     - hpath: hooks path, used to build the per-phase subdirectory name
     - phase: constants.HOOKS_PHASE_PRE or constants.HOOKS_PHASE_POST
     - env: environment passed to every hook script

    Returns:
      list of ("subdir/script", HKR_* status, output) tuples

    Raises:
      errors.ProgrammerError: if phase is not a known hooks phase

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError:
      # fixed: this used to be silently swallowed (the old "# must log"
      # TODO); an unlistable/missing hooks dir simply means no hooks
      logging.exception("Can't list hooks directory %s", dir_name)
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        # not a regular executable file with a safe name: skip it
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
1871

    
1872

    
1873
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    The request data is written to a temporary file which is passed to
    the script as its only argument; the file is always removed before
    returning.

    @param name: the name of the iallocator script, looked up in
        constants.IALLOCATOR_SEARCH_PATH
    @param idata: string with the allocator request data

    @return: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      try:
        os.write(fd, idata)
      finally:
        # close the descriptor even if the write fails; previously it
        # was only closed on success, leaking the fd on write errors
        os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)
1907

    
1908

    
1909
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  The cache is kept as one small text file per device under the
  configured cache directory; all methods are classmethods since no
  per-instance state is needed.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @param dev_path: the block device path to convert
    @return: the full path of the cache file for dev_path

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @param dev_path: the path of the device (None is logged and ignored)
    @param owner: the owner of the device, stored as the first field
    @param on_primary: whether the device lives on the primary node
    @param iv_name: the instance-visible name of the device, or None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError:
      # best-effort: a failed cache update must not break the caller
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    @param dev_path: the path of the device (None is logged and ignored)

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError:
      # best-effort; the original message wrongly said "update" here
      logging.exception("Can't remove bdev cache for %s", dev_path)