Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ c41eea6e

History | View | Annotate | Download (69.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
def _GetConfig():
50
  """Simple wrapper to return a SimpleStore.
51

52
  @rtype: L{ssconf.SimpleStore}
53
  @return: a SimpleStore instance
54

55
  """
56
  return ssconf.SimpleStore()
57

    
58

    
59
def _GetSshRunner(cluster_name):
60
  """Simple wrapper to return an SshRunner.
61

62
  @type cluster_name: str
63
  @param cluster_name: the cluster name, which is needed
64
      by the SshRunner constructor
65
  @rtype: L{ssh.SshRunner}
66
  @return: an SshRunner instance
67

68
  """
69
  return ssh.SshRunner(cluster_name)
70

    
71

    
72
def _Decompress(data):
73
  """Unpacks data compressed by the RPC client.
74

75
  @type data: list or tuple
76
  @param data: Data sent by RPC client
77
  @rtype: str
78
  @return: Decompressed data
79

80
  """
81
  assert isinstance(data, (list, tuple))
82
  assert len(data) == 2
83
  (encoding, content) = data
84
  if encoding == constants.RPC_ENCODING_NONE:
85
    return content
86
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87
    return zlib.decompress(base64.b64decode(content))
88
  else:
89
    raise AssertionError("Unknown data encoding")
90

    
91

    
92
def _CleanDirectory(path, exclude=[]):
93
  """Removes all regular files in a directory.
94

95
  @type path: str
96
  @param path: the directory to clean
97
  @type exclude: list
98
  @param exclude: list of files to be excluded, defaults
99
      to the empty list
100
  @rtype: None
101

102
  """
103
  if not os.path.isdir(path):
104
    return
105

    
106
  # Normalize excluded paths
107
  exclude = [os.path.normpath(i) for i in exclude]
108

    
109
  for rel_name in utils.ListVisibleFiles(path):
110
    full_name = os.path.normpath(os.path.join(path, rel_name))
111
    if full_name in exclude:
112
      continue
113
    if os.path.isfile(full_name) and not os.path.islink(full_name):
114
      utils.RemoveFile(full_name)
115

    
116

    
117
def JobQueuePurge():
118
  """Removes job queue files and archived jobs.
119

120
  @rtype: None
121

122
  """
123
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
124
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
125

    
126

    
127
def GetMasterInfo():
128
  """Returns master information.
129

130
  This is an utility function to compute master information, either
131
  for consumption here or from the node daemon.
132

133
  @rtype: tuple
134
  @return: (master_netdev, master_ip, master_name) if we have a good
135
      configuration, otherwise (None, None, None)
136

137
  """
138
  try:
139
    cfg = _GetConfig()
140
    master_netdev = cfg.GetMasterNetdev()
141
    master_ip = cfg.GetMasterIP()
142
    master_node = cfg.GetMasterNode()
143
  except errors.ConfigurationError, err:
144
    logging.exception("Cluster configuration incomplete")
145
    return (None, None, None)
146
  return (master_netdev, master_ip, master_node)
147

    
148

    
149
def StartMaster(start_daemons):
150
  """Activate local node as master node.
151

152
  The function will always try activate the IP address of the master
153
  (unless someone else has it). It will also start the master daemons,
154
  based on the start_daemons parameter.
155

156
  @type start_daemons: boolean
157
  @param start_daemons: whther to also start the master
158
      daemons (ganeti-masterd and ganeti-rapi)
159
  @rtype: None
160

161
  """
162
  ok = True
163
  master_netdev, master_ip, _ = GetMasterInfo()
164
  if not master_netdev:
165
    return False
166

    
167
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
168
    if utils.OwnIpAddress(master_ip):
169
      # we already have the ip:
170
      logging.debug("Already started")
171
    else:
172
      logging.error("Someone else has the master ip, not activating")
173
      ok = False
174
  else:
175
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
176
                           "dev", master_netdev, "label",
177
                           "%s:0" % master_netdev])
178
    if result.failed:
179
      logging.error("Can't activate master IP: %s", result.output)
180
      ok = False
181

    
182
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
183
                           "-s", master_ip, master_ip])
184
    # we'll ignore the exit code of arping
185

    
186
  # and now start the master and rapi daemons
187
  if start_daemons:
188
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
189
      result = utils.RunCmd([daemon])
190
      if result.failed:
191
        logging.error("Can't start daemon %s: %s", daemon, result.output)
192
        ok = False
193
  return ok
194

    
195

    
196
def StopMaster(stop_daemons):
197
  """Deactivate this node as master.
198

199
  The function will always try to deactivate the IP address of the
200
  master. It will also stop the master daemons depending on the
201
  stop_daemons parameter.
202

203
  @type stop_daemons: boolean
204
  @param stop_daemons: whether to also stop the master daemons
205
      (ganeti-masterd and ganeti-rapi)
206
  @rtype: None
207

208
  """
209
  master_netdev, master_ip, _ = GetMasterInfo()
210
  if not master_netdev:
211
    return False
212

    
213
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
214
                         "dev", master_netdev])
215
  if result.failed:
216
    logging.error("Can't remove the master IP, error: %s", result.output)
217
    # but otherwise ignore the failure
218

    
219
  if stop_daemons:
220
    # stop/kill the rapi and the master daemon
221
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
222
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
223

    
224
  return True
225

    
226

    
227
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
228
  """Joins this node to the cluster.
229

230
  This does the following:
231
      - updates the hostkeys of the machine (rsa and dsa)
232
      - adds the ssh private key to the user
233
      - adds the ssh public key to the users' authorized_keys file
234

235
  @type dsa: str
236
  @param dsa: the DSA private key to write
237
  @type dsapub: str
238
  @param dsapub: the DSA public key to write
239
  @type rsa: str
240
  @param rsa: the RSA private key to write
241
  @type rsapub: str
242
  @param rsapub: the RSA public key to write
243
  @type sshkey: str
244
  @param sshkey: the SSH private key to write
245
  @type sshpub: str
246
  @param sshpub: the SSH public key to write
247
  @rtype: boolean
248
  @return: the success of the operation
249

250
  """
251
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
252
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
253
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
254
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
255
  for name, content, mode in sshd_keys:
256
    utils.WriteFile(name, data=content, mode=mode)
257

    
258
  try:
259
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
260
                                                    mkdir=True)
261
  except errors.OpExecError, err:
262
    logging.exception("Error while processing user ssh files")
263
    return False
264

    
265
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
266
    utils.WriteFile(name, data=content, mode=0600)
267

    
268
  utils.AddAuthorizedKey(auth_keys, sshpub)
269

    
270
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
271

    
272
  return True
273

    
274

    
275
def LeaveCluster():
276
  """Cleans up and remove the current node.
277

278
  This function cleans up and prepares the current node to be removed
279
  from the cluster.
280

281
  If processing is successful, then it raises an
282
  L{errors.QuitGanetiException} which is used as a special case to
283
  shutdown the node daemon.
284

285
  """
286
  _CleanDirectory(constants.DATA_DIR)
287
  JobQueuePurge()
288

    
289
  try:
290
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
291
  except errors.OpExecError:
292
    logging.exception("Error while processing ssh files")
293
    return
294

    
295
  f = open(pub_key, 'r')
296
  try:
297
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
298
  finally:
299
    f.close()
300

    
301
  utils.RemoveFile(priv_key)
302
  utils.RemoveFile(pub_key)
303

    
304
  # Return a reassuring string to the caller, and quit
305
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
306

    
307

    
308
def GetNodeInfo(vgname, hypervisor_type):
309
  """Gives back a hash with different informations about the node.
310

311
  @type vgname: C{string}
312
  @param vgname: the name of the volume group to ask for disk space information
313
  @type hypervisor_type: C{str}
314
  @param hypervisor_type: the name of the hypervisor to ask for
315
      memory information
316
  @rtype: C{dict}
317
  @return: dictionary with the following keys:
318
      - vg_size is the size of the configured volume group in MiB
319
      - vg_free is the free size of the volume group in MiB
320
      - memory_dom0 is the memory allocated for domain0 in MiB
321
      - memory_free is the currently available (free) ram in MiB
322
      - memory_total is the total number of ram in MiB
323

324
  """
325
  outputarray = {}
326
  vginfo = _GetVGInfo(vgname)
327
  outputarray['vg_size'] = vginfo['vg_size']
328
  outputarray['vg_free'] = vginfo['vg_free']
329

    
330
  hyper = hypervisor.GetHypervisor(hypervisor_type)
331
  hyp_info = hyper.GetNodeInfo()
332
  if hyp_info is not None:
333
    outputarray.update(hyp_info)
334

    
335
  f = open("/proc/sys/kernel/random/boot_id", 'r')
336
  try:
337
    outputarray["bootid"] = f.read(128).rstrip("\n")
338
  finally:
339
    f.close()
340

    
341
  return outputarray
342

    
343

    
344
def VerifyNode(what, cluster_name):
345
  """Verify the status of the local node.
346

347
  Based on the input L{what} parameter, various checks are done on the
348
  local node.
349

350
  If the I{filelist} key is present, this list of
351
  files is checksummed and the file/checksum pairs are returned.
352

353
  If the I{nodelist} key is present, we check that we have
354
  connectivity via ssh with the target nodes (and check the hostname
355
  report).
356

357
  If the I{node-net-test} key is present, we check that we have
358
  connectivity to the given nodes via both primary IP and, if
359
  applicable, secondary IPs.
360

361
  @type what: C{dict}
362
  @param what: a dictionary of things to check:
363
      - filelist: list of files for which to compute checksums
364
      - nodelist: list of nodes we should check ssh communication with
365
      - node-net-test: list of nodes we should check node daemon port
366
        connectivity with
367
      - hypervisor: list with hypervisors to run the verify for
368
  @rtype: dict
369
  @return: a dictionary with the same keys as the input dict, and
370
      values representing the result of the checks
371

372
  """
373
  result = {}
374

    
375
  if constants.NV_HYPERVISOR in what:
376
    result[constants.NV_HYPERVISOR] = tmp = {}
377
    for hv_name in what[constants.NV_HYPERVISOR]:
378
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
379

    
380
  if constants.NV_FILELIST in what:
381
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
382
      what[constants.NV_FILELIST])
383

    
384
  if constants.NV_NODELIST in what:
385
    result[constants.NV_NODELIST] = tmp = {}
386
    random.shuffle(what[constants.NV_NODELIST])
387
    for node in what[constants.NV_NODELIST]:
388
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
389
      if not success:
390
        tmp[node] = message
391

    
392
  if constants.NV_NODENETTEST in what:
393
    result[constants.NV_NODENETTEST] = tmp = {}
394
    my_name = utils.HostInfo().name
395
    my_pip = my_sip = None
396
    for name, pip, sip in what[constants.NV_NODENETTEST]:
397
      if name == my_name:
398
        my_pip = pip
399
        my_sip = sip
400
        break
401
    if not my_pip:
402
      tmp[my_name] = ("Can't find my own primary/secondary IP"
403
                      " in the node list")
404
    else:
405
      port = utils.GetNodeDaemonPort()
406
      for name, pip, sip in what[constants.NV_NODENETTEST]:
407
        fail = []
408
        if not utils.TcpPing(pip, port, source=my_pip):
409
          fail.append("primary")
410
        if sip != pip:
411
          if not utils.TcpPing(sip, port, source=my_sip):
412
            fail.append("secondary")
413
        if fail:
414
          tmp[name] = ("failure using the %s interface(s)" %
415
                       " and ".join(fail))
416

    
417
  if constants.NV_LVLIST in what:
418
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
419

    
420
  if constants.NV_INSTANCELIST in what:
421
    result[constants.NV_INSTANCELIST] = GetInstanceList(
422
      what[constants.NV_INSTANCELIST])
423

    
424
  if constants.NV_VGLIST in what:
425
    result[constants.NV_VGLIST] = ListVolumeGroups()
426

    
427
  if constants.NV_VERSION in what:
428
    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
429

    
430
  if constants.NV_HVINFO in what:
431
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
432
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
433

    
434
  return result
435

    
436

    
437
def GetVolumeList(vg_name):
438
  """Compute list of logical volumes and their size.
439

440
  @type vg_name: str
441
  @param vg_name: the volume group whose LVs we should list
442
  @rtype: dict
443
  @return:
444
      dictionary of all partions (key) with value being a tuple of
445
      their size (in MiB), inactive and online status::
446

447
        {'test1': ('20.06', True, True)}
448

449
      in case of errors, a string is returned with the error
450
      details.
451

452
  """
453
  lvs = {}
454
  sep = '|'
455
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
456
                         "--separator=%s" % sep,
457
                         "-olv_name,lv_size,lv_attr", vg_name])
458
  if result.failed:
459
    logging.error("Failed to list logical volumes, lvs output: %s",
460
                  result.output)
461
    return result.output
462

    
463
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
464
  for line in result.stdout.splitlines():
465
    line = line.strip()
466
    match = valid_line_re.match(line)
467
    if not match:
468
      logging.error("Invalid line returned from lvs output: '%s'", line)
469
      continue
470
    name, size, attr = match.groups()
471
    inactive = attr[4] == '-'
472
    online = attr[5] == 'o'
473
    lvs[name] = (size, inactive, online)
474

    
475
  return lvs
476

    
477

    
478
def ListVolumeGroups():
479
  """List the volume groups and their size.
480

481
  @rtype: dict
482
  @return: dictionary with keys volume name and values the
483
      size of the volume
484

485
  """
486
  return utils.ListVolumeGroups()
487

    
488

    
489
def NodeVolumes():
490
  """List all volumes on this node.
491

492
  @rtype: list
493
  @return:
494
    A list of dictionaries, each having four keys:
495
      - name: the logical volume name,
496
      - size: the size of the logical volume
497
      - dev: the physical device on which the LV lives
498
      - vg: the volume group to which it belongs
499

500
    In case of errors, we return an empty list and log the
501
    error.
502

503
    Note that since a logical volume can live on multiple physical
504
    volumes, the resulting list might include a logical volume
505
    multiple times.
506

507
  """
508
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
509
                         "--separator=|",
510
                         "--options=lv_name,lv_size,devices,vg_name"])
511
  if result.failed:
512
    logging.error("Failed to list logical volumes, lvs output: %s",
513
                  result.output)
514
    return []
515

    
516
  def parse_dev(dev):
517
    if '(' in dev:
518
      return dev.split('(')[0]
519
    else:
520
      return dev
521

    
522
  def map_line(line):
523
    return {
524
      'name': line[0].strip(),
525
      'size': line[1].strip(),
526
      'dev': parse_dev(line[2].strip()),
527
      'vg': line[3].strip(),
528
    }
529

    
530
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
531
          if line.count('|') >= 3]
532

    
533

    
534
def BridgesExist(bridges_list):
535
  """Check if a list of bridges exist on the current node.
536

537
  @rtype: boolean
538
  @return: C{True} if all of them exist, C{False} otherwise
539

540
  """
541
  for bridge in bridges_list:
542
    if not utils.BridgeExists(bridge):
543
      return False
544

    
545
  return True
546

    
547

    
548
def GetInstanceList(hypervisor_list):
549
  """Provides a list of instances.
550

551
  @type hypervisor_list: list
552
  @param hypervisor_list: the list of hypervisors to query information
553

554
  @rtype: list
555
  @return: a list of all running instances on the current node
556
    - instance1.example.com
557
    - instance2.example.com
558

559
  """
560
  results = []
561
  for hname in hypervisor_list:
562
    try:
563
      names = hypervisor.GetHypervisor(hname).ListInstances()
564
      results.extend(names)
565
    except errors.HypervisorError, err:
566
      logging.exception("Error enumerating instances for hypevisor %s", hname)
567
      # FIXME: should we somehow not propagate this to the master?
568
      raise
569

    
570
  return results
571

    
572

    
573
def GetInstanceInfo(instance, hname):
574
  """Gives back the informations about an instance as a dictionary.
575

576
  @type instance: string
577
  @param instance: the instance name
578
  @type hname: string
579
  @param hname: the hypervisor type of the instance
580

581
  @rtype: dict
582
  @return: dictionary with the following keys:
583
      - memory: memory size of instance (int)
584
      - state: xen state of instance (string)
585
      - time: cpu time of instance (float)
586

587
  """
588
  output = {}
589

    
590
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
591
  if iinfo is not None:
592
    output['memory'] = iinfo[2]
593
    output['state'] = iinfo[4]
594
    output['time'] = iinfo[5]
595

    
596
  return output
597

    
598

    
599
def GetAllInstancesInfo(hypervisor_list):
600
  """Gather data about all instances.
601

602
  This is the equivalent of L{GetInstanceInfo}, except that it
603
  computes data for all instances at once, thus being faster if one
604
  needs data about more than one instance.
605

606
  @type hypervisor_list: list
607
  @param hypervisor_list: list of hypervisors to query for instance data
608

609
  @rtype: dict
610
  @return: dictionary of instance: data, with data having the following keys:
611
      - memory: memory size of instance (int)
612
      - state: xen state of instance (string)
613
      - time: cpu time of instance (float)
614
      - vcpus: the number of vcpus
615

616
  """
617
  output = {}
618

    
619
  for hname in hypervisor_list:
620
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
621
    if iinfo:
622
      for name, inst_id, memory, vcpus, state, times in iinfo:
623
        value = {
624
          'memory': memory,
625
          'vcpus': vcpus,
626
          'state': state,
627
          'time': times,
628
          }
629
        if name in output and output[name] != value:
630
          raise errors.HypervisorError("Instance %s running duplicate"
631
                                       " with different parameters" % name)
632
        output[name] = value
633

    
634
  return output
635

    
636

    
637
def AddOSToInstance(instance):
638
  """Add an OS to an instance.
639

640
  @type instance: L{objects.Instance}
641
  @param instance: Instance whose OS is to be installed
642
  @rtype: boolean
643
  @return: the success of the operation
644

645
  """
646
  inst_os = OSFromDisk(instance.os)
647

    
648
  create_env = OSEnvironment(instance)
649

    
650
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
651
                                     instance.name, int(time.time()))
652

    
653
  result = utils.RunCmd([inst_os.create_script], env=create_env,
654
                        cwd=inst_os.path, output=logfile,)
655
  if result.failed:
656
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
657
                  " output: %s", result.cmd, result.fail_reason, logfile,
658
                  result.output)
659
    return False
660

    
661
  return True
662

    
663

    
664
def RunRenameInstance(instance, old_name):
665
  """Run the OS rename script for an instance.
666

667
  @type instance: L{objects.Instance}
668
  @param instance: Instance whose OS is to be installed
669
  @type old_name: string
670
  @param old_name: previous instance name
671
  @rtype: boolean
672
  @return: the success of the operation
673

674
  """
675
  inst_os = OSFromDisk(instance.os)
676

    
677
  rename_env = OSEnvironment(instance)
678
  rename_env['OLD_INSTANCE_NAME'] = old_name
679

    
680
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
681
                                           old_name,
682
                                           instance.name, int(time.time()))
683

    
684
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
685
                        cwd=inst_os.path, output=logfile)
686

    
687
  if result.failed:
688
    logging.error("os create command '%s' returned error: %s output: %s",
689
                  result.cmd, result.fail_reason, result.output)
690
    return False
691

    
692
  return True
693

    
694

    
695
def _GetVGInfo(vg_name):
696
  """Get informations about the volume group.
697

698
  @type vg_name: str
699
  @param vg_name: the volume group which we query
700
  @rtype: dict
701
  @return:
702
    A dictionary with the following keys:
703
      - C{vg_size} is the total size of the volume group in MiB
704
      - C{vg_free} is the free size of the volume group in MiB
705
      - C{pv_count} are the number of physical disks in that VG
706

707
    If an error occurs during gathering of data, we return the same dict
708
    with keys all set to None.
709

710
  """
711
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
712

    
713
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
714
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
715

    
716
  if retval.failed:
717
    logging.error("volume group %s not present", vg_name)
718
    return retdic
719
  valarr = retval.stdout.strip().rstrip(':').split(':')
720
  if len(valarr) == 3:
721
    try:
722
      retdic = {
723
        "vg_size": int(round(float(valarr[0]), 0)),
724
        "vg_free": int(round(float(valarr[1]), 0)),
725
        "pv_count": int(valarr[2]),
726
        }
727
    except ValueError, err:
728
      logging.exception("Fail to parse vgs output")
729
  else:
730
    logging.error("vgs output has the wrong number of fields (expected"
731
                  " three): %s", str(valarr))
732
  return retdic
733

    
734

    
735
def _GatherBlockDevs(instance):
736
  """Set up an instance's block device(s).
737

738
  This is run on the primary node at instance startup. The block
739
  devices must be already assembled.
740

741
  @type instance: L{objects.Instance}
742
  @param instance: the instance whose disks we shoul assemble
743
  @rtype: list of L{bdev.BlockDev}
744
  @return: list of the block devices
745

746
  """
747
  block_devices = []
748
  for disk in instance.disks:
749
    device = _RecursiveFindBD(disk)
750
    if device is None:
751
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
752
                                    str(disk))
753
    device.Open()
754
    block_devices.append((disk, device))
755
  return block_devices
756

    
757

    
758
def StartInstance(instance, extra_args):
759
  """Start an instance.
760

761
  @type instance: L{objects.Instance}
762
  @param instance: the instance object
763
  @rtype: boolean
764
  @return: whether the startup was successful or not
765

766
  """
767
  running_instances = GetInstanceList([instance.hypervisor])
768

    
769
  if instance.name in running_instances:
770
    return True
771

    
772
  block_devices = _GatherBlockDevs(instance)
773
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
774

    
775
  try:
776
    hyper.StartInstance(instance, block_devices, extra_args)
777
  except errors.HypervisorError, err:
778
    logging.exception("Failed to start instance")
779
    return False
780

    
781
  return True
782

    
783

    
784
def ShutdownInstance(instance):
785
  """Shut an instance down.
786

787
  @note: this functions uses polling with a hardcoded timeout.
788

789
  @type instance: L{objects.Instance}
790
  @param instance: the instance object
791
  @rtype: boolean
792
  @return: whether the startup was successful or not
793

794
  """
795
  hv_name = instance.hypervisor
796
  running_instances = GetInstanceList([hv_name])
797

    
798
  if instance.name not in running_instances:
799
    return True
800

    
801
  hyper = hypervisor.GetHypervisor(hv_name)
802
  try:
803
    hyper.StopInstance(instance)
804
  except errors.HypervisorError, err:
805
    logging.error("Failed to stop instance")
806
    return False
807

    
808
  # test every 10secs for 2min
809
  shutdown_ok = False
810

    
811
  time.sleep(1)
812
  for dummy in range(11):
813
    if instance.name not in GetInstanceList([hv_name]):
814
      break
815
    time.sleep(10)
816
  else:
817
    # the shutdown did not succeed
818
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
819

    
820
    try:
821
      hyper.StopInstance(instance, force=True)
822
    except errors.HypervisorError, err:
823
      logging.exception("Failed to stop instance")
824
      return False
825

    
826
    time.sleep(1)
827
    if instance.name in GetInstanceList([hv_name]):
828
      logging.error("could not shutdown instance '%s' even by destroy",
829
                    instance.name)
830
      return False
831

    
832
  return True
833

    
834

    
835
def RebootInstance(instance, reboot_type, extra_args):
836
  """Reboot an instance.
837

838
  @type instance: L{objects.Instance}
839
  @param instance: the instance object to reboot
840
  @type reboot_type: str
841
  @param reboot_type: the type of reboot, one the following
842
    constants:
843
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
844
        instance OS, do not recreate the VM
845
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
846
        restart the VM (at the hypervisor level)
847
      - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
848
        is not accepted here, since that mode is handled
849
        differently
850
  @rtype: boolean
851
  @return: the success of the operation
852

853
  """
854
  running_instances = GetInstanceList([instance.hypervisor])
855

    
856
  if instance.name not in running_instances:
857
    logging.error("Cannot reboot instance that is not running")
858
    return False
859

    
860
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
861
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
862
    try:
863
      hyper.RebootInstance(instance)
864
    except errors.HypervisorError, err:
865
      logging.exception("Failed to soft reboot instance")
866
      return False
867
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
868
    try:
869
      ShutdownInstance(instance)
870
      StartInstance(instance, extra_args)
871
    except errors.HypervisorError, err:
872
      logging.exception("Failed to hard reboot instance")
873
      return False
874
  else:
875
    raise errors.ParameterError("reboot_type invalid")
876

    
877
  return True
878

    
879

    
880
def MigrateInstance(instance, target, live):
881
  """Migrates an instance to another node.
882

883
  @type instance: L{objects.Instance}
884
  @param instance: the instance definition
885
  @type target: string
886
  @param target: the target node name
887
  @type live: boolean
888
  @param live: whether the migration should be done live or not (the
889
      interpretation of this parameter is left to the hypervisor)
890
  @rtype: tuple
891
  @return: a tuple of (success, msg) where:
892
      - succes is a boolean denoting the success/failure of the operation
893
      - msg is a string with details in case of failure
894

895
  """
896
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
897

    
898
  try:
899
    hyper.MigrateInstance(instance.name, target, live)
900
  except errors.HypervisorError, err:
901
    msg = "Failed to migrate instance: %s" % str(err)
902
    logging.error(msg)
903
    return (False, msg)
904
  return (True, "Migration successfull")
905

    
906

    
907
def CreateBlockDevice(disk, size, owner, on_primary, info):
908
  """Creates a block device for an instance.
909

910
  @type disk: L{objects.Disk}
911
  @param disk: the object describing the disk we should create
912
  @type size: int
913
  @param size: the size of the physical underlying device, in MiB
914
  @type owner: str
915
  @param owner: the name of the instance for which disk is created,
916
      used for device cache data
917
  @type on_primary: boolean
918
  @param on_primary:  indicates if it is the primary node or not
919
  @type info: string
920
  @param info: string that will be sent to the physical device
921
      creation, used for example to set (LVM) tags on LVs
922

923
  @return: the new unique_id of the device (this can sometime be
924
      computed only after creation), or None. On secondary nodes,
925
      it's not required to return anything.
926

927
  """
928
  clist = []
929
  if disk.children:
930
    for child in disk.children:
931
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
932
      if on_primary or disk.AssembleOnSecondary():
933
        # we need the children open in case the device itself has to
934
        # be assembled
935
        crdev.Open()
936
      clist.append(crdev)
937
  try:
938
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
939
    if device is not None:
940
      logging.info("removing existing device %s", disk)
941
      device.Remove()
942
  except errors.BlockDeviceError, err:
943
    pass
944

    
945
  device = bdev.Create(disk.dev_type, disk.physical_id,
946
                       clist, size)
947
  if device is None:
948
    raise ValueError("Can't create child device for %s, %s" %
949
                     (disk, size))
950
  if on_primary or disk.AssembleOnSecondary():
951
    if not device.Assemble():
952
      errorstring = "Can't assemble device after creation"
953
      logging.error(errorstring)
954
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
955
                                    " daemon logs" % errorstring)
956
    device.SetSyncSpeed(constants.SYNC_SPEED)
957
    if on_primary or disk.OpenOnSecondary():
958
      device.Open(force=True)
959
    DevCacheManager.UpdateCache(device.dev_path, owner,
960
                                on_primary, disk.iv_name)
961

    
962
  device.SetInfo(info)
963

    
964
  physical_id = device.unique_id
965
  return physical_id
966

    
967

    
968
def RemoveBlockDevice(disk):
969
  """Remove a block device.
970

971
  @note: This is intended to be called recursively.
972

973
  @type disk: L{objects.Disk}
974
  @param disk: the disk object we should remove
975
  @rtype: boolean
976
  @return: the success of the operation
977

978
  """
979
  try:
980
    # since we are removing the device, allow a partial match
981
    # this allows removal of broken mirrors
982
    rdev = _RecursiveFindBD(disk, allow_partial=True)
983
  except errors.BlockDeviceError, err:
984
    # probably can't attach
985
    logging.info("Can't attach to device %s in remove", disk)
986
    rdev = None
987
  if rdev is not None:
988
    r_path = rdev.dev_path
989
    result = rdev.Remove()
990
    if result:
991
      DevCacheManager.RemoveCache(r_path)
992
  else:
993
    result = True
994
  if disk.children:
995
    for child in disk.children:
996
      result = result and RemoveBlockDevice(child)
997
  return result
998

    
999

    
1000
def _RecursiveAssembleBD(disk, owner, as_primary):
1001
  """Activate a block device for an instance.
1002

1003
  This is run on the primary and secondary nodes for an instance.
1004

1005
  @note: this function is called recursively.
1006

1007
  @type disk: L{objects.Disk}
1008
  @param disk: the disk we try to assemble
1009
  @type owner: str
1010
  @param owner: the name of the instance which owns the disk
1011
  @type as_primary: boolean
1012
  @param as_primary: if we should make the block device
1013
      read/write
1014

1015
  @return: the assembled device or None (in case no device
1016
      was assembled)
1017
  @raise errors.BlockDeviceError: in case there is an error
1018
      during the activation of the children or the device
1019
      itself
1020

1021
  """
1022
  children = []
1023
  if disk.children:
1024
    mcn = disk.ChildrenNeeded()
1025
    if mcn == -1:
1026
      mcn = 0 # max number of Nones allowed
1027
    else:
1028
      mcn = len(disk.children) - mcn # max number of Nones
1029
    for chld_disk in disk.children:
1030
      try:
1031
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1032
      except errors.BlockDeviceError, err:
1033
        if children.count(None) >= mcn:
1034
          raise
1035
        cdev = None
1036
        logging.debug("Error in child activation: %s", str(err))
1037
      children.append(cdev)
1038

    
1039
  if as_primary or disk.AssembleOnSecondary():
1040
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
1041
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1042
    result = r_dev
1043
    if as_primary or disk.OpenOnSecondary():
1044
      r_dev.Open()
1045
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1046
                                as_primary, disk.iv_name)
1047

    
1048
  else:
1049
    result = True
1050
  return result
1051

    
1052

    
1053
def AssembleBlockDevice(disk, owner, as_primary):
1054
  """Activate a block device for an instance.
1055

1056
  This is a wrapper over _RecursiveAssembleBD.
1057

1058
  @rtype: str or boolean
1059
  @return: a C{/dev/...} path for primary nodes, and
1060
      C{True} for secondary nodes
1061

1062
  """
1063
  result = _RecursiveAssembleBD(disk, owner, as_primary)
1064
  if isinstance(result, bdev.BlockDev):
1065
    result = result.dev_path
1066
  return result
1067

    
1068

    
1069
def ShutdownBlockDevice(disk):
1070
  """Shut down a block device.
1071

1072
  First, if the device is assembled (Attach() is successfull), then
1073
  the device is shutdown. Then the children of the device are
1074
  shutdown.
1075

1076
  This function is called recursively. Note that we don't cache the
1077
  children or such, as oppossed to assemble, shutdown of different
1078
  devices doesn't require that the upper device was active.
1079

1080
  @type disk: L{objects.Disk}
1081
  @param disk: the description of the disk we should
1082
      shutdown
1083
  @rtype: boolean
1084
  @return: the success of the operation
1085

1086
  """
1087
  r_dev = _RecursiveFindBD(disk)
1088
  if r_dev is not None:
1089
    r_path = r_dev.dev_path
1090
    result = r_dev.Shutdown()
1091
    if result:
1092
      DevCacheManager.RemoveCache(r_path)
1093
  else:
1094
    result = True
1095
  if disk.children:
1096
    for child in disk.children:
1097
      result = result and ShutdownBlockDevice(child)
1098
  return result
1099

    
1100

    
1101
def MirrorAddChildren(parent_cdev, new_cdevs):
1102
  """Extend a mirrored block device.
1103

1104
  @type parent_cdev: L{objects.Disk}
1105
  @param parent_cdev: the disk to which we should add children
1106
  @type new_cdevs: list of L{objects.Disk}
1107
  @param new_cdevs: the list of children which we should add
1108
  @rtype: boolean
1109
  @return: the success of the operation
1110

1111
  """
1112
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
1113
  if parent_bdev is None:
1114
    logging.error("Can't find parent device")
1115
    return False
1116
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1117
  if new_bdevs.count(None) > 0:
1118
    logging.error("Can't find new device(s) to add: %s:%s",
1119
                  new_bdevs, new_cdevs)
1120
    return False
1121
  parent_bdev.AddChildren(new_bdevs)
1122
  return True
1123

    
1124

    
1125
def MirrorRemoveChildren(parent_cdev, new_cdevs):
1126
  """Shrink a mirrored block device.
1127

1128
  @type parent_cdev: L{objects.Disk}
1129
  @param parent_cdev: the disk from which we should remove children
1130
  @type new_cdevs: list of L{objects.Disk}
1131
  @param new_cdevs: the list of children which we should remove
1132
  @rtype: boolean
1133
  @return: the success of the operation
1134

1135
  """
1136
  parent_bdev = _RecursiveFindBD(parent_cdev)
1137
  if parent_bdev is None:
1138
    logging.error("Can't find parent in remove children: %s", parent_cdev)
1139
    return False
1140
  devs = []
1141
  for disk in new_cdevs:
1142
    rpath = disk.StaticDevPath()
1143
    if rpath is None:
1144
      bd = _RecursiveFindBD(disk)
1145
      if bd is None:
1146
        logging.error("Can't find dynamic device %s while removing children",
1147
                      disk)
1148
        return False
1149
      else:
1150
        devs.append(bd.dev_path)
1151
    else:
1152
      devs.append(rpath)
1153
  parent_bdev.RemoveChildren(devs)
1154
  return True
1155

    
1156

    
1157
def GetMirrorStatus(disks):
1158
  """Get the mirroring status of a list of devices.
1159

1160
  @type disks: list of L{objects.Disk}
1161
  @param disks: the list of disks which we should query
1162
  @rtype: disk
1163
  @return:
1164
      a list of (mirror_done, estimated_time) tuples, which
1165
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1166
  @raise errors.BlockDeviceError: if any of the disks cannot be
1167
      found
1168

1169
  """
1170
  stats = []
1171
  for dsk in disks:
1172
    rbd = _RecursiveFindBD(dsk)
1173
    if rbd is None:
1174
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1175
    stats.append(rbd.CombinedSyncStatus())
1176
  return stats
1177

    
1178

    
1179
def _RecursiveFindBD(disk, allow_partial=False):
1180
  """Check if a device is activated.
1181

1182
  If so, return informations about the real device.
1183

1184
  @type disk: L{objects.Disk}
1185
  @param disk: the disk object we need to find
1186
  @type allow_partial: boolean
1187
  @param allow_partial: if true, don't abort the find if a
1188
      child of the device can't be found; this is intended
1189
      to be used when repairing mirrors
1190

1191
  @return: None if the device can't be found,
1192
      otherwise the device instance
1193

1194
  """
1195
  children = []
1196
  if disk.children:
1197
    for chdisk in disk.children:
1198
      children.append(_RecursiveFindBD(chdisk))
1199

    
1200
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1201

    
1202

    
1203
def FindBlockDevice(disk):
1204
  """Check if a device is activated.
1205

1206
  If it is, return informations about the real device.
1207

1208
  @type disk: L{objects.Disk}
1209
  @param disk: the disk to find
1210
  @rtype: None or tuple
1211
  @return: None if the disk cannot be found, otherwise a
1212
      tuple (device_path, major, minor, sync_percent,
1213
      estimated_time, is_degraded)
1214

1215
  """
1216
  rbd = _RecursiveFindBD(disk)
1217
  if rbd is None:
1218
    return rbd
1219
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1220

    
1221

    
1222
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1223
  """Write a file to the filesystem.
1224

1225
  This allows the master to overwrite(!) a file. It will only perform
1226
  the operation if the file belongs to a list of configuration files.
1227

1228
  @type file_name: str
1229
  @param file_name: the target file name
1230
  @type data: str
1231
  @param data: the new contents of the file
1232
  @type mode: int
1233
  @param mode: the mode to give the file (can be None)
1234
  @type uid: int
1235
  @param uid: the owner of the file (can be -1 for default)
1236
  @type gid: int
1237
  @param gid: the group of the file (can be -1 for default)
1238
  @type atime: float
1239
  @param atime: the atime to set on the file (can be None)
1240
  @type mtime: float
1241
  @param mtime: the mtime to set on the file (can be None)
1242
  @rtype: boolean
1243
  @return: the success of the operation; errors are logged
1244
      in the node daemon log
1245

1246
  """
1247
  if not os.path.isabs(file_name):
1248
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
1249
                  file_name)
1250
    return False
1251

    
1252
  allowed_files = [
1253
    constants.CLUSTER_CONF_FILE,
1254
    constants.ETC_HOSTS,
1255
    constants.SSH_KNOWN_HOSTS_FILE,
1256
    constants.VNC_PASSWORD_FILE,
1257
    ]
1258

    
1259
  if file_name not in allowed_files:
1260
    logging.error("Filename passed to UploadFile not in allowed"
1261
                 " upload targets: '%s'", file_name)
1262
    return False
1263

    
1264
  raw_data = _Decompress(data)
1265

    
1266
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1267
                  atime=atime, mtime=mtime)
1268
  return True
1269

    
1270

    
1271
def WriteSsconfFiles(values):
1272
  """Update all ssconf files.
1273

1274
  Wrapper around the SimpleStore.WriteFiles.
1275

1276
  """
1277
  ssconf.SimpleStore().WriteFiles(values)
1278

    
1279

    
1280
def _ErrnoOrStr(err):
1281
  """Format an EnvironmentError exception.
1282

1283
  If the L{err} argument has an errno attribute, it will be looked up
1284
  and converted into a textual C{E...} description. Otherwise the
1285
  string representation of the error will be returned.
1286

1287
  @type err: L{EnvironmentError}
1288
  @param err: the exception to format
1289

1290
  """
1291
  if hasattr(err, 'errno'):
1292
    detail = errno.errorcode[err.errno]
1293
  else:
1294
    detail = str(err)
1295
  return detail
1296

    
1297

    
1298
def _OSOndiskVersion(name, os_dir):
1299
  """Compute and return the API version of a given OS.
1300

1301
  This function will try to read the API version of the OS given by
1302
  the 'name' parameter and residing in the 'os_dir' directory.
1303

1304
  @type name: str
1305
  @param name: the OS name we should look for
1306
  @type os_dir: str
1307
  @param os_dir: the directory inwhich we should look for the OS
1308
  @rtype: int or None
1309
  @return:
1310
      Either an integer denoting the version or None in the
1311
      case when this is not a valid OS name.
1312
  @raise errors.InvalidOS: if the OS cannot be found
1313

1314
  """
1315
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1316

    
1317
  try:
1318
    st = os.stat(api_file)
1319
  except EnvironmentError, err:
1320
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1321
                           " found (%s)" % _ErrnoOrStr(err))
1322

    
1323
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1324
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1325
                           " a regular file")
1326

    
1327
  try:
1328
    f = open(api_file)
1329
    try:
1330
      api_versions = f.readlines()
1331
    finally:
1332
      f.close()
1333
  except EnvironmentError, err:
1334
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1335
                           " API version (%s)" % _ErrnoOrStr(err))
1336

    
1337
  api_versions = [version.strip() for version in api_versions]
1338
  try:
1339
    api_versions = [int(version) for version in api_versions]
1340
  except (TypeError, ValueError), err:
1341
    raise errors.InvalidOS(name, os_dir,
1342
                           "API version is not integer (%s)" % str(err))
1343

    
1344
  return api_versions
1345

    
1346

    
1347
def DiagnoseOS(top_dirs=None):
1348
  """Compute the validity for all OSes.
1349

1350
  @type top_dirs: list
1351
  @param top_dirs: the list of directories in which to
1352
      search (if not given defaults to
1353
      L{constants.OS_SEARCH_PATH})
1354
  @rtype: list of L{objects.OS}
1355
  @return: an OS object for each name in all the given
1356
      directories
1357

1358
  """
1359
  if top_dirs is None:
1360
    top_dirs = constants.OS_SEARCH_PATH
1361

    
1362
  result = []
1363
  for dir_name in top_dirs:
1364
    if os.path.isdir(dir_name):
1365
      try:
1366
        f_names = utils.ListVisibleFiles(dir_name)
1367
      except EnvironmentError, err:
1368
        logging.exception("Can't list the OS directory %s", dir_name)
1369
        break
1370
      for name in f_names:
1371
        try:
1372
          os_inst = OSFromDisk(name, base_dir=dir_name)
1373
          result.append(os_inst)
1374
        except errors.InvalidOS, err:
1375
          result.append(objects.OS.FromInvalidOS(err))
1376

    
1377
  return result
1378

    
1379

    
1380
def OSFromDisk(name, base_dir=None):
1381
  """Create an OS instance from disk.
1382

1383
  This function will return an OS instance if the given name is a
1384
  valid OS name. Otherwise, it will raise an appropriate
1385
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1386

1387
  @type base_dir: string
1388
  @keyword base_dir: Base directory containing OS installations.
1389
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1390
  @rtype: L{objects.OS}
1391
  @return: the OS instance if we find a valid one
1392
  @raise errors.InvalidOS: if we don't find a valid OS
1393

1394
  """
1395
  if base_dir is None:
1396
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1397
    if os_dir is None:
1398
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1399
  else:
1400
    os_dir = os.path.sep.join([base_dir, name])
1401

    
1402
  api_versions = _OSOndiskVersion(name, os_dir)
1403

    
1404
  if constants.OS_API_VERSION not in api_versions:
1405
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1406
                           " (found %s want %s)"
1407
                           % (api_versions, constants.OS_API_VERSION))
1408

    
1409
  # OS Scripts dictionary, we will populate it with the actual script names
1410
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1411

    
1412
  for script in os_scripts:
1413
    os_scripts[script] = os.path.sep.join([os_dir, script])
1414

    
1415
    try:
1416
      st = os.stat(os_scripts[script])
1417
    except EnvironmentError, err:
1418
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1419
                             (script, _ErrnoOrStr(err)))
1420

    
1421
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1422
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1423
                             script)
1424

    
1425
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1426
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1427
                             script)
1428

    
1429

    
1430
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1431
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1432
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1433
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1434
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1435
                    api_versions=api_versions)
1436

    
1437
def OSEnvironment(instance, debug=0):
1438
  """Calculate the environment for an os script.
1439

1440
  @type instance: L{objects.Instance}
1441
  @param instance: target instance for the os script run
1442
  @type debug: integer
1443
  @param debug: debug level (0 or 1, for OS Api 10)
1444
  @rtype: dict
1445
  @return: dict of environment variables
1446
  @raise errors.BlockDeviceError: if the block device
1447
      cannot be found
1448

1449
  """
1450
  result = {}
1451
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1452
  result['INSTANCE_NAME'] = instance.name
1453
  result['HYPERVISOR'] = instance.hypervisor
1454
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1455
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1456
  result['DEBUG_LEVEL'] = '%d' % debug
1457
  for idx, disk in enumerate(instance.disks):
1458
    real_disk = _RecursiveFindBD(disk)
1459
    if real_disk is None:
1460
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1461
                                    str(disk))
1462
    real_disk.Open()
1463
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1464
    # FIXME: When disks will have read-only mode, populate this
1465
    result['DISK_%d_ACCESS' % idx] = 'W'
1466
    if constants.HV_DISK_TYPE in instance.hvparams:
1467
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1468
        instance.hvparams[constants.HV_DISK_TYPE]
1469
    if disk.dev_type in constants.LDS_BLOCK:
1470
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1471
    elif disk.dev_type == constants.LD_FILE:
1472
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1473
        'file:%s' % disk.physical_id[0]
1474
  for idx, nic in enumerate(instance.nics):
1475
    result['NIC_%d_MAC' % idx] = nic.mac
1476
    if nic.ip:
1477
      result['NIC_%d_IP' % idx] = nic.ip
1478
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
1479
    if constants.HV_NIC_TYPE in instance.hvparams:
1480
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1481
        instance.hvparams[constants.HV_NIC_TYPE]
1482

    
1483
  return result
1484

    
1485
def GrowBlockDevice(disk, amount):
1486
  """Grow a stack of block devices.
1487

1488
  This function is called recursively, with the childrens being the
1489
  first ones to resize.
1490

1491
  @type disk: L{objects.Disk}
1492
  @param disk: the disk to be grown
1493
  @rtype: (status, result)
1494
  @return: a tuple with the status of the operation
1495
      (True/False), and the errors message if status
1496
      is False
1497

1498
  """
1499
  r_dev = _RecursiveFindBD(disk)
1500
  if r_dev is None:
1501
    return False, "Cannot find block device %s" % (disk,)
1502

    
1503
  try:
1504
    r_dev.Grow(amount)
1505
  except errors.BlockDeviceError, err:
1506
    return False, str(err)
1507

    
1508
  return True, None
1509

    
1510

    
1511
def SnapshotBlockDevice(disk):
1512
  """Create a snapshot copy of a block device.
1513

1514
  This function is called recursively, and the snapshot is actually created
1515
  just for the leaf lvm backend device.
1516

1517
  @type disk: L{objects.Disk}
1518
  @param disk: the disk to be snapshotted
1519
  @rtype: string
1520
  @return: snapshot disk path
1521

1522
  """
1523
  if disk.children:
1524
    if len(disk.children) == 1:
1525
      # only one child, let's recurse on it
1526
      return SnapshotBlockDevice(disk.children[0])
1527
    else:
1528
      # more than one child, choose one that matches
1529
      for child in disk.children:
1530
        if child.size == disk.size:
1531
          # return implies breaking the loop
1532
          return SnapshotBlockDevice(child)
1533
  elif disk.dev_type == constants.LD_LV:
1534
    r_dev = _RecursiveFindBD(disk)
1535
    if r_dev is not None:
1536
      # let's stay on the safe side and ask for the full size, for now
1537
      return r_dev.Snapshot(disk.size)
1538
    else:
1539
      return None
1540
  else:
1541
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1542
                                 " '%s' of type '%s'" %
1543
                                 (disk.unique_id, disk.dev_type))
1544

    
1545

    
1546
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to which the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: boolean
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command, env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True


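# Editor's sketch (not part of the original module): how a caller might drive
# ExportSnapshot above for every snapshotted disk of an instance. The
# "snap_disks" list and the helper name are hypothetical placeholders.
def _ExportAllSnapshotsExample(snap_disks, dest_node, instance, cluster_name):
  results = []
  for idx, snap_disk in enumerate(snap_disks):
    if snap_disk is None:
      # disk was not snapshotted, nothing to export
      results.append(False)
    else:
      results.append(ExportSnapshot(snap_disk, dest_node, instance,
                                    cluster_name, idx))
  return results

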
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_count = 0
  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_count)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True


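# Editor's sketch (not part of the original module): approximate shape of the
# config.ini written by FinalizeExport above, assuming constants.INISECT_EXP
# and constants.INISECT_INS map to the "[export]" and "[instance]" section
# names; all values below are made up for illustration. Note that 'nic_count'
# holds the last nic index from the enumerate loop (0 here, for a single nic),
# while 'disk_count' is an actual count of exported disks.
_EXAMPLE_EXPORT_CONF = """
[export]
version = 0
timestamp = 1199145600
source = node1.example.com
os = debian-etch
compression = gzip

[instance]
name = instance1.example.com
memory = 128
vcpus = 1
disk_template = plain
nic0_mac = aa:00:00:11:22:33
nic0_ip = None
nic0_bridge = xen-br0
nic_count = 0
disk0_ivname = sda
disk0_dump = sda_data.snap
disk0_size = 1024
disk_count = 1
"""

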
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config


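# Editor's sketch (not part of the original module): reading back a few values
# from an export; the export name is a made-up placeholder and the keys match
# the ones written by FinalizeExport above.
def _ReadExportInfoExample(export_name="instance1.example.com"):
  config = ExportInfo(os.path.join(constants.EXPORT_DIR, export_name))
  if config is None:
    return None
  return (config.get(constants.INISECT_EXP, 'os'),
          config.getint(constants.INISECT_INS, 'disk_count'))

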
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: string
  @param cluster_name: the cluster name, needed for the SSH hostalias
  @rtype: list of boolean
  @return: each boolean represents the success of importing the n-th disk

  """
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("Disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append(False)
      else:
        final_result.append(True)
    else:
      final_result.append(True)

  return final_result


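# Editor's sketch (not part of the original module): assembling the absolute
# image paths for ImportOSIntoInstance from an existing export directory. The
# helper name is hypothetical and, for simplicity, it assumes every disk was
# snapshotted, so the disk indices are contiguous.
def _ImportFromExportExample(instance, src_node, export_name, cluster_name):
  export_dir = os.path.join(constants.EXPORT_DIR, export_name)
  config = ExportInfo(export_dir)
  if config is None:
    return None
  disk_count = config.getint(constants.INISECT_INS, 'disk_count')
  src_images = [os.path.join(export_dir,
                             config.get(constants.INISECT_INS,
                                        'disk%d_dump' % i))
                for i in range(disk_count)]
  return ImportOSIntoInstance(instance, src_node, src_images, cluster_name)

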
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: the success of the operation

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the new logical/physical id we rename
      it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir


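# Editor's sketch (not part of the original module): the same commonprefix
# containment test as above, but with an explicit base directory instead of
# the ssconf value; "/srv/ganeti/file-storage" is a made-up example base.
# e.g. _IsUnderBaseDirExample("/srv/ganeti/file-storage/inst1") -> True
#      _IsUnderBaseDirExample("/tmp/evil")                      -> False
def _IsUnderBaseDirExample(path, base="/srv/ganeti/file-storage"):
  path = os.path.normpath(path)
  return os.path.commonprefix([path, base]) == base

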
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result


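# Editor's sketch (not part of the original module): the file storage helpers
# above return a one-element tuple, matching the "result = True," convention;
# the helper name is hypothetical.
def _EnsureFileStorageDirExample(path):
  (success, ) = CreateFileStorageDir(path)
  return success

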
def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result


def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))

  return True


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True


def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True


def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      contains the error details in case we failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates the success of the operation, and message
      contains the error details in case we failed

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
    return (True, "Validation passed")
  except errors.HypervisorError, err:
    return (False, str(err))


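# Editor's sketch (not part of the original module): the (success, message)
# convention as seen by a caller; hvname and hvparams are whatever the caller
# received from the master, and the helper name is hypothetical.
def _CheckHVParamsExample(hvname, hvparams):
  status, msg = ValidateHVParams(hvname, hvparams)
  if not status:
    logging.error("Hypervisor parameter validation failed: %s", msg)
  return status

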
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    return (False, "ssconf status shows I'm the master node, will not demote")
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    return (False, "The master daemon is running, will not demote")
  try:
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      return (False, "Error while backing up cluster file: %s" % str(err))
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
  return (True, "Done")


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the success of the operation, and message
        contains the error details in case we failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr


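# Editor's sketch (not part of the original module): running the post-phase
# hooks for a hooks path and collecting the scripts that failed. The hooks
# path "cluster-verify" and the helper name are hypothetical.
def _RunPostHooksExample(env, hpath="cluster-verify"):
  runner = HooksRunner()
  results = runner.RunHooks(hpath, constants.HOOKS_PHASE_POST, env)
  # each entry is (relative script path, HKR_* status, captured output)
  return [name for (name, status, _) in results
          if status == constants.HKR_FAIL]

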
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)


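# Editor's sketch (not part of the original module): how a caller might invoke
# an allocator script by name; "idata" is assumed to be the serialized request
# text prepared by the master and the helper name is hypothetical.
def _RunAllocatorExample(name, idata):
  status, stdout, stderr, fail_reason = IAllocatorRunner().Run(name, idata)
  if status != constants.IARUN_SUCCESS:
    logging.error("iallocator '%s' failed: %s (stderr: %s)",
                  name, fail_reason, stderr)
    return None
  return stdout

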
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in objects.Disk.iv_name

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove the bdev cache file for %s", dev_path)


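# Editor's sketch (not part of the original module): typical use of the cache
# helpers above around a device rename; the cache file for e.g.
# "/dev/xenvg/disk0" ends up as "<BDEV_CACHE_DIR>/bdev_xenvg_disk0". All
# argument values are placeholders.
def _RefreshBdevCacheExample(old_path, new_path, owner, on_primary, iv_name):
  DevCacheManager.RemoveCache(old_path)
  DevCacheManager.UpdateCache(new_path, owner, on_primary, iv_name)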