
lib/backend.py @ revision 7688d0d3


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36

    
37
from ganeti import errors
38
from ganeti import utils
39
from ganeti import ssh
40
from ganeti import hypervisor
41
from ganeti import constants
42
from ganeti import bdev
43
from ganeti import objects
44
from ganeti import ssconf
45

    
46

    
47
def _GetConfig():
48
  return ssconf.SimpleConfigReader()
49

    
50

    
51
def _GetSshRunner():
52
  return ssh.SshRunner(_GetConfig())
53

    
54

    
55
def _CleanDirectory(path, exclude=[]):
56
  """Removes all regular files in a directory.
57

58
  @param exclude: List of files to be excluded.
59
  @type exclude: list
60

61
  """
62
  if not os.path.isdir(path):
63
    return
64

    
65
  # Normalize excluded paths
66
  exclude = [os.path.normpath(i) for i in exclude]
67

    
68
  for rel_name in utils.ListVisibleFiles(path):
69
    full_name = os.path.normpath(os.path.join(path, rel_name))
70
    if full_name in exclude:
71
      continue
72
    if os.path.isfile(full_name) and not os.path.islink(full_name):
73
      utils.RemoveFile(full_name)
74

    
75

    
76
def JobQueuePurge():
77
  """Removes job queue files and archived jobs
78

79
  """
80
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
81
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
82

    
83

    
84
def GetMasterInfo():
85
  """Returns master information.
86

87
  This is a utility function to compute master information, either
88
  for consumption here or from the node daemon.
89

90
  @rtype: tuple
91
  @return: (master_netdev, master_ip, master_name)
92

93
  """
94
  try:
95
    cfg = _GetConfig()
96
    master_netdev = cfg.GetMasterNetdev()
97
    master_ip = cfg.GetMasterIP()
98
    master_node = cfg.GetMasterNode()
99
  except errors.ConfigurationError, err:
100
    logging.exception("Cluster configuration incomplete")
101
    return (None, None, None)
102
  return (master_netdev, master_ip, master_node)
103

    
104

    
105
def StartMaster(start_daemons):
106
  """Activate local node as master node.
107

108
  The function will always try to activate the IP address of the master
109
  (if someone else has it, then it won't). Then, if the start_daemons
110
  parameter is True, it will also start the master daemons
111
  (ganeti-masterd and ganeti-rapi).
112

113
  """
114
  ok = True
115
  master_netdev, master_ip, _ = GetMasterInfo()
116
  if not master_netdev:
117
    return False
118

    
119
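  # The master IP answering at all means it is configured somewhere; if it
  # also answers to a probe sourced from the localhost address, it is
  # configured on this node, otherwise another node is holding it.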
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
120
    if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT,
121
                     source=constants.LOCALHOST_IP_ADDRESS):
122
      # we already have the ip:
123
      logging.debug("Already started")
124
    else:
125
      logging.error("Someone else has the master ip, not activating")
126
      ok = False
127
  else:
128
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
129
                           "dev", master_netdev, "label",
130
                           "%s:0" % master_netdev])
131
    if result.failed:
132
      logging.error("Can't activate master IP: %s", result.output)
133
      ok = False
134

    
135
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
136
                           "-s", master_ip, master_ip])
137
    # we'll ignore the exit code of arping
138

    
139
  # and now start the master and rapi daemons
140
  if start_daemons:
141
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
142
      result = utils.RunCmd([daemon])
143
      if result.failed:
144
        logging.error("Can't start daemon %s: %s", daemon, result.output)
145
        ok = False
146
  return ok
147

    
148

    
149
def StopMaster(stop_daemons):
150
  """Deactivate this node as master.
151

152
  The function will always try to deactivate the IP address of the
153
  master. Then, if the stop_daemons parameter is True, it will also
154
  stop the master daemons (ganeti-masterd and ganeti-rapi).
155

156
  """
157
  master_netdev, master_ip, _ = GetMasterInfo()
158
  if not master_netdev:
159
    return False
160

    
161
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
162
                         "dev", master_netdev])
163
  if result.failed:
164
    logging.error("Can't remove the master IP, error: %s", result.output)
165
    # but otherwise ignore the failure
166

    
167
  if stop_daemons:
168
    # stop/kill the rapi and the master daemon
169
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
170
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
171

    
172
  return True
173

    
174

    
175
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
176
  """Joins this node to the cluster.
177

178
  This does the following:
179
      - updates the hostkeys of the machine (rsa and dsa)
180
      - adds the ssh private key to the user
181
      - adds the ssh public key to the users' authorized_keys file
182

183
  """
184
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
185
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
186
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
187
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
188
  for name, content, mode in sshd_keys:
189
    utils.WriteFile(name, data=content, mode=mode)
190

    
191
  try:
192
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
193
                                                    mkdir=True)
194
  except errors.OpExecError, err:
195
    logging.exception("Error while processing user ssh files")
196
    return False
197

    
198
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
199
    utils.WriteFile(name, data=content, mode=0600)
200

    
201
  utils.AddAuthorizedKey(auth_keys, sshpub)
202

    
203
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
204

    
205
  return True
206

    
207

    
208
def LeaveCluster():
209
  """Cleans up the current node and prepares it to be removed from the cluster.
210

211
  """
212
  _CleanDirectory(constants.DATA_DIR)
213
  JobQueuePurge()
214

    
215
  try:
216
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
217
  except errors.OpExecError:
218
    logging.exception("Error while processing ssh files")
219
    return
220

    
221
  f = open(pub_key, 'r')
222
  try:
223
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
224
  finally:
225
    f.close()
226

    
227
  utils.RemoveFile(priv_key)
228
  utils.RemoveFile(pub_key)
229

    
230
  # Return a reassuring string to the caller, and quit
231
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
232

    
233

    
234
def GetNodeInfo(vgname):
235
  """Gives back a hash with different informations about the node.
236

237
  Returns:
238
    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_domain0': xxx,
239
      'memory_free' : xxx, 'memory_total' : xxx }
240
    where
241
    vg_size is the size of the configured volume group in MiB
242
    vg_free is the free size of the volume group in MiB
243
    memory_dom0 is the memory allocated for domain0 in MiB
244
    memory_free is the currently available (free) ram in MiB
245
    memory_total is the total amount of ram in MiB
246

247
  """
248
  outputarray = {}
249
  vginfo = _GetVGInfo(vgname)
250
  outputarray['vg_size'] = vginfo['vg_size']
251
  outputarray['vg_free'] = vginfo['vg_free']
252

    
253
  hyper = hypervisor.GetHypervisor(_GetConfig())
254
  hyp_info = hyper.GetNodeInfo()
255
  if hyp_info is not None:
256
    outputarray.update(hyp_info)
257

    
258
  f = open("/proc/sys/kernel/random/boot_id", 'r')
259
  try:
260
    outputarray["bootid"] = f.read(128).rstrip("\n")
261
  finally:
262
    f.close()
263

    
264
  return outputarray
265

    
266

    
267
def VerifyNode(what):
268
  """Verify the status of the local node.
269

270
  Args:
271
    what - a dictionary of things to check:
272
      'filelist' : list of files for which to compute checksums
273
      'nodelist' : list of nodes we should check communication with
274
      'hypervisor': run the hypervisor-specific verify
275

276
  Requested files on local node are checksummed and the result returned.
277

278
  The nodelist is traversed, with the following checks being made
279
  for each node:
280
  - known_hosts key correct
281
  - correct resolving of node name (target node returns its own hostname
282
    by ssh-execution of 'hostname', result compared against name in list).
283

284
  """
285
  result = {}
286

    
287
  if 'hypervisor' in what:
288
    result['hypervisor'] = hypervisor.GetHypervisor(_GetConfig()).Verify()
289

    
290
  if 'filelist' in what:
291
    result['filelist'] = utils.FingerprintFiles(what['filelist'])
292

    
293
  if 'nodelist' in what:
294
    result['nodelist'] = {}
295
    random.shuffle(what['nodelist'])
296
    for node in what['nodelist']:
297
      success, message = _GetSshRunner().VerifyNodeHostname(node)
298
      if not success:
299
        result['nodelist'][node] = message
300
  if 'node-net-test' in what:
301
    result['node-net-test'] = {}
302
    my_name = utils.HostInfo().name
303
    my_pip = my_sip = None
304
    for name, pip, sip in what['node-net-test']:
305
      if name == my_name:
306
        my_pip = pip
307
        my_sip = sip
308
        break
309
    if not my_pip:
310
      result['node-net-test'][my_name] = ("Can't find my own"
311
                                          " primary/secondary IP"
312
                                          " in the node list")
313
    else:
314
      port = utils.GetNodeDaemonPort()
315
      for name, pip, sip in what['node-net-test']:
316
        fail = []
317
        if not utils.TcpPing(pip, port, source=my_pip):
318
          fail.append("primary")
319
        if sip != pip:
320
          if not utils.TcpPing(sip, port, source=my_sip):
321
            fail.append("secondary")
322
        if fail:
323
          result['node-net-test'][name] = ("failure using the %s"
324
                                           " interface(s)" %
325
                                           " and ".join(fail))
326

    
327
  return result
328

    
329

    
330
def GetVolumeList(vg_name):
331
  """Compute list of logical volumes and their size.
332

333
  Returns:
334
    dictionary of all partitions (key) with their size (in MiB), inactive
335
    and online status:
336
    {'test1': ('20.06', True, True)}
337

338
  """
339
  lvs = {}
340
  sep = '|'
341
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
342
                         "--separator=%s" % sep,
343
                         "-olv_name,lv_size,lv_attr", vg_name])
344
  if result.failed:
345
    logging.error("Failed to list logical volumes, lvs output: %s",
346
                  result.output)
347
    return result.output
348

    
349
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
350
  for line in result.stdout.splitlines():
351
    line = line.strip()
352
    match = valid_line_re.match(line)
353
    if not match:
354
      logging.error("Invalid line returned from lvs output: '%s'", line)
355
      continue
356
    name, size, attr = match.groups()
357
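    # lv_attr is a six-character flag string; character 5 (index 4) is the
    # state flag and character 6 (index 5) is the device-open flag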
    inactive = attr[4] == '-'
358
    online = attr[5] == 'o'
359
    lvs[name] = (size, inactive, online)
360

    
361
  return lvs
362

    
363

    
364
def ListVolumeGroups():
365
  """List the volume groups and their size.
366

367
  Returns:
368
    Dictionary with volume group names as keys and their sizes as values
369

370
  """
371
  return utils.ListVolumeGroups()
372

    
373

    
374
def NodeVolumes():
375
  """List all volumes on this node.
376

377
  """
378
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
379
                         "--separator=|",
380
                         "--options=lv_name,lv_size,devices,vg_name"])
381
  if result.failed:
382
    logging.error("Failed to list logical volumes, lvs output: %s",
383
                  result.output)
384
    return {}
385

    
386
  def parse_dev(dev):
387
    if '(' in dev:
388
      return dev.split('(')[0]
389
    else:
390
      return dev
391

    
392
  def map_line(line):
393
    return {
394
      'name': line[0].strip(),
395
      'size': line[1].strip(),
396
      'dev': parse_dev(line[2].strip()),
397
      'vg': line[3].strip(),
398
    }
399

    
400
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
401
          if line.count('|') >= 3]
402

    
403

    
404
def BridgesExist(bridges_list):
405
  """Check if a list of bridges exist on the current node.
406

407
  Returns:
408
    True if all of them exist, False otherwise
409

410
  """
411
  for bridge in bridges_list:
412
    if not utils.BridgeExists(bridge):
413
      return False
414

    
415
  return True
416

    
417

    
418
def GetInstanceList():
419
  """Provides a list of instances.
420

421
  Returns:
422
    A list of all running instances on the current node
423
    - instance1.example.com
424
    - instance2.example.com
425

426
  """
427
  try:
428
    names = hypervisor.GetHypervisor(_GetConfig()).ListInstances()
429
  except errors.HypervisorError, err:
430
    logging.exception("Error enumerating instances")
431
    raise
432

    
433
  return names
434

    
435

    
436
def GetInstanceInfo(instance):
437
  """Gives back the informations about an instance as a dictionary.
438

439
  Args:
440
    instance: name of the instance (ex. instance1.example.com)
441

442
  Returns:
443
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
444
    where
445
    memory: memory size of instance (int)
446
    state: xen state of instance (string)
447
    time: cpu time of instance (float)
448

449
  """
450
  output = {}
451

    
452
  iinfo = hypervisor.GetHypervisor(_GetConfig()).GetInstanceInfo(instance)
453
  if iinfo is not None:
454
    output['memory'] = iinfo[2]
455
    output['state'] = iinfo[4]
456
    output['time'] = iinfo[5]
457

    
458
  return output
459

    
460

    
461
def GetAllInstancesInfo():
462
  """Gather data about all instances.
463

464
  This is the equivalent of `GetInstanceInfo()`, except that it
465
  computes data for all instances at once, thus being faster if one
466
  needs data about more than one instance.
467

468
  Returns: a dictionary of dictionaries, keys being the instance name,
469
    and with values:
470
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
471
    where
472
    memory: memory size of instance (int)
473
    state: xen state of instance (string)
474
    time: cpu time of instance (float)
475
    vcpus: the number of cpus
476

477
  """
478
  output = {}
479

    
480
  iinfo = hypervisor.GetHypervisor(_GetConfig()).GetAllInstancesInfo()
481
  if iinfo:
482
    for name, inst_id, memory, vcpus, state, times in iinfo:
483
      output[name] = {
484
        'memory': memory,
485
        'vcpus': vcpus,
486
        'state': state,
487
        'time': times,
488
        }
489

    
490
  return output
491

    
492

    
493
def AddOSToInstance(instance, os_disk, swap_disk):
494
  """Add an OS to an instance.
495

496
  Args:
497
    instance: the instance object
498
    os_disk: the instance-visible name of the os device
499
    swap_disk: the instance-visible name of the swap device
500

501
  """
502
  cfg = _GetConfig()
503
  inst_os = OSFromDisk(instance.os)
504

    
505
  create_script = inst_os.create_script
506

    
507
  os_device = instance.FindDisk(os_disk)
508
  if os_device is None:
509
    logging.error("Can't find this device-visible name '%s'", os_disk)
510
    return False
511

    
512
  swap_device = instance.FindDisk(swap_disk)
513
  if swap_device is None:
514
    logging.error("Can't find this device-visible name '%s'", swap_disk)
515
    return False
516

    
517
  real_os_dev = _RecursiveFindBD(os_device)
518
  if real_os_dev is None:
519
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
520
                                  str(os_device))
521
  real_os_dev.Open()
522

    
523
  real_swap_dev = _RecursiveFindBD(swap_device)
524
  if real_swap_dev is None:
525
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
526
                                  str(swap_device))
527
  real_swap_dev.Open()
528

    
529
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
530
                                     instance.name, int(time.time()))
531
  if not os.path.exists(constants.LOG_OS_DIR):
532
    os.mkdir(constants.LOG_OS_DIR, 0750)
533

    
534
  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
535
                                inst_os.path, create_script, instance.name,
536
                                real_os_dev.dev_path, real_swap_dev.dev_path,
537
                                logfile)
538
  env = {'HYPERVISOR': cfg.GetHypervisorType()}
539

    
540
  result = utils.RunCmd(command, env=env)
541
  if result.failed:
542
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
543
                  " output: %s", command, result.fail_reason, logfile,
544
                  result.output)
545
    return False
546

    
547
  return True
548

    
549

    
550
def RunRenameInstance(instance, old_name, os_disk, swap_disk):
551
  """Run the OS rename script for an instance.
552

553
  Args:
554
    instance: the instance object
555
    old_name: the old name of the instance
556
    os_disk: the instance-visible name of the os device
557
    swap_disk: the instance-visible name of the swap device
558

559
  """
560
  inst_os = OSFromDisk(instance.os)
561

    
562
  script = inst_os.rename_script
563

    
564
  os_device = instance.FindDisk(os_disk)
565
  if os_device is None:
566
    logging.error("Can't find this device-visible name '%s'", os_disk)
567
    return False
568

    
569
  swap_device = instance.FindDisk(swap_disk)
570
  if swap_device is None:
571
    logging.error("Can't find this device-visible name '%s'", swap_disk)
572
    return False
573

    
574
  real_os_dev = _RecursiveFindBD(os_device)
575
  if real_os_dev is None:
576
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
577
                                  str(os_device))
578
  real_os_dev.Open()
579

    
580
  real_swap_dev = _RecursiveFindBD(swap_device)
581
  if real_swap_dev is None:
582
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
583
                                  str(swap_device))
584
  real_swap_dev.Open()
585

    
586
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
587
                                           old_name,
588
                                           instance.name, int(time.time()))
589
  if not os.path.exists(constants.LOG_OS_DIR):
590
    os.mkdir(constants.LOG_OS_DIR, 0750)
591

    
592
  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
593
                                inst_os.path, script, old_name, instance.name,
594
                                real_os_dev.dev_path, real_swap_dev.dev_path,
595
                                logfile)
596

    
597
  result = utils.RunCmd(command)
598

    
599
  if result.failed:
600
    logging.error("os create command '%s' returned error: %s output: %s",
601
                  command, result.fail_reason, result.output)
602
    return False
603

    
604
  return True
605

    
606

    
607
def _GetVGInfo(vg_name):
608
  """Get informations about the volume group.
609

610
  Args:
611
    vg_name: the volume group
612

613
  Returns:
614
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
615
    where
616
    vg_size is the total size of the volume group in MiB
617
    vg_free is the free size of the volume group in MiB
618
    pv_count is the number of physical disks in that vg
619

620
  If an error occurs during gathering of data, we return the same dict
621
  with keys all set to None.
622

623
  """
624
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
625

    
626
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
627
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
628

    
629
  if retval.failed:
630
    logging.error("volume group %s not present", vg_name)
631
    return retdic
632
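  # with the options above, vgs prints a single colon-separated line of the
  # form "size:free:pv_count" (a trailing separator is stripped below)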
  valarr = retval.stdout.strip().rstrip(':').split(':')
633
  if len(valarr) == 3:
634
    try:
635
      retdic = {
636
        "vg_size": int(round(float(valarr[0]), 0)),
637
        "vg_free": int(round(float(valarr[1]), 0)),
638
        "pv_count": int(valarr[2]),
639
        }
640
    except ValueError, err:
641
      logging.exception("Fail to parse vgs output")
642
  else:
643
    logging.error("vgs output has the wrong number of fields (expected"
644
                  " three): %s", str(valarr))
645
  return retdic
646

    
647

    
648
def _GatherBlockDevs(instance):
649
  """Set up an instance's block device(s).
650

651
  This is run on the primary node at instance startup. The block
652
  devices must be already assembled.
653

654
  """
655
  block_devices = []
656
  for disk in instance.disks:
657
    device = _RecursiveFindBD(disk)
658
    if device is None:
659
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
660
                                    str(disk))
661
    device.Open()
662
    block_devices.append((disk, device))
663
  return block_devices
664

    
665

    
666
def StartInstance(instance, extra_args):
667
  """Start an instance.
668

669
  Args:
670
    instance - name of instance to start.
671

672
  """
673
  running_instances = GetInstanceList()
674

    
675
  if instance.name in running_instances:
676
    return True
677

    
678
  block_devices = _GatherBlockDevs(instance)
679
  hyper = hypervisor.GetHypervisor(_GetConfig())
680

    
681
  try:
682
    hyper.StartInstance(instance, block_devices, extra_args)
683
  except errors.HypervisorError, err:
684
    logging.exception("Failed to start instance")
685
    return False
686

    
687
  return True
688

    
689

    
690
def ShutdownInstance(instance):
691
  """Shut an instance down.
692

693
  Args:
694
    instance - name of instance to shutdown.
695

696
  """
697
  running_instances = GetInstanceList()
698

    
699
  if instance.name not in running_instances:
700
    return True
701

    
702
  hyper = hypervisor.GetHypervisor(_GetConfig())
703
  try:
704
    hyper.StopInstance(instance)
705
  except errors.HypervisorError, err:
706
    logging.error("Failed to stop instance")
707
    return False
708

    
709
  # test every 10secs for 2min
710
  shutdown_ok = False
711

    
712
  time.sleep(1)
713
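  # the 'else' branch of this for loop only runs if the loop was never
  # broken out of, i.e. the instance is still running after ~2 minutes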
  for dummy in range(11):
714
    if instance.name not in GetInstanceList():
715
      break
716
    time.sleep(10)
717
  else:
718
    # the shutdown did not succeed
719
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
720

    
721
    try:
722
      hyper.StopInstance(instance, force=True)
723
    except errors.HypervisorError, err:
724
      logging.exception("Failed to stop instance")
725
      return False
726

    
727
    time.sleep(1)
728
    if instance.name in GetInstanceList():
729
      logging.error("could not shutdown instance '%s' even by destroy",
730
                    instance.name)
731
      return False
732

    
733
  return True
734

    
735

    
736
def RebootInstance(instance, reboot_type, extra_args):
737
  """Reboot an instance.
738

739
  Args:
740
    instance    - name of instance to reboot
741
    reboot_type - how to reboot [soft,hard,full]
742

743
  """
744
  running_instances = GetInstanceList()
745

    
746
  if instance.name not in running_instances:
747
    logging.error("Cannot reboot instance that is not running")
748
    return False
749

    
750
  hyper = hypervisor.GetHypervisor(_GetConfig())
751
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
752
    try:
753
      hyper.RebootInstance(instance)
754
    except errors.HypervisorError, err:
755
      logging.exception("Failed to soft reboot instance")
756
      return False
757
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
758
    try:
759
      ShutdownInstance(instance)
760
      StartInstance(instance, extra_args)
761
    except errors.HypervisorError, err:
762
      logging.exception("Failed to hard reboot instance")
763
      return False
764
  else:
765
    raise errors.ParameterError("reboot_type invalid")
766

    
767

    
768
  return True
769

    
770

    
771
def MigrateInstance(instance, target, live):
772
  """Migrates an instance to another node.
773

774
  """
775
  hyper = hypervisor.GetHypervisor(_GetConfig())
776

    
777
  try:
778
    hyper.MigrateInstance(instance, target, live)
779
  except errors.HypervisorError, err:
780
    msg = "Failed to migrate instance: %s" % str(err)
781
    logging.error(msg)
782
    return (False, msg)
783
  return (True, "Migration successfull")
784

    
785

    
786
def CreateBlockDevice(disk, size, owner, on_primary, info):
787
  """Creates a block device for an instance.
788

789
  Args:
790
   disk: a ganeti.objects.Disk object
791
   size: the size of the physical underlying device
792
   owner: a string with the name of the instance
793
   on_primary: a boolean indicating if it is the primary node or not
794
   info: string that will be sent to the physical device creation
795

796
  Returns:
797
    the new unique_id of the device (this can sometimes be
798
    computed only after creation), or None. On secondary nodes,
799
    it's not required to return anything.
800

801
  """
802
  clist = []
803
  if disk.children:
804
    for child in disk.children:
805
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
806
      if on_primary or disk.AssembleOnSecondary():
807
        # we need the children open in case the device itself has to
808
        # be assembled
809
        crdev.Open()
810
      clist.append(crdev)
811
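  # if a device with this unique_id is already attached, remove it first so
  # that the creation below starts from a clean slate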
  try:
812
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
813
    if device is not None:
814
      logging.info("removing existing device %s", disk)
815
      device.Remove()
816
  except errors.BlockDeviceError, err:
817
    pass
818

    
819
  device = bdev.Create(disk.dev_type, disk.physical_id,
820
                       clist, size)
821
  if device is None:
822
    raise ValueError("Can't create child device for %s, %s" %
823
                     (disk, size))
824
  if on_primary or disk.AssembleOnSecondary():
825
    if not device.Assemble():
826
      errorstring = "Can't assemble device after creation"
827
      logging.error(errorstring)
828
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
829
                                    " daemon logs" % errorstring)
830
    device.SetSyncSpeed(constants.SYNC_SPEED)
831
    if on_primary or disk.OpenOnSecondary():
832
      device.Open(force=True)
833
    DevCacheManager.UpdateCache(device.dev_path, owner,
834
                                on_primary, disk.iv_name)
835

    
836
  device.SetInfo(info)
837

    
838
  physical_id = device.unique_id
839
  return physical_id
840

    
841

    
842
def RemoveBlockDevice(disk):
843
  """Remove a block device.
844

845
  This is intended to be called recursively.
846

847
  """
848
  try:
849
    # since we are removing the device, allow a partial match
850
    # this allows removal of broken mirrors
851
    rdev = _RecursiveFindBD(disk, allow_partial=True)
852
  except errors.BlockDeviceError, err:
853
    # probably can't attach
854
    logging.info("Can't attach to device %s in remove", disk)
855
    rdev = None
856
  if rdev is not None:
857
    r_path = rdev.dev_path
858
    result = rdev.Remove()
859
    if result:
860
      DevCacheManager.RemoveCache(r_path)
861
  else:
862
    result = True
863
  if disk.children:
864
    for child in disk.children:
865
      result = result and RemoveBlockDevice(child)
866
  return result
867

    
868

    
869
def _RecursiveAssembleBD(disk, owner, as_primary):
870
  """Activate a block device for an instance.
871

872
  This is run on the primary and secondary nodes for an instance.
873

874
  This function is called recursively.
875

876
  Args:
877
    disk: a objects.Disk object
878
    as_primary: if we should make the block device read/write
879

880
  Returns:
881
    the assembled device or None (in case no device was assembled)
882

883
  If the assembly is not successful, an exception is raised.
884

885
  """
886
  children = []
887
  if disk.children:
888
    mcn = disk.ChildrenNeeded()
889
    if mcn == -1:
890
      mcn = 0 # max number of Nones allowed
891
    else:
892
      mcn = len(disk.children) - mcn # max number of Nones
893
    for chld_disk in disk.children:
894
      try:
895
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
896
      except errors.BlockDeviceError, err:
897
        if children.count(None) >= mcn:
898
          raise
899
        cdev = None
900
        logging.debug("Error in child activation: %s", str(err))
901
      children.append(cdev)
902

    
903
  if as_primary or disk.AssembleOnSecondary():
904
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
905
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
906
    result = r_dev
907
    if as_primary or disk.OpenOnSecondary():
908
      r_dev.Open()
909
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
910
                                as_primary, disk.iv_name)
911

    
912
  else:
913
    result = True
914
  return result
915

    
916

    
917
def AssembleBlockDevice(disk, owner, as_primary):
918
  """Activate a block device for an instance.
919

920
  This is a wrapper over _RecursiveAssembleBD.
921

922
  Returns:
923
    a /dev path for primary nodes
924
    True for secondary nodes
925

926
  """
927
  result = _RecursiveAssembleBD(disk, owner, as_primary)
928
  if isinstance(result, bdev.BlockDev):
929
    result = result.dev_path
930
  return result
931

    
932

    
933
def ShutdownBlockDevice(disk):
934
  """Shut down a block device.
935

936
  First, if the device is assembled (can `Attach()`), then the device
937
  is shutdown. Then the children of the device are shutdown.
938

939
  This function is called recursively. Note that we don't cache the
940
  children or such; as opposed to assemble, shutdown of different
941
  devices doesn't require that the upper device was active.
942

943
  """
944
  r_dev = _RecursiveFindBD(disk)
945
  if r_dev is not None:
946
    r_path = r_dev.dev_path
947
    result = r_dev.Shutdown()
948
    if result:
949
      DevCacheManager.RemoveCache(r_path)
950
  else:
951
    result = True
952
  if disk.children:
953
    for child in disk.children:
954
      result = result and ShutdownBlockDevice(child)
955
  return result
956

    
957

    
958
def MirrorAddChildren(parent_cdev, new_cdevs):
959
  """Extend a mirrored block device.
960

961
  """
962
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
963
  if parent_bdev is None:
964
    logging.error("Can't find parent device")
965
    return False
966
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
967
  if new_bdevs.count(None) > 0:
968
    logging.error("Can't find new device(s) to add: %s:%s",
969
                  new_bdevs, new_cdevs)
970
    return False
971
  parent_bdev.AddChildren(new_bdevs)
972
  return True
973

    
974

    
975
def MirrorRemoveChildren(parent_cdev, new_cdevs):
976
  """Shrink a mirrored block device.
977

978
  """
979
  parent_bdev = _RecursiveFindBD(parent_cdev)
980
  if parent_bdev is None:
981
    logging.error("Can't find parent in remove children: %s", parent_cdev)
982
    return False
983
  devs = []
984
  for disk in new_cdevs:
985
    rpath = disk.StaticDevPath()
986
    if rpath is None:
987
      bd = _RecursiveFindBD(disk)
988
      if bd is None:
989
        logging.error("Can't find dynamic device %s while removing children",
990
                      disk)
991
        return False
992
      else:
993
        devs.append(bd.dev_path)
994
    else:
995
      devs.append(rpath)
996
  parent_bdev.RemoveChildren(devs)
997
  return True
998

    
999

    
1000
def GetMirrorStatus(disks):
1001
  """Get the mirroring status of a list of devices.
1002

1003
  Args:
1004
    disks: list of `objects.Disk`
1005

1006
  Returns:
1007
    list of (mirror_done, estimated_time) tuples, which
1008
    are the result of bdev.BlockDevice.CombinedSyncStatus()
1009

1010
  """
1011
  stats = []
1012
  for dsk in disks:
1013
    rbd = _RecursiveFindBD(dsk)
1014
    if rbd is None:
1015
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1016
    stats.append(rbd.CombinedSyncStatus())
1017
  return stats
1018

    
1019

    
1020
def _RecursiveFindBD(disk, allow_partial=False):
1021
  """Check if a device is activated.
1022

1023
  If so, return information about the real device.
1024

1025
  Args:
1026
    disk: the objects.Disk instance
1027
    allow_partial: don't abort the find if a child of the
1028
                   device can't be found; this is intended to be
1029
                   used when repairing mirrors
1030

1031
  Returns:
1032
    None if the device can't be found
1033
    otherwise the device instance
1034

1035
  """
1036
  children = []
1037
  if disk.children:
1038
    for chdisk in disk.children:
1039
      children.append(_RecursiveFindBD(chdisk))
1040

    
1041
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1042

    
1043

    
1044
def FindBlockDevice(disk):
1045
  """Check if a device is activated.
1046

1047
  If so, return information about the real device.
1048

1049
  Args:
1050
    disk: the objects.Disk instance
1051
  Returns:
1052
    None if the device can't be found
1053
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)
1054

1055
  """
1056
  rbd = _RecursiveFindBD(disk)
1057
  if rbd is None:
1058
    return rbd
1059
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1060

    
1061

    
1062
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1063
  """Write a file to the filesystem.
1064

1065
  This allows the master to overwrite(!) a file. It will only perform
1066
  the operation if the file belongs to a list of configuration files.
1067

1068
  """
1069
  if not os.path.isabs(file_name):
1070
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
1071
                  file_name)
1072
    return False
1073

    
1074
  allowed_files = [
1075
    constants.CLUSTER_CONF_FILE,
1076
    constants.ETC_HOSTS,
1077
    constants.SSH_KNOWN_HOSTS_FILE,
1078
    constants.VNC_PASSWORD_FILE,
1079
    ]
1080

    
1081
  if file_name not in allowed_files:
1082
    logging.error("Filename passed to UploadFile not in allowed"
1083
                 " upload targets: '%s'", file_name)
1084
    return False
1085

    
1086
  utils.WriteFile(file_name, data=data, mode=mode, uid=uid, gid=gid,
1087
                  atime=atime, mtime=mtime)
1088
  return True
1089

    
1090

    
1091
def _ErrnoOrStr(err):
1092
  """Format an EnvironmentError exception.
1093

1094
  If the `err` argument has an errno attribute, it will be looked up
1095
  and converted into a textual EXXXX description. Otherwise the string
1096
  representation of the error will be returned.
1097

1098
  """
1099
  if hasattr(err, 'errno'):
1100
    detail = errno.errorcode[err.errno]
1101
  else:
1102
    detail = str(err)
1103
  return detail
1104

    
1105

    
1106
def _OSOndiskVersion(name, os_dir):
1107
  """Compute and return the API version of a given OS.
1108

1109
  This function will try to read the API version of the os given by
1110
  the 'name' parameter and residing in the 'os_dir' directory.
1111

1112
  Return value will be an integer denoting the version. An
1113
  errors.InvalidOS exception is raised if this is not a valid OS.
1114

1115
  """
1116
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1117

    
1118
  try:
1119
    st = os.stat(api_file)
1120
  except EnvironmentError, err:
1121
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1122
                           " found (%s)" % _ErrnoOrStr(err))
1123

    
1124
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1125
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1126
                           " a regular file")
1127

    
1128
  try:
1129
    f = open(api_file)
1130
    try:
1131
      api_version = f.read(256)
1132
    finally:
1133
      f.close()
1134
  except EnvironmentError, err:
1135
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1136
                           " API version (%s)" % _ErrnoOrStr(err))
1137

    
1138
  api_version = api_version.strip()
1139
  try:
1140
    api_version = int(api_version)
1141
  except (TypeError, ValueError), err:
1142
    raise errors.InvalidOS(name, os_dir,
1143
                           "API version is not integer (%s)" % str(err))
1144

    
1145
  return api_version
1146

    
1147

    
1148
def DiagnoseOS(top_dirs=None):
1149
  """Compute the validity for all OSes.
1150

1151
  Returns an OS object for each name in all the given top directories
1152
  (if not given defaults to constants.OS_SEARCH_PATH)
1153

1154
  Returns:
1155
    list of OS objects
1156

1157
  """
1158
  if top_dirs is None:
1159
    top_dirs = constants.OS_SEARCH_PATH
1160

    
1161
  result = []
1162
  for dir_name in top_dirs:
1163
    if os.path.isdir(dir_name):
1164
      try:
1165
        f_names = utils.ListVisibleFiles(dir_name)
1166
      except EnvironmentError, err:
1167
        logging.exception("Can't list the OS directory %s", dir_name)
1168
        break
1169
      for name in f_names:
1170
        try:
1171
          os_inst = OSFromDisk(name, base_dir=dir_name)
1172
          result.append(os_inst)
1173
        except errors.InvalidOS, err:
1174
          result.append(objects.OS.FromInvalidOS(err))
1175

    
1176
  return result
1177

    
1178

    
1179
def OSFromDisk(name, base_dir=None):
1180
  """Create an OS instance from disk.
1181

1182
  This function will return an OS instance if the given name is a
1183
  valid OS name. Otherwise, it will raise an appropriate
1184
  `errors.InvalidOS` exception, detailing why this is not a valid
1185
  OS.
1186

1187
  Args:
1188
    base_dir: Directory containing the OS scripts. Defaults to a search
1189
            in all the OS_SEARCH_PATH directories.
1190

1191
  """
1192

    
1193
  if base_dir is None:
1194
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1195
    if os_dir is None:
1196
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1197
  else:
1198
    os_dir = os.path.sep.join([base_dir, name])
1199

    
1200
  api_version = _OSOndiskVersion(name, os_dir)
1201

    
1202
  if api_version != constants.OS_API_VERSION:
1203
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1204
                           " (found %s want %s)"
1205
                           % (api_version, constants.OS_API_VERSION))
1206

    
1207
  # OS Scripts dictionary, we will populate it with the actual script names
1208
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}
1209

    
1210
  for script in os_scripts:
1211
    os_scripts[script] = os.path.sep.join([os_dir, script])
1212

    
1213
    try:
1214
      st = os.stat(os_scripts[script])
1215
    except EnvironmentError, err:
1216
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1217
                             (script, _ErrnoOrStr(err)))
1218

    
1219
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1220
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1221
                             script)
1222

    
1223
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1224
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1225
                             script)
1226

    
1227

    
1228
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1229
                    create_script=os_scripts['create'],
1230
                    export_script=os_scripts['export'],
1231
                    import_script=os_scripts['import'],
1232
                    rename_script=os_scripts['rename'],
1233
                    api_version=api_version)
1234

    
1235

    
1236
def GrowBlockDevice(disk, amount):
1237
  """Grow a stack of block devices.
1238

1239
  This function is called recursively, with the children being
1240
  resized first.
1241

1242
  Args:
1243
    disk: the disk to be grown
1244

1245
  Returns: a tuple of (status, result), with:
1246
    status: the result (true/false) of the operation
1247
    result: the error message if the operation failed, otherwise not used
1248

1249
  """
1250
  r_dev = _RecursiveFindBD(disk)
1251
  if r_dev is None:
1252
    return False, "Cannot find block device %s" % (disk,)
1253

    
1254
  try:
1255
    r_dev.Grow(amount)
1256
  except errors.BlockDeviceError, err:
1257
    return False, str(err)
1258

    
1259
  return True, None
1260

    
1261

    
1262
def SnapshotBlockDevice(disk):
1263
  """Create a snapshot copy of a block device.
1264

1265
  This function is called recursively, and the snapshot is actually created
1266
  just for the leaf lvm backend device.
1267

1268
  Args:
1269
    disk: the disk to be snapshotted
1270

1271
  Returns:
1272
    a config entry for the actual lvm device snapshotted.
1273

1274
  """
1275
  if disk.children:
1276
    if len(disk.children) == 1:
1277
      # only one child, let's recurse on it
1278
      return SnapshotBlockDevice(disk.children[0])
1279
    else:
1280
      # more than one child, choose one that matches
1281
      for child in disk.children:
1282
        if child.size == disk.size:
1283
          # return implies breaking the loop
1284
          return SnapshotBlockDevice(child)
1285
  elif disk.dev_type == constants.LD_LV:
1286
    r_dev = _RecursiveFindBD(disk)
1287
    if r_dev is not None:
1288
      # let's stay on the safe side and ask for the full size, for now
1289
      return r_dev.Snapshot(disk.size)
1290
    else:
1291
      return None
1292
  else:
1293
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1294
                                 " '%s' of type '%s'" %
1295
                                 (disk.unique_id, disk.dev_type))
1296

    
1297

    
1298
def ExportSnapshot(disk, dest_node, instance):
1299
  """Export a block device snapshot to a remote node.
1300

1301
  Args:
1302
    disk: the snapshot block device
1303
    dest_node: the node to send the image to
1304
    instance: instance being exported
1305

1306
  Returns:
1307
    True if successful, False otherwise.
1308

1309
  """
1310
  inst_os = OSFromDisk(instance.os)
1311
  export_script = inst_os.export_script
1312

    
1313
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1314
                                     instance.name, int(time.time()))
1315
  if not os.path.exists(constants.LOG_OS_DIR):
1316
    os.mkdir(constants.LOG_OS_DIR, 0750)
1317

    
1318
  real_os_dev = _RecursiveFindBD(disk)
1319
  if real_os_dev is None:
1320
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1321
                                  str(disk))
1322
  real_os_dev.Open()
1323

    
1324
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1325
  destfile = disk.physical_id[1]
1326

    
1327
  # the target command is built out of three individual commands,
1328
  # which are joined by pipes; we check each individual command for
1329
  # valid parameters
1330

    
1331
  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
1332
                               export_script, instance.name,
1333
                               real_os_dev.dev_path, logfile)
1334

    
1335
  comprcmd = "gzip"
1336

    
1337
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1338
                                destdir, destdir, destfile)
1339
  remotecmd = _GetSshRunner().BuildCmd(dest_node, constants.GANETI_RUNAS,
1340
                                       destcmd)
1341

    
1342
  # all commands have been checked, so we're safe to combine them
1343
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1344

    
1345
  result = utils.RunCmd(command)
1346

    
1347
  if result.failed:
1348
    logging.error("os snapshot export command '%s' returned error: %s"
1349
                  " output: %s", command, result.fail_reason, result.output)
1350
    return False
1351

    
1352
  return True
1353

    
1354

    
1355
def FinalizeExport(instance, snap_disks):
1356
  """Write out the export configuration information.
1357

1358
  Args:
1359
    instance: instance configuration
1360
    snap_disks: snapshot block devices
1361

1362
  Returns:
1363
    False in case of error, True otherwise.
1364

1365
  """
1366
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1367
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1368

    
1369
  config = objects.SerializableConfigParser()
1370

    
1371
  config.add_section(constants.INISECT_EXP)
1372
  config.set(constants.INISECT_EXP, 'version', '0')
1373
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1374
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1375
  config.set(constants.INISECT_EXP, 'os', instance.os)
1376
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1377

    
1378
  config.add_section(constants.INISECT_INS)
1379
  config.set(constants.INISECT_INS, 'name', instance.name)
1380
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
1381
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
1382
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
1383

    
1384
  nic_count = 0
1385
  for nic_count, nic in enumerate(instance.nics):
1386
    config.set(constants.INISECT_INS, 'nic%d_mac' %
1387
               nic_count, '%s' % nic.mac)
1388
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
1389
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
1390
               '%s' % nic.bridge)
1391
  # TODO: redundant: on load can read nics until it doesn't exist
1392
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_count)
1393

    
1394
  disk_count = 0
1395
  for disk_count, disk in enumerate(snap_disks):
1396
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
1397
               ('%s' % disk.iv_name))
1398
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
1399
               ('%s' % disk.physical_id[1]))
1400
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
1401
               ('%d' % disk.size))
1402
  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_count)
1403

    
1404
  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
1405
  cfo = open(cff, 'w')
1406
  try:
1407
    config.write(cfo)
1408
  finally:
1409
    cfo.close()
1410

    
1411
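  # replace any previous export for this instance with the newly written one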
  shutil.rmtree(finaldestdir, True)
1412
  shutil.move(destdir, finaldestdir)
1413

    
1414
  return True
1415

    
1416

    
1417
def ExportInfo(dest):
1418
  """Get export configuration information.
1419

1420
  Args:
1421
    dest: directory containing the export
1422

1423
  Returns:
1424
    A serializable config file containing the export info.
1425

1426
  """
1427
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
1428

    
1429
  config = objects.SerializableConfigParser()
1430
  config.read(cff)
1431

    
1432
  if (not config.has_section(constants.INISECT_EXP) or
1433
      not config.has_section(constants.INISECT_INS)):
1434
    return None
1435

    
1436
  return config
1437

    
1438

    
1439
def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
1440
  """Import an os image into an instance.
1441

1442
  Args:
1443
    instance: the instance object
1444
    os_disk: the instance-visible name of the os device
1445
    swap_disk: the instance-visible name of the swap device
1446
    src_node: node holding the source image
1447
    src_image: path to the source image on src_node
1448

1449
  Returns:
1450
    False in case of error, True otherwise.
1451

1452
  """
1453
  cfg = _GetConfig()
1454
  inst_os = OSFromDisk(instance.os)
1455
  import_script = inst_os.import_script
1456

    
1457
  os_device = instance.FindDisk(os_disk)
1458
  if os_device is None:
1459
    logging.error("Can't find this device-visible name '%s'", os_disk)
1460
    return False
1461

    
1462
  swap_device = instance.FindDisk(swap_disk)
1463
  if swap_device is None:
1464
    logging.error("Can't find this device-visible name '%s'", swap_disk)
1465
    return False
1466

    
1467
  real_os_dev = _RecursiveFindBD(os_device)
1468
  if real_os_dev is None:
1469
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1470
                                  str(os_device))
1471
  real_os_dev.Open()
1472

    
1473
  real_swap_dev = _RecursiveFindBD(swap_device)
1474
  if real_swap_dev is None:
1475
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
1476
                                  str(swap_device))
1477
  real_swap_dev.Open()
1478

    
1479
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
1480
                                        instance.name, int(time.time()))
1481
  if not os.path.exists(constants.LOG_OS_DIR):
1482
    os.mkdir(constants.LOG_OS_DIR, 0750)
1483

    
1484
  destcmd = utils.BuildShellCmd('cat %s', src_image)
1485
  remotecmd = _GetSshRunner().BuildCmd(src_node, constants.GANETI_RUNAS,
1486
                                       destcmd)
1487

    
1488
  comprcmd = "gunzip"
1489
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
1490
                               inst_os.path, import_script, instance.name,
1491
                               real_os_dev.dev_path, real_swap_dev.dev_path,
1492
                               logfile)
1493

    
1494
  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
1495
  env = {'HYPERVISOR': cfg.GetHypervisorType()}
1496

    
1497
  result = utils.RunCmd(command, env=env)
1498

    
1499
  if result.failed:
1500
    logging.error("os import command '%s' returned error: %s"
1501
                  " output: %s", command, result.fail_reason, result.output)
1502
    return False
1503

    
1504
  return True
1505

    
1506

    
1507
def ListExports():
1508
  """Return a list of exports currently available on this machine.
1509

1510
  """
1511
  if os.path.isdir(constants.EXPORT_DIR):
1512
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
1513
  else:
1514
    return []
1515

    
1516

    
1517
def RemoveExport(export):
1518
  """Remove an existing export from the node.
1519

1520
  Args:
1521
    export: the name of the export to remove
1522

1523
  Returns:
1524
    False in case of error, True otherwise.
1525

1526
  """
1527
  target = os.path.join(constants.EXPORT_DIR, export)
1528

    
1529
  shutil.rmtree(target)
1530
  # TODO: catch some of the relevant exceptions and provide a pretty
1531
  # error message if rmtree fails.
1532

    
1533
  return True
1534

    
1535

    
1536
def RenameBlockDevices(devlist):
1537
  """Rename a list of block devices.
1538

1539
  The devlist argument is a list of tuples (disk, new_unique_id).
1540
  The return value will be a combined boolean result
1541
  (True only if all renames succeeded).
1542

1543
  """
1544
  result = True
1545
  for disk, unique_id in devlist:
1546
    dev = _RecursiveFindBD(disk)
1547
    if dev is None:
1548
      result = False
1549
      continue
1550
    try:
1551
      old_rpath = dev.dev_path
1552
      dev.Rename(unique_id)
1553
      new_rpath = dev.dev_path
1554
      if old_rpath != new_rpath:
1555
        DevCacheManager.RemoveCache(old_rpath)
1556
        # FIXME: we should add the new cache information here, like:
1557
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
1558
        # but we don't have the owner here - maybe parse from existing
1559
        # cache? for now, we only lose lvm data when we rename, which
1560
        # is less critical than DRBD or MD
1561
    except errors.BlockDeviceError, err:
1562
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
1563
      result = False
1564
  return result
1565

    
1566

    
1567
def _TransformFileStorageDir(file_storage_dir):
1568
  """Checks whether given file_storage_dir is valid.
1569

1570
  Checks whether the given file_storage_dir is within the cluster-wide
1571
  default file_storage_dir stored in SimpleStore. Only paths under that
1572
  directory are allowed.
1573

1574
  Args:
1575
    file_storage_dir: string with path
1576

1577
  Returns:
1578
    normalized file_storage_dir (string) if valid, None otherwise
1579

1580
  """
1581
  cfg = _GetConfig()
1582
  file_storage_dir = os.path.normpath(file_storage_dir)
1583
  base_file_storage_dir = cfg.GetFileStorageDir()
1584
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
1585
      base_file_storage_dir):
1586
    logging.error("file storage directory '%s' is not under base file"
1587
                  " storage directory '%s'",
1588
                  file_storage_dir, base_file_storage_dir)
1589
    return None
1590
  return file_storage_dir
1591

    
1592

    
1593
def CreateFileStorageDir(file_storage_dir):
1594
  """Create file storage directory.
1595

1596
  Args:
1597
    file_storage_dir: string containing the path
1598

1599
  Returns:
1600
    tuple with first element a boolean indicating whether dir
1601
    creation was successful or not
1602

1603
  """
1604
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1605
  result = True,
1606
  if not file_storage_dir:
1607
    result = False,
1608
  else:
1609
    if os.path.exists(file_storage_dir):
1610
      if not os.path.isdir(file_storage_dir):
1611
        logging.error("'%s' is not a directory", file_storage_dir)
1612
        result = False,
1613
    else:
1614
      try:
1615
        os.makedirs(file_storage_dir, 0750)
1616
      except OSError, err:
1617
        logging.error("Cannot create file storage directory '%s': %s",
1618
                      file_storage_dir, err)
1619
        result = False,
1620
  return result
1621

    
1622

    
1623
def RemoveFileStorageDir(file_storage_dir):
1624
  """Remove file storage directory.
1625

1626
  Remove it only if it's empty. If not, log an error and return.
1627

1628
  Args:
1629
    file_storage_dir: string containing the path
1630

1631
  Returns:
1632
    tuple with first element a boolean indicating whether dir
1633
    removal was successful or not
1634

1635
  """
1636
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
1637
  result = True,
1638
  if not file_storage_dir:
1639
    result = False,
1640
  else:
1641
    if os.path.exists(file_storage_dir):
1642
      if not os.path.isdir(file_storage_dir):
1643
        logging.error("'%s' is not a directory", file_storage_dir)
1644
        result = False,
1645
      # deletes dir only if empty, otherwise we want to return False
1646
      try:
1647
        os.rmdir(file_storage_dir)
1648
      except OSError, err:
1649
        logging.exception("Cannot remove file storage directory '%s'",
1650
                          file_storage_dir)
1651
        result = False,
1652
  return result
1653

    
1654

    
1655
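# Note on RemoveFileStorageDir above: os.rmdir only removes empty directories,
# so a non-empty (still in use) storage directory makes os.rmdir raise OSError
# and the function returns (False,) after logging, which is the "refuse to
# remove" behaviour described in the docstring.

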
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  Args:
    old_file_storage_dir: string containing the old path
    new_file_storage_dir: string containing the new path

  Returns:
    tuple with first element a boolean indicating whether dir
    rename was successful or not

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result


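# Note on RenameFileStorageDir above: both the old and the new location are
# run through _TransformFileStorageDir first, so the rename is refused (and
# (False,) returned) unless both paths live under the cluster-wide base file
# storage directory.

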
def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result


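# Note on _IsJobQueueFile above: this is the same character-wise
# os.path.commonprefix check used for the file storage directory, here applied
# against constants.QUEUE_DIR. For example (file names illustrative only), a
# path like os.path.join(constants.QUEUE_DIR, "job-42") passes, while
# "/tmp/job-42" is rejected and logged.

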
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=content)

  return True


def JobQueueRename(old, new):
  """Renames a job queue file.

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True


def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of DRBD).

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")


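# Note on CloseBlockDevices above: the return value is always a
# (success, message) pair, so a hypothetical caller would check it roughly
# like this (disks being the usual disk objects passed over RPC):
#
#   ok, msg = CloseBlockDevices(disks)
#   if not ok:
#     logging.error("Closing devices failed: %s", msg)

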
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

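  # Note on ExecHook above: only the first 4096 bytes of the combined
  # stdout/stderr of the hook are captured and returned; longer output is
  # truncated. The boolean part of the return value is simply
  # "exit status == 0".
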
  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will usually not be overridden by child opcodes.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # must log
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr


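# Illustrative use of HooksRunner (the hook path and environment values below
# are made-up examples, not taken from this module):
#
#   runner = HooksRunner()
#   results = runner.RunHooks("cluster-verify", constants.HOOKS_PHASE_POST,
#                             {"GANETI_HOOKS_VERSION": "1"})
#   for name, status, output in results:
#     if status == constants.HKR_FAIL:
#       logging.warning("hook %s failed: %s", name, output)

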
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    Return value: tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from utils.RunResult)

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)


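# Note on IAllocatorRunner.Run above: the allocator input (idata) is written
# to a temporary file which is passed to the script as its single argument and
# always unlinked afterwards; the caller only ever sees the IARUN_ status, the
# captured stdout/stderr and, on failure, the fail reason.

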
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

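  # Note on _ConvertPath above: for an illustrative device path such as
  # "/dev/xenvg/inst1.disk0_data" the cache file becomes
  # "<constants.BDEV_CACHE_DIR>/bdev_xenvg_inst1.disk0_data"; the device name
  # used here is a made-up example.
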
  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

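  # Note on UpdateCache above: the cache file holds a single line of the form
  # "<owner> <primary|secondary> <iv_name>\n", e.g. (made-up values)
  # "instance1.example.com primary sda".
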
  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s", dev_path)