#
#

# Copyright (C) 2006, 2007 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Functions used by the node daemon"""


import os
import os.path
import shutil
import time
import tempfile
import stat
import errno
import re
import subprocess

from ganeti import logger
from ganeti import errors
from ganeti import utils
from ganeti import ssh
from ganeti import hypervisor
from ganeti import constants
from ganeti import bdev
from ganeti import objects
from ganeti import ssconf


def StartMaster():
  """Activate local node as master node.

  This involves two steps:
    - run the master script
    - register the cron script

  """
  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "start"])

  if result.failed:
    logger.Error("could not activate cluster interface with command %s,"
                 " error: '%s'" % (result.cmd, result.output))
    return False

  return True


def StopMaster():
  """Deactivate this node as master.

  This does two things:
    - run the master stop script
    - remove link to master cron script.

  """
  result = utils.RunCmd([constants.MASTER_SCRIPT, "-d", "stop"])

  if result.failed:
    logger.Error("could not deactivate cluster interface with command %s,"
                 " error: '%s'" % (result.cmd, result.output))
    return False

  return True


def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the user's authorized_keys file

  """
  sshd_keys = [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
               (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
               (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
               (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    logger.Error("Error while processing user ssh files: %s" % err)
    return False

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])

  return True


def LeaveCluster():
  """Cleans up the current node and prepares it to be removed from the cluster.

  """
  if os.path.isdir(constants.DATA_DIR):
    for rel_name in utils.ListVisibleFiles(constants.DATA_DIR):
      full_name = os.path.join(constants.DATA_DIR, rel_name)
      if os.path.isfile(full_name) and not os.path.islink(full_name):
        utils.RemoveFile(full_name)


  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
  except errors.OpExecError, err:
    logger.Error("Error while processing ssh files: %s" % err)
    return

  f = open(pub_key, 'r')
  try:
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
  finally:
    f.close()

  utils.RemoveFile(priv_key)
  utils.RemoveFile(pub_key)


def GetNodeInfo(vgname):
  """Gives back a hash with different information about the node.

  Returns:
    { 'vg_size' : xxx,  'vg_free' : xxx, 'memory_dom0': xxx,
      'memory_free' : xxx, 'memory_total' : xxx }
    where
    vg_size is the size of the configured volume group in MiB
    vg_free is the free size of the volume group in MiB
    memory_dom0 is the memory allocated for domain0 in MiB
    memory_free is the currently available (free) ram in MiB
    memory_total is the total amount of ram in MiB

  """
  outputarray = {}
  vginfo = _GetVGInfo(vgname)
  outputarray['vg_size'] = vginfo['vg_size']
  outputarray['vg_free'] = vginfo['vg_free']

  hyper = hypervisor.GetHypervisor()
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    outputarray.update(hyp_info)

  f = open("/proc/sys/kernel/random/boot_id", 'r')
  try:
    outputarray["bootid"] = f.read(128).rstrip("\n")
  finally:
    f.close()

  return outputarray


def VerifyNode(what):
  """Verify the status of the local node.

  Args:
    what - a dictionary of things to check:
      'filelist' : list of files for which to compute checksums
      'nodelist' : list of nodes we should check communication with
      'hypervisor': run the hypervisor-specific verify

  Requested files on local node are checksummed and the result returned.

  The nodelist is traversed, with the following checks being made
  for each node:
  - known_hosts key correct
  - correct resolving of node name (target node returns its own hostname
    by ssh-execution of 'hostname', result compared against name in list)

  """
  result = {}

  if 'hypervisor' in what:
    result['hypervisor'] = hypervisor.GetHypervisor().Verify()

  if 'filelist' in what:
    result['filelist'] = utils.FingerprintFiles(what['filelist'])

  if 'nodelist' in what:
    result['nodelist'] = {}
    for node in what['nodelist']:
      success, message = ssh.VerifyNodeHostname(node)
      if not success:
        result['nodelist'][node] = message
  return result


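# Example for VerifyNode above (illustrative only; the node name and the
# file path are assumed, not taken from a real cluster):
#
#   >>> VerifyNode({'filelist': ['/etc/hosts'],
#   ...             'nodelist': ['node2.example.com']})
#   {'filelist': {'/etc/hosts': '...checksum...'}, 'nodelist': {}}
#
# An empty 'nodelist' sub-dict means every listed node resolved its own
# hostname correctly.
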
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  Returns:
    dictionary of all partitions (key) with their size:
    test1: 20.06MiB

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m",
                         "-oname,size", vg_name])
  if result.failed:
    logger.Error("Failed to list logical volumes, lvs output: %s" %
                 result.output)
    return {}

  lvlist = [line.split() for line in result.output.splitlines()]
  return dict(lvlist)


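# Example for GetVolumeList above (illustrative only; the volume group,
# LV name and size are assumed): if "lvs" prints a line such as
# "  test1 20.06m", then
#
#   >>> GetVolumeList("xenvg")
#   {'test1': '20.06m'}
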
def ListVolumeGroups():
  """List the volume groups and their size.

  Returns:
    Dictionary with the volume group names as keys and their sizes as values

  """
  return utils.ListVolumeGroups()


def NodeVolumes():
  """List all volumes on this node.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    logger.Error("Failed to list logical volumes, lvs output: %s" %
                 result.output)
    return {}

  def parse_dev(dev):
    if '(' in dev:
      return dev.split('(')[0]
    else:
      return dev

  def map_line(line):
    return {
      'name': line[0].strip(),
      'size': line[1].strip(),
      'dev': parse_dev(line[2].strip()),
      'vg': line[3].strip(),
    }

  return [map_line(line.split('|')) for line in result.output.splitlines()]


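# Example for NodeVolumes above (illustrative only; device, LV and VG
# names are assumed): an lvs line of "  test1|20.00|/dev/sda3(0)|xenvg"
# is mapped to
#
#   {'name': 'test1', 'size': '20.00', 'dev': '/dev/sda3', 'vg': 'xenvg'}
#
# parse_dev() strips the "(extent)" suffix that lvs appends to the
# physical device name.
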
def BridgesExist(bridges_list):
  """Check whether a list of bridges exists on the current node.

  Returns:
    True if all of them exist, False otherwise

  """
  for bridge in bridges_list:
    if not utils.BridgeExists(bridge):
      return False

  return True


def GetInstanceList():
  """Provides a list of instances.

  Returns:
    A list of all running instances on the current node
    - instance1.example.com
    - instance2.example.com

  """
  try:
    names = hypervisor.GetHypervisor().ListInstances()
  except errors.HypervisorError, err:
    logger.Error("error enumerating instances: %s" % str(err))
    raise

  return names


def GetInstanceInfo(instance):
  """Gives back information about an instance as a dictionary.

  Args:
    instance: name of the instance (ex. instance1.example.com)

  Returns:
    { 'memory' : 511, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetInstanceInfo(instance)
  if iinfo is not None:
    output['memory'] = iinfo[2]
    output['state'] = iinfo[4]
    output['time'] = iinfo[5]

  return output


def GetAllInstancesInfo():
  """Gather data about all instances.

  This is the equivalent of `GetInstanceInfo()`, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  Returns: a dictionary of dictionaries, keys being the instance name,
    and with values:
    { 'memory' : 511, 'vcpus' : 1, 'state' : '-b---', 'time' : 3188.8, }
    where
    memory: memory size of instance (int)
    state: xen state of instance (string)
    time: cpu time of instance (float)
    vcpus: the number of cpus

  """
  output = {}

  iinfo = hypervisor.GetHypervisor().GetAllInstancesInfo()
  if iinfo:
    for name, inst_id, memory, vcpus, state, times in iinfo:
      output[name] = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }

  return output


def AddOSToInstance(instance, os_disk, swap_disk):
  """Add an OS to an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  create_script = inst_os.create_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logger.Error("Can't find this device-visible name '%s'" % os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -i %s -b %s -s %s &>%s",
                                inst_os.path, create_script, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)

  if result.failed:
    logger.Error("os create command '%s' returned error: %s"
                 " output: %s" %
                 (command, result.fail_reason, result.output))
    return False

  return True


def RunRenameInstance(instance, old_name, os_disk, swap_disk):
  """Run the OS rename script for an instance.

  Args:
    instance: the instance object
    old_name: the old name of the instance
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device

  """
  inst_os = OSFromDisk(instance.os)

  script = inst_os.rename_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logger.Error("Can't find this device-visible name '%s'" % os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  command = utils.BuildShellCmd("cd %s && %s -o %s -n %s -b %s -s %s &>%s",
                                inst_os.path, script, old_name, instance.name,
                                real_os_dev.dev_path, real_swap_dev.dev_path,
                                logfile)

  result = utils.RunCmd(command)

  if result.failed:
    logger.Error("os rename command '%s' returned error: %s"
                 " output: %s" %
                 (command, result.fail_reason, result.output))
    return False

  return True


def _GetVGInfo(vg_name):
  """Get information about the volume group.

  Args:
    vg_name: the volume group

  Returns:
    { 'vg_size' : xxx, 'vg_free' : xxx, 'pv_count' : xxx }
    where
    vg_size is the total size of the volume group in MiB
    vg_free is the free size of the volume group in MiB
    pv_count is the number of physical disks in that vg

  """
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    errmsg = "volume group %s not present" % vg_name
    logger.Error(errmsg)
    raise errors.LVMError(errmsg)
  valarr = retval.stdout.strip().split(':')
  retdic = {
    "vg_size": int(round(float(valarr[0]), 0)),
    "vg_free": int(round(float(valarr[1]), 0)),
    "pv_count": int(valarr[2]),
    }
  return retdic


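# Example for _GetVGInfo above (illustrative only; the group name and
# the sizes are assumed): if vgs prints "  1279.59:986.69:1" for "xenvg",
#
#   >>> _GetVGInfo("xenvg")
#   {'vg_size': 1280, 'vg_free': 987, 'pv_count': 1}
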
def _GatherBlockDevs(instance):
  """Set up an instance's block device(s).

  This is run on the primary node at instance startup. The block
  devices must be already assembled.

  """
  block_devices = []
  for disk in instance.disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
                                    str(disk))
    device.Open()
    block_devices.append((disk, device))
  return block_devices


def StartInstance(instance, extra_args):
  """Start an instance.

  Args:
    instance - the instance object to start.

  """
  running_instances = GetInstanceList()

  if instance.name in running_instances:
    return True

  block_devices = _GatherBlockDevs(instance)
  hyper = hypervisor.GetHypervisor()

  try:
    hyper.StartInstance(instance, block_devices, extra_args)
  except errors.HypervisorError, err:
    logger.Error("Failed to start instance: %s" % err)
    return False

  return True


def ShutdownInstance(instance):
  """Shut an instance down.

  Args:
    instance - the instance object to shut down.

  """
  running_instances = GetInstanceList()

  if instance.name not in running_instances:
    return True

  hyper = hypervisor.GetHypervisor()
  try:
    hyper.StopInstance(instance)
  except errors.HypervisorError, err:
    logger.Error("Failed to stop instance: %s" % err)
    return False

  # test every 10secs for 2min
  shutdown_ok = False

  time.sleep(1)
  for dummy in range(11):
    if instance.name not in GetInstanceList():
      break
    time.sleep(10)
  else:
    # the shutdown did not succeed
    logger.Error("shutdown of '%s' unsuccessful, using destroy" % instance)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      logger.Error("Failed to stop instance: %s" % err)
      return False

    time.sleep(1)
    if instance.name in GetInstanceList():
      logger.Error("could not shutdown instance '%s' even by destroy" %
                   instance.name)
      return False

  return True


def CreateBlockDevice(disk, size, on_primary, info):
  """Creates a block device for an instance.

  Args:
    disk: a ganeti.objects.Disk object
    size: the size of the physical underlying devices
    on_primary: if the device should be `Assemble()`-d and
                `Open()`-ed after creation
    info: metadata to attach to the device (via `SetInfo()`)

  Returns:
    the new unique_id of the device (this can sometimes be
    computed only after creation), or None. On secondary nodes,
    it's not required to return anything.

  """
  clist = []
  if disk.children:
    for child in disk.children:
      crdev = _RecursiveAssembleBD(child, on_primary)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        crdev.Open()
      else:
        crdev.Close()
      clist.append(crdev)
  try:
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
    if device is not None:
      logger.Info("removing existing device %s" % disk)
      device.Remove()
  except errors.BlockDeviceError, err:
    pass

  device = bdev.Create(disk.dev_type, disk.physical_id,
                       clist, size)
  if device is None:
    raise ValueError("Can't create child device for %s, %s" %
                     (disk, size))
  if on_primary or disk.AssembleOnSecondary():
    device.Assemble()
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      device.Open(force=True)

  device.SetInfo(info)

  physical_id = device.unique_id
  return physical_id


def RemoveBlockDevice(disk):
  """Remove a block device.

  This is intended to be called recursively.

  """
  try:
    # since we are removing the device, allow a partial match
    # this allows removal of broken mirrors
    rdev = _RecursiveFindBD(disk, allow_partial=True)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logger.Info("Can't attach to device %s in remove" % disk)
    rdev = None
  if rdev is not None:
    result = rdev.Remove()
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and RemoveBlockDevice(child)
  return result


def _RecursiveAssembleBD(disk, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  This function is called recursively.

  Args:
    disk: an objects.Disk object
    as_primary: if we should make the block device read/write

  Returns:
    the assembled device or None (in case no device was assembled)

  If the assembly is not successful, an exception is raised.

  """
  children = []
  if disk.children:
    for chld_disk in disk.children:
      children.append(_RecursiveAssembleBD(chld_disk, as_primary))

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    else:
      r_dev.Close()
  else:
    result = True
  return result


def AssembleBlockDevice(disk, as_primary):
  """Activate a block device for an instance.

  This is a wrapper over _RecursiveAssembleBD.

  Returns:
    a /dev path for primary nodes
    True for secondary nodes

  """
  result = _RecursiveAssembleBD(disk, as_primary)
  if isinstance(result, bdev.BlockDev):
    result = result.dev_path
  return result


def ShutdownBlockDevice(disk):
  """Shut down a block device.

  First, if the device is assembled (can `Attach()`), then the device
  is shutdown. Then the children of the device are shutdown.

  This function is called recursively. Note that we don't cache the
  children or such, as opposed to assemble; shutting down different
  devices doesn't require that the upper device was active.

  """
  r_dev = _RecursiveFindBD(disk)
  if r_dev is not None:
    result = r_dev.Shutdown()
  else:
    result = True
  if disk.children:
    for child in disk.children:
      result = result and ShutdownBlockDevice(child)
  return result


def MirrorAddChild(md_cdev, new_cdev):
  """Extend an MD raid1 array.

  """
  md_bdev = _RecursiveFindBD(md_cdev, allow_partial=True)
  if md_bdev is None:
    logger.Error("Can't find md device")
    return False
  new_bdev = _RecursiveFindBD(new_cdev)
  if new_bdev is None:
    logger.Error("Can't find new device to add")
    return False
  new_bdev.Open()
  md_bdev.AddChild(new_bdev)
  return True


def MirrorRemoveChild(md_cdev, new_cdev):
  """Reduce an MD raid1 array.

  """
  md_bdev = _RecursiveFindBD(md_cdev)
  if md_bdev is None:
    return False
  new_bdev = _RecursiveFindBD(new_cdev)
  if new_bdev is None:
    return False
  new_bdev.Open()
  md_bdev.RemoveChild(new_bdev.dev_path)
  return True


def GetMirrorStatus(disks):
  """Get the mirroring status of a list of devices.

  Args:
    disks: list of `objects.Disk`

  Returns:
    list of (mirror_done, estimated_time) tuples, which
    are the result of bdev.BlockDevice.CombinedSyncStatus()

  """
  stats = []
  for dsk in disks:
    rbd = _RecursiveFindBD(dsk)
    if rbd is None:
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
    stats.append(rbd.CombinedSyncStatus())
  return stats


def _RecursiveFindBD(disk, allow_partial=False):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
    allow_partial: don't abort the find if a child of the
                   device can't be found; this is intended to be
                   used when repairing mirrors

  Returns:
    None if the device can't be found
    otherwise the device instance

  """
  children = []
  if disk.children:
    for chdisk in disk.children:
      children.append(_RecursiveFindBD(chdisk))

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)


def FindBlockDevice(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  Args:
    disk: the objects.Disk instance
  Returns:
    None if the device can't be found
    otherwise the tuple
    (device_path, major, minor, sync_percent, estimated_time, is_degraded)

  """
  rbd = _RecursiveFindBD(disk)
  if rbd is None:
    return rbd
  sync_p, est_t, is_degr = rbd.GetSyncStatus()
  return rbd.dev_path, rbd.major, rbd.minor, sync_p, est_t, is_degr


def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  """
  if not os.path.isabs(file_name):
    logger.Error("Filename passed to UploadFile is not absolute: '%s'" %
                 file_name)
    return False

  allowed_files = [constants.CLUSTER_CONF_FILE, "/etc/hosts",
                   constants.SSH_KNOWN_HOSTS_FILE]
  allowed_files.extend(ssconf.SimpleStore().GetFileList())
  if file_name not in allowed_files:
    logger.Error("Filename passed to UploadFile not in allowed"
                 " upload targets: '%s'" % file_name)
    return False

  dir_name, small_name = os.path.split(file_name)
  fd, new_name = tempfile.mkstemp('.new', small_name, dir_name)
  # here we need to make sure we remove the temp file, if any error
  # leaves it in place
  try:
    os.chown(new_name, uid, gid)
    os.chmod(new_name, mode)
    os.write(fd, data)
    os.fsync(fd)
    os.utime(new_name, (atime, mtime))
    os.rename(new_name, file_name)
  finally:
    os.close(fd)
    utils.RemoveFile(new_name)
  return True


def _ErrnoOrStr(err):
  """Format an EnvironmentError exception.

  If the `err` argument has an errno attribute, it will be looked up
  and converted into a textual EXXXX description. Otherwise the string
  representation of the error will be returned.

  """
  if hasattr(err, 'errno'):
    detail = errno.errorcode[err.errno]
  else:
    detail = str(err)
  return detail

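# Example for _ErrnoOrStr above (illustrative only):
#
#   >>> _ErrnoOrStr(OSError(errno.ENOENT, "No such file or directory"))
#   'ENOENT'
#   >>> _ErrnoOrStr(ValueError("bad value"))
#   'bad value'
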
def _OSSearch(name, search_path=None):
  """Search for OSes with the given name in the search_path.

  Args:
    name: The name of the OS to look for
    search_path: List of dirs to search (defaults to constants.OS_SEARCH_PATH)

  Returns:
    The base_dir the OS resides in

  """

  if search_path is None:
    search_path = constants.OS_SEARCH_PATH

  for dir in search_path:
    t_os_dir = os.path.sep.join([dir, name])
    if os.path.isdir(t_os_dir):
      return dir

  return None

def _OSOndiskVersion(name, os_dir):
  """Compute and return the API version of a given OS.

  This function will try to read the API version of the os given by
  the 'name' parameter and residing in the 'os_dir' directory.

  Return value will be either an integer denoting the version or None in the
  case when this is not a valid OS name.

  """

  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])

  try:
    st = os.stat(api_file)
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
                           " found (%s)" % _ErrnoOrStr(err))

  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
                           " a regular file")

  try:
    f = open(api_file)
    try:
      api_version = f.read(256)
    finally:
      f.close()
  except EnvironmentError, err:
    raise errors.InvalidOS(name, os_dir, "error while reading the"
                           " API version (%s)" % _ErrnoOrStr(err))

  api_version = api_version.strip()
  try:
    api_version = int(api_version)
  except (TypeError, ValueError), err:
    raise errors.InvalidOS(name, os_dir,
                           "API version is not an integer (%s)" % str(err))

  return api_version


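# Example for _OSOndiskVersion above (illustrative only; the OS name,
# directory and version value are assumed): if the file
# /srv/ganeti/os/debian-etch/ganeti_api_version contains the single
# line "5", then
#
#   >>> _OSOndiskVersion("debian-etch", "/srv/ganeti/os/debian-etch")
#   5
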
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  For each name found in the given top directories (which default to
  constants.OS_SEARCH_PATH) it will return an object. If this is a valid
  os, the object will be an instance of the object.OS class. If not,
  it will be an instance of errors.InvalidOS and this signifies that
  this name does not correspond to a valid OS.

  Returns:
    list of objects

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir in top_dirs:
    if os.path.isdir(dir):
      try:
        f_names = utils.ListVisibleFiles(dir)
      except EnvironmentError, err:
        logger.Error("Can't list the OS directory %s: %s" % (dir, str(err)))
        break
      for name in f_names:
        try:
          os_inst = OSFromDisk(name, base_dir=dir)
          result.append(os_inst)
        except errors.InvalidOS, err:
          result.append(err)

  return result


def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  `errors.InvalidOS` exception, detailing why this is not a valid
  OS.

  Args:
    base_dir: Directory containing the OS scripts. Defaults to a search
              in all the OS_SEARCH_PATH directories.

  """

  if base_dir is None:
    base_dir = _OSSearch(name)

  if base_dir is None:
    raise errors.InvalidOS(name, None, "OS dir not found in search path")

  os_dir = os.path.sep.join([base_dir, name])
  api_version = _OSOndiskVersion(name, os_dir)

  if api_version != constants.OS_API_VERSION:
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
                           " (found %s want %s)"
                           % (api_version, constants.OS_API_VERSION))

  # OS scripts dictionary; we will populate it with the actual script names
  os_scripts = {'create': '', 'export': '', 'import': '', 'rename': ''}

  for script in os_scripts:
    os_scripts[script] = os.path.sep.join([os_dir, script])

    try:
      st = os.stat(os_scripts[script])
    except EnvironmentError, err:
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
                             (script, _ErrnoOrStr(err)))

    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
                             script)

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
                             script)


  return objects.OS(name=name, path=os_dir,
                    create_script=os_scripts['create'],
                    export_script=os_scripts['export'],
                    import_script=os_scripts['import'],
                    rename_script=os_scripts['rename'],
                    api_version=api_version)


def SnapshotBlockDevice(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  Args:
    disk: the disk to be snapshotted

  Returns:
    a config entry for the actual lvm device snapshotted.

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return SnapshotBlockDevice(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return SnapshotBlockDevice(child)
  elif disk.dev_type == "lvm":
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      return None
  else:
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
                                 " '%s' of type '%s'" %
                                 (disk.unique_id, disk.dev_type))


def ExportSnapshot(disk, dest_node, instance):
  """Export a block device snapshot to a remote node.

  Args:
    disk: the snapshot block device
    dest_node: the node to send the image to
    instance: instance being exported

  Returns:
    True if successful, False otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  real_os_dev = _RecursiveFindBD(disk)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_os_dev.Open()

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters

  expcmd = utils.BuildShellCmd("cd %s; %s -i %s -b %s 2>%s", inst_os.path,
                               export_script, instance.name,
                               real_os_dev.dev_path, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = ssh.BuildSSHCmd(dest_node, constants.GANETI_RUNAS, destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command)

  if result.failed:
    logger.Error("os snapshot export command '%s' returned error: %s"
                 " output: %s" %
                 (command, result.fail_reason, result.output))
    return False

  return True


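# Sketch of the pipeline built by ExportSnapshot above (illustrative
# only; paths, node and instance names are assumed, and the real quoting
# is handled by BuildShellCmd/BuildSSHCmd/ShellQuoteArgs):
#
#   cd <os_dir>; ./export -i inst1.example.com -b /dev/xenvg/snap_lv \
#       2><logfile> \
#   | gzip \
#   | ssh <runas>@node2.example.com \
#       'mkdir -p <export_dir>/inst1.example.com.new && cat > .../snap_lv'
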
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Args:
    instance: instance configuration
    snap_disks: snapshot block devices

  Returns:
    False in case of error, True otherwise.

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' % instance.memory)
  config.set(constants.INISECT_INS, 'vcpus', '%d' % instance.vcpus)
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_count)

  for disk_count, disk in enumerate(snap_disks):
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))
  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_count)

  cff = os.path.join(destdir, constants.EXPORT_CONF_FILE)
  cfo = open(cff, 'w')
  try:
    config.write(cfo)
  finally:
    cfo.close()

  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True


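# Sketch of the config file written by FinalizeExport above (illustrative
# only; the section names are whatever constants.INISECT_EXP and
# constants.INISECT_INS evaluate to, assumed here to be "export" and
# "instance", and all values are made up):
#
#   [export]
#   version = 0
#   timestamp = 1199145600
#   source = node1.example.com
#   os = debian-etch
#   compression = gzip
#
#   [instance]
#   name = inst1.example.com
#   memory = 512
#   vcpus = 1
#   disk_template = plain
#   nic0_mac = aa:00:00:12:34:56
#   nic0_ip = None
#   nic_count = 0
#   disk0_ivname = sda
#   disk0_dump = snap_lv
#   disk0_size = 10240
#   disk_count = 0
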
def ExportInfo(dest):
  """Get export configuration information.

  Args:
    dest: directory containing the export

  Returns:
    A serializable config file containing the export info.

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config


def ImportOSIntoInstance(instance, os_disk, swap_disk, src_node, src_image):
  """Import an os image into an instance.

  Args:
    instance: the instance object
    os_disk: the instance-visible name of the os device
    swap_disk: the instance-visible name of the swap device
    src_node: node holding the source image
    src_image: path to the source image on src_node

  Returns:
    False in case of error, True otherwise.

  """
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  os_device = instance.FindDisk(os_disk)
  if os_device is None:
    logger.Error("Can't find this device-visible name '%s'" % os_disk)
    return False

  swap_device = instance.FindDisk(swap_disk)
  if swap_device is None:
    logger.Error("Can't find this device-visible name '%s'" % swap_disk)
    return False

  real_os_dev = _RecursiveFindBD(os_device)
  if real_os_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(os_device))
  real_os_dev.Open()

  real_swap_dev = _RecursiveFindBD(swap_device)
  if real_swap_dev is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(swap_device))
  real_swap_dev.Open()

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  destcmd = utils.BuildShellCmd('cat %s', src_image)
  remotecmd = ssh.BuildSSHCmd(src_node, constants.GANETI_RUNAS, destcmd)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s -i %s -b %s -s %s &>%s)",
                               inst_os.path, import_script, instance.name,
                               real_os_dev.dev_path, real_swap_dev.dev_path,
                               logfile)

  command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])

  result = utils.RunCmd(command)

  if result.failed:
    logger.Error("os import command '%s' returned error: %s"
                 " output: %s" %
                 (command, result.fail_reason, result.output))
    return False

  return True


def ListExports():
  """Return a list of exports currently available on this machine.

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  Args:
    export: the name of the export to remove

  Returns:
    False in case of error, True otherwise.

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    Args:
      - hooks_base_dir: if not None, this overrides the
        constants.HOOKS_BASE_DIR (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    Args:
     - script: the full path to the script
     - env: the environment with which to exec the script

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logger.Error("While closing fd %s: %s" % (fd, err))
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    This method will not usually be overridden by child opcodes.

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # must log
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
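# Example for HooksRunner.RunHooks above (illustrative only; the hook
# path and script names are assumed): for hpath "instance-start" and the
# "pre" phase, scripts are looked up in
# <hooks_base_dir>/instance-start-pre.d/ and the return value is a list
# of (relative name, status, output) tuples, e.g.
#
#   [('instance-start-pre.d/00-check', constants.HKR_SUCCESS, ''),
#    ('instance-start-pre.d/99-broken', constants.HKR_FAIL, 'error text')]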