Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 52e2f66e

History | View | Annotate | Download (69.7 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon"""
23

    
24

    
25
import os
26
import os.path
27
import shutil
28
import time
29
import stat
30
import errno
31
import re
32
import subprocess
33
import random
34
import logging
35
import tempfile
36
import zlib
37
import base64
38

    
39
from ganeti import errors
40
from ganeti import utils
41
from ganeti import ssh
42
from ganeti import hypervisor
43
from ganeti import constants
44
from ganeti import bdev
45
from ganeti import objects
46
from ganeti import ssconf
47

    
48

    
49
def _GetConfig():
50
  """Simple wrapper to return a SimpleStore.
51

52
  @rtype: L{ssconf.SimpleStore}
53
  @return: a SimpleStore instance
54

55
  """
56
  return ssconf.SimpleStore()
57

    
58

    
59
def _GetSshRunner(cluster_name):
60
  """Simple wrapper to return an SshRunner.
61

62
  @type cluster_name: str
63
  @param cluster_name: the cluster name, which is needed
64
      by the SshRunner constructor
65
  @rtype: L{ssh.SshRunner}
66
  @return: an SshRunner instance
67

68
  """
69
  return ssh.SshRunner(cluster_name)
70

    
71

    
72
def _Decompress(data):
73
  """Unpacks data compressed by the RPC client.
74

75
  @type data: list or tuple
76
  @param data: Data sent by RPC client
77
  @rtype: str
78
  @return: Decompressed data
79

80
  """
81
  assert isinstance(data, (list, tuple))
82
  assert len(data) == 2
83
  (encoding, content) = data
84
  if encoding == constants.RPC_ENCODING_NONE:
85
    return content
86
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
87
    return zlib.decompress(base64.b64decode(content))
88
  else:
89
    raise AssertionError("Unknown data encoding")
90

    
91

    
92
def _CleanDirectory(path, exclude=[]):
93
  """Removes all regular files in a directory.
94

95
  @type path: str
96
  @param path: the directory to clean
97
  @type exclude: list
98
  @param exclude: list of files to be excluded, defaults
99
      to the empty list
100
  @rtype: None
101

102
  """
103
  if not os.path.isdir(path):
104
    return
105

    
106
  # Normalize excluded paths
107
  exclude = [os.path.normpath(i) for i in exclude]
108

    
109
  for rel_name in utils.ListVisibleFiles(path):
110
    full_name = os.path.normpath(os.path.join(path, rel_name))
111
    if full_name in exclude:
112
      continue
113
    if os.path.isfile(full_name) and not os.path.islink(full_name):
114
      utils.RemoveFile(full_name)
115

    
116

    
117
def JobQueuePurge():
118
  """Removes job queue files and archived jobs.
119

120
  @rtype: None
121

122
  """
123
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
124
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
125

    
126

    
127
def GetMasterInfo():
128
  """Returns master information.
129

130
  This is an utility function to compute master information, either
131
  for consumption here or from the node daemon.
132

133
  @rtype: tuple
134
  @return: (master_netdev, master_ip, master_name) if we have a good
135
      configuration, otherwise (None, None, None)
136

137
  """
138
  try:
139
    cfg = _GetConfig()
140
    master_netdev = cfg.GetMasterNetdev()
141
    master_ip = cfg.GetMasterIP()
142
    master_node = cfg.GetMasterNode()
143
  except errors.ConfigurationError, err:
144
    logging.exception("Cluster configuration incomplete")
145
    return (None, None, None)
146
  return (master_netdev, master_ip, master_node)
147

    
148

    
149
def StartMaster(start_daemons):
150
  """Activate local node as master node.
151

152
  The function will always try activate the IP address of the master
153
  (unless someone else has it). It will also start the master daemons,
154
  based on the start_daemons parameter.
155

156
  @type start_daemons: boolean
157
  @param start_daemons: whther to also start the master
158
      daemons (ganeti-masterd and ganeti-rapi)
159
  @rtype: None
160

161
  """
162
  ok = True
163
  master_netdev, master_ip, _ = GetMasterInfo()
164
  if not master_netdev:
165
    return False
166

    
167
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
168
    if utils.OwnIpAddress(master_ip):
169
      # we already have the ip:
170
      logging.debug("Already started")
171
    else:
172
      logging.error("Someone else has the master ip, not activating")
173
      ok = False
174
  else:
175
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
176
                           "dev", master_netdev, "label",
177
                           "%s:0" % master_netdev])
178
    if result.failed:
179
      logging.error("Can't activate master IP: %s", result.output)
180
      ok = False
181

    
182
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
183
                           "-s", master_ip, master_ip])
184
    # we'll ignore the exit code of arping
185

    
186
  # and now start the master and rapi daemons
187
  if start_daemons:
188
    for daemon in 'ganeti-masterd', 'ganeti-rapi':
189
      result = utils.RunCmd([daemon])
190
      if result.failed:
191
        logging.error("Can't start daemon %s: %s", daemon, result.output)
192
        ok = False
193
  return ok
194

    
195

    
196
def StopMaster(stop_daemons):
197
  """Deactivate this node as master.
198

199
  The function will always try to deactivate the IP address of the
200
  master. It will also stop the master daemons depending on the
201
  stop_daemons parameter.
202

203
  @type stop_daemons: boolean
204
  @param stop_daemons: whether to also stop the master daemons
205
      (ganeti-masterd and ganeti-rapi)
206
  @rtype: None
207

208
  """
209
  master_netdev, master_ip, _ = GetMasterInfo()
210
  if not master_netdev:
211
    return False
212

    
213
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
214
                         "dev", master_netdev])
215
  if result.failed:
216
    logging.error("Can't remove the master IP, error: %s", result.output)
217
    # but otherwise ignore the failure
218

    
219
  if stop_daemons:
220
    # stop/kill the rapi and the master daemon
221
    for daemon in constants.RAPI_PID, constants.MASTERD_PID:
222
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
223

    
224
  return True
225

    
226

    
227
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
228
  """Joins this node to the cluster.
229

230
  This does the following:
231
      - updates the hostkeys of the machine (rsa and dsa)
232
      - adds the ssh private key to the user
233
      - adds the ssh public key to the users' authorized_keys file
234

235
  @type dsa: str
236
  @param dsa: the DSA private key to write
237
  @type dsapub: str
238
  @param dsapub: the DSA public key to write
239
  @type rsa: str
240
  @param rsa: the RSA private key to write
241
  @type rsapub: str
242
  @param rsapub: the RSA public key to write
243
  @type sshkey: str
244
  @param sshkey: the SSH private key to write
245
  @type sshpub: str
246
  @param sshpub: the SSH public key to write
247
  @rtype: boolean
248
  @return: the success of the operation
249

250
  """
251
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
252
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
253
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
254
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
255
  for name, content, mode in sshd_keys:
256
    utils.WriteFile(name, data=content, mode=mode)
257

    
258
  try:
259
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
260
                                                    mkdir=True)
261
  except errors.OpExecError, err:
262
    logging.exception("Error while processing user ssh files")
263
    return False
264

    
265
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
266
    utils.WriteFile(name, data=content, mode=0600)
267

    
268
  utils.AddAuthorizedKey(auth_keys, sshpub)
269

    
270
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
271

    
272
  return True
273

    
274

    
275
def LeaveCluster():
276
  """Cleans up and remove the current node.
277

278
  This function cleans up and prepares the current node to be removed
279
  from the cluster.
280

281
  If processing is successful, then it raises an
282
  L{errors.GanetiQuitException} which is used as a special case to
283
  shutdown the node daemon.
284

285
  """
286
  _CleanDirectory(constants.DATA_DIR)
287
  JobQueuePurge()
288

    
289
  try:
290
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
291
  except errors.OpExecError:
292
    logging.exception("Error while processing ssh files")
293
    return
294

    
295
  f = open(pub_key, 'r')
296
  try:
297
    utils.RemoveAuthorizedKey(auth_keys, f.read(8192))
298
  finally:
299
    f.close()
300

    
301
  utils.RemoveFile(priv_key)
302
  utils.RemoveFile(pub_key)
303

    
304
  # Return a reassuring string to the caller, and quit
305
  raise errors.QuitGanetiException(False, 'Shutdown scheduled')
306

    
307

    
308
def GetNodeInfo(vgname, hypervisor_type):
309
  """Gives back a hash with different informations about the node.
310

311
  @type vgname: C{string}
312
  @param vgname: the name of the volume group to ask for disk space information
313
  @type hypervisor_type: C{str}
314
  @param hypervisor_type: the name of the hypervisor to ask for
315
      memory information
316
  @rtype: C{dict}
317
  @return: dictionary with the following keys:
318
      - vg_size is the size of the configured volume group in MiB
319
      - vg_free is the free size of the volume group in MiB
320
      - memory_dom0 is the memory allocated for domain0 in MiB
321
      - memory_free is the currently available (free) ram in MiB
322
      - memory_total is the total number of ram in MiB
323

324
  """
325
  outputarray = {}
326
  vginfo = _GetVGInfo(vgname)
327
  outputarray['vg_size'] = vginfo['vg_size']
328
  outputarray['vg_free'] = vginfo['vg_free']
329

    
330
  hyper = hypervisor.GetHypervisor(hypervisor_type)
331
  hyp_info = hyper.GetNodeInfo()
332
  if hyp_info is not None:
333
    outputarray.update(hyp_info)
334

    
335
  f = open("/proc/sys/kernel/random/boot_id", 'r')
336
  try:
337
    outputarray["bootid"] = f.read(128).rstrip("\n")
338
  finally:
339
    f.close()
340

    
341
  return outputarray
342

    
343

    
344
def VerifyNode(what, cluster_name):
345
  """Verify the status of the local node.
346

347
  Based on the input L{what} parameter, various checks are done on the
348
  local node.
349

350
  If the I{filelist} key is present, this list of
351
  files is checksummed and the file/checksum pairs are returned.
352

353
  If the I{nodelist} key is present, we check that we have
354
  connectivity via ssh with the target nodes (and check the hostname
355
  report).
356

357
  If the I{node-net-test} key is present, we check that we have
358
  connectivity to the given nodes via both primary IP and, if
359
  applicable, secondary IPs.
360

361
  @type what: C{dict}
362
  @param what: a dictionary of things to check:
363
      - filelist: list of files for which to compute checksums
364
      - nodelist: list of nodes we should check ssh communication with
365
      - node-net-test: list of nodes we should check node daemon port
366
        connectivity with
367
      - hypervisor: list with hypervisors to run the verify for
368
  @rtype: dict
369
  @return: a dictionary with the same keys as the input dict, and
370
      values representing the result of the checks
371

372
  """
373
  result = {}
374

    
375
  if constants.NV_HYPERVISOR in what:
376
    result[constants.NV_HYPERVISOR] = tmp = {}
377
    for hv_name in what[constants.NV_HYPERVISOR]:
378
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
379

    
380
  if constants.NV_FILELIST in what:
381
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
382
      what[constants.NV_FILELIST])
383

    
384
  if constants.NV_NODELIST in what:
385
    result[constants.NV_NODELIST] = tmp = {}
386
    random.shuffle(what[constants.NV_NODELIST])
387
    for node in what[constants.NV_NODELIST]:
388
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
389
      if not success:
390
        tmp[node] = message
391

    
392
  if constants.NV_NODENETTEST in what:
393
    result[constants.NV_NODENETTEST] = tmp = {}
394
    my_name = utils.HostInfo().name
395
    my_pip = my_sip = None
396
    for name, pip, sip in what[constants.NV_NODENETTEST]:
397
      if name == my_name:
398
        my_pip = pip
399
        my_sip = sip
400
        break
401
    if not my_pip:
402
      tmp[my_name] = ("Can't find my own primary/secondary IP"
403
                      " in the node list")
404
    else:
405
      port = utils.GetNodeDaemonPort()
406
      for name, pip, sip in what[constants.NV_NODENETTEST]:
407
        fail = []
408
        if not utils.TcpPing(pip, port, source=my_pip):
409
          fail.append("primary")
410
        if sip != pip:
411
          if not utils.TcpPing(sip, port, source=my_sip):
412
            fail.append("secondary")
413
        if fail:
414
          tmp[name] = ("failure using the %s interface(s)" %
415
                       " and ".join(fail))
416

    
417
  if constants.NV_LVLIST in what:
418
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
419

    
420
  if constants.NV_INSTANCELIST in what:
421
    result[constants.NV_INSTANCELIST] = GetInstanceList(
422
      what[constants.NV_INSTANCELIST])
423

    
424
  if constants.NV_VGLIST in what:
425
    result[constants.NV_VGLIST] = ListVolumeGroups()
426

    
427
  if constants.NV_VERSION in what:
428
    result[constants.NV_VERSION] = constants.PROTOCOL_VERSION
429

    
430
  if constants.NV_HVINFO in what:
431
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
432
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
433

    
434
  return result
435

    
436

    
437
def GetVolumeList(vg_name):
438
  """Compute list of logical volumes and their size.
439

440
  @type vg_name: str
441
  @param vg_name: the volume group whose LVs we should list
442
  @rtype: dict
443
  @return:
444
      dictionary of all partions (key) with value being a tuple of
445
      their size (in MiB), inactive and online status::
446

447
        {'test1': ('20.06', True, True)}
448

449
      in case of errors, a string is returned with the error
450
      details.
451

452
  """
453
  lvs = {}
454
  sep = '|'
455
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
456
                         "--separator=%s" % sep,
457
                         "-olv_name,lv_size,lv_attr", vg_name])
458
  if result.failed:
459
    logging.error("Failed to list logical volumes, lvs output: %s",
460
                  result.output)
461
    return result.output
462

    
463
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
464
  for line in result.stdout.splitlines():
465
    line = line.strip()
466
    match = valid_line_re.match(line)
467
    if not match:
468
      logging.error("Invalid line returned from lvs output: '%s'", line)
469
      continue
470
    name, size, attr = match.groups()
471
    inactive = attr[4] == '-'
472
    online = attr[5] == 'o'
473
    lvs[name] = (size, inactive, online)
474

    
475
  return lvs
476

    
477

    
478
def ListVolumeGroups():
479
  """List the volume groups and their size.
480

481
  @rtype: dict
482
  @return: dictionary with keys volume name and values the
483
      size of the volume
484

485
  """
486
  return utils.ListVolumeGroups()
487

    
488

    
489
def NodeVolumes():
490
  """List all volumes on this node.
491

492
  @rtype: list
493
  @return:
494
    A list of dictionaries, each having four keys:
495
      - name: the logical volume name,
496
      - size: the size of the logical volume
497
      - dev: the physical device on which the LV lives
498
      - vg: the volume group to which it belongs
499

500
    In case of errors, we return an empty list and log the
501
    error.
502

503
    Note that since a logical volume can live on multiple physical
504
    volumes, the resulting list might include a logical volume
505
    multiple times.
506

507
  """
508
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
509
                         "--separator=|",
510
                         "--options=lv_name,lv_size,devices,vg_name"])
511
  if result.failed:
512
    logging.error("Failed to list logical volumes, lvs output: %s",
513
                  result.output)
514
    return []
515

    
516
  def parse_dev(dev):
517
    if '(' in dev:
518
      return dev.split('(')[0]
519
    else:
520
      return dev
521

    
522
  def map_line(line):
523
    return {
524
      'name': line[0].strip(),
525
      'size': line[1].strip(),
526
      'dev': parse_dev(line[2].strip()),
527
      'vg': line[3].strip(),
528
    }
529

    
530
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
531
          if line.count('|') >= 3]
532

    
533

    
534
def BridgesExist(bridges_list):
535
  """Check if a list of bridges exist on the current node.
536

537
  @rtype: boolean
538
  @return: C{True} if all of them exist, C{False} otherwise
539

540
  """
541
  for bridge in bridges_list:
542
    if not utils.BridgeExists(bridge):
543
      return False
544

    
545
  return True
546

    
547

    
548
def GetInstanceList(hypervisor_list):
549
  """Provides a list of instances.
550

551
  @type hypervisor_list: list
552
  @param hypervisor_list: the list of hypervisors to query information
553

554
  @rtype: list
555
  @return: a list of all running instances on the current node
556
    - instance1.example.com
557
    - instance2.example.com
558

559
  """
560
  results = []
561
  for hname in hypervisor_list:
562
    try:
563
      names = hypervisor.GetHypervisor(hname).ListInstances()
564
      results.extend(names)
565
    except errors.HypervisorError, err:
566
      logging.exception("Error enumerating instances for hypevisor %s", hname)
567
      # FIXME: should we somehow not propagate this to the master?
568
      raise
569

    
570
  return results
571

    
572

    
573
def GetInstanceInfo(instance, hname):
574
  """Gives back the informations about an instance as a dictionary.
575

576
  @type instance: string
577
  @param instance: the instance name
578
  @type hname: string
579
  @param hname: the hypervisor type of the instance
580

581
  @rtype: dict
582
  @return: dictionary with the following keys:
583
      - memory: memory size of instance (int)
584
      - state: xen state of instance (string)
585
      - time: cpu time of instance (float)
586

587
  """
588
  output = {}
589

    
590
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
591
  if iinfo is not None:
592
    output['memory'] = iinfo[2]
593
    output['state'] = iinfo[4]
594
    output['time'] = iinfo[5]
595

    
596
  return output
597

    
598

    
599
def GetAllInstancesInfo(hypervisor_list):
600
  """Gather data about all instances.
601

602
  This is the equivalent of L{GetInstanceInfo}, except that it
603
  computes data for all instances at once, thus being faster if one
604
  needs data about more than one instance.
605

606
  @type hypervisor_list: list
607
  @param hypervisor_list: list of hypervisors to query for instance data
608

609
  @rtype: dict
610
  @return: dictionary of instance: data, with data having the following keys:
611
      - memory: memory size of instance (int)
612
      - state: xen state of instance (string)
613
      - time: cpu time of instance (float)
614
      - vcpus: the number of vcpus
615

616
  """
617
  output = {}
618

    
619
  for hname in hypervisor_list:
620
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
621
    if iinfo:
622
      for name, inst_id, memory, vcpus, state, times in iinfo:
623
        value = {
624
          'memory': memory,
625
          'vcpus': vcpus,
626
          'state': state,
627
          'time': times,
628
          }
629
        if name in output and output[name] != value:
630
          raise errors.HypervisorError("Instance %s running duplicate"
631
                                       " with different parameters" % name)
632
        output[name] = value
633

    
634
  return output
635

    
636

    
637
def AddOSToInstance(instance):
638
  """Add an OS to an instance.
639

640
  @type instance: L{objects.Instance}
641
  @param instance: Instance whose OS is to be installed
642
  @rtype: boolean
643
  @return: the success of the operation
644

645
  """
646
  inst_os = OSFromDisk(instance.os)
647

    
648
  create_env = OSEnvironment(instance)
649

    
650
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
651
                                     instance.name, int(time.time()))
652

    
653
  result = utils.RunCmd([inst_os.create_script], env=create_env,
654
                        cwd=inst_os.path, output=logfile,)
655
  if result.failed:
656
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
657
                  " output: %s", result.cmd, result.fail_reason, logfile,
658
                  result.output)
659
    return False
660

    
661
  return True
662

    
663

    
664
def RunRenameInstance(instance, old_name):
665
  """Run the OS rename script for an instance.
666

667
  @type instance: L{objects.Instance}
668
  @param instance: Instance whose OS is to be installed
669
  @type old_name: string
670
  @param old_name: previous instance name
671
  @rtype: boolean
672
  @return: the success of the operation
673

674
  """
675
  inst_os = OSFromDisk(instance.os)
676

    
677
  rename_env = OSEnvironment(instance)
678
  rename_env['OLD_INSTANCE_NAME'] = old_name
679

    
680
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
681
                                           old_name,
682
                                           instance.name, int(time.time()))
683

    
684
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
685
                        cwd=inst_os.path, output=logfile)
686

    
687
  if result.failed:
688
    logging.error("os create command '%s' returned error: %s output: %s",
689
                  result.cmd, result.fail_reason, result.output)
690
    return False
691

    
692
  return True
693

    
694

    
695
def _GetVGInfo(vg_name):
696
  """Get informations about the volume group.
697

698
  @type vg_name: str
699
  @param vg_name: the volume group which we query
700
  @rtype: dict
701
  @return:
702
    A dictionary with the following keys:
703
      - C{vg_size} is the total size of the volume group in MiB
704
      - C{vg_free} is the free size of the volume group in MiB
705
      - C{pv_count} are the number of physical disks in that VG
706

707
    If an error occurs during gathering of data, we return the same dict
708
    with keys all set to None.
709

710
  """
711
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
712

    
713
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
714
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
715

    
716
  if retval.failed:
717
    logging.error("volume group %s not present", vg_name)
718
    return retdic
719
  valarr = retval.stdout.strip().rstrip(':').split(':')
720
  if len(valarr) == 3:
721
    try:
722
      retdic = {
723
        "vg_size": int(round(float(valarr[0]), 0)),
724
        "vg_free": int(round(float(valarr[1]), 0)),
725
        "pv_count": int(valarr[2]),
726
        }
727
    except ValueError, err:
728
      logging.exception("Fail to parse vgs output")
729
  else:
730
    logging.error("vgs output has the wrong number of fields (expected"
731
                  " three): %s", str(valarr))
732
  return retdic
733

    
734

    
735
def _GatherBlockDevs(instance):
736
  """Set up an instance's block device(s).
737

738
  This is run on the primary node at instance startup. The block
739
  devices must be already assembled.
740

741
  @type instance: L{objects.Instance}
742
  @param instance: the instance whose disks we shoul assemble
743
  @rtype: list of L{bdev.BlockDev}
744
  @return: list of the block devices
745

746
  """
747
  block_devices = []
748
  for disk in instance.disks:
749
    device = _RecursiveFindBD(disk)
750
    if device is None:
751
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
752
                                    str(disk))
753
    device.Open()
754
    block_devices.append((disk, device))
755
  return block_devices
756

    
757

    
758
def StartInstance(instance, extra_args):
759
  """Start an instance.
760

761
  @type instance: L{objects.Instance}
762
  @param instance: the instance object
763
  @rtype: boolean
764
  @return: whether the startup was successful or not
765

766
  """
767
  running_instances = GetInstanceList([instance.hypervisor])
768

    
769
  if instance.name in running_instances:
770
    return True
771

    
772
  block_devices = _GatherBlockDevs(instance)
773
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
774

    
775
  try:
776
    hyper.StartInstance(instance, block_devices, extra_args)
777
  except errors.HypervisorError, err:
778
    logging.exception("Failed to start instance")
779
    return False
780

    
781
  return True
782

    
783

    
784
def ShutdownInstance(instance):
785
  """Shut an instance down.
786

787
  @note: this functions uses polling with a hardcoded timeout.
788

789
  @type instance: L{objects.Instance}
790
  @param instance: the instance object
791
  @rtype: boolean
792
  @return: whether the startup was successful or not
793

794
  """
795
  hv_name = instance.hypervisor
796
  running_instances = GetInstanceList([hv_name])
797

    
798
  if instance.name not in running_instances:
799
    return True
800

    
801
  hyper = hypervisor.GetHypervisor(hv_name)
802
  try:
803
    hyper.StopInstance(instance)
804
  except errors.HypervisorError, err:
805
    logging.error("Failed to stop instance")
806
    return False
807

    
808
  # test every 10secs for 2min
809
  shutdown_ok = False
810

    
811
  time.sleep(1)
812
  for dummy in range(11):
813
    if instance.name not in GetInstanceList([hv_name]):
814
      break
815
    time.sleep(10)
816
  else:
817
    # the shutdown did not succeed
818
    logging.error("shutdown of '%s' unsuccessful, using destroy", instance)
819

    
820
    try:
821
      hyper.StopInstance(instance, force=True)
822
    except errors.HypervisorError, err:
823
      logging.exception("Failed to stop instance")
824
      return False
825

    
826
    time.sleep(1)
827
    if instance.name in GetInstanceList([hv_name]):
828
      logging.error("could not shutdown instance '%s' even by destroy",
829
                    instance.name)
830
      return False
831

    
832
  return True
833

    
834

    
835
def RebootInstance(instance, reboot_type, extra_args):
836
  """Reboot an instance.
837

838
  @type instance: L{objects.Instance}
839
  @param instance: the instance object to reboot
840
  @type reboot_type: str
841
  @param reboot_type: the type of reboot, one the following
842
    constants:
843
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
844
        instance OS, do not recreate the VM
845
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
846
        restart the VM (at the hypervisor level)
847
      - the other reboot type (L{constants.INSTANCE_REBOOT_HARD})
848
        is not accepted here, since that mode is handled
849
        differently
850
  @rtype: boolean
851
  @return: the success of the operation
852

853
  """
854
  running_instances = GetInstanceList([instance.hypervisor])
855

    
856
  if instance.name not in running_instances:
857
    logging.error("Cannot reboot instance that is not running")
858
    return False
859

    
860
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
861
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
862
    try:
863
      hyper.RebootInstance(instance)
864
    except errors.HypervisorError, err:
865
      logging.exception("Failed to soft reboot instance")
866
      return False
867
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
868
    try:
869
      ShutdownInstance(instance)
870
      StartInstance(instance, extra_args)
871
    except errors.HypervisorError, err:
872
      logging.exception("Failed to hard reboot instance")
873
      return False
874
  else:
875
    raise errors.ParameterError("reboot_type invalid")
876

    
877
  return True
878

    
879

    
880
def MigrateInstance(instance, target, live):
881
  """Migrates an instance to another node.
882

883
  @type instance: L{objects.Instance}
884
  @param instance: the instance definition
885
  @type target: string
886
  @param target: the target node name
887
  @type live: boolean
888
  @param live: whether the migration should be done live or not (the
889
      interpretation of this parameter is left to the hypervisor)
890
  @rtype: tuple
891
  @return: a tuple of (success, msg) where:
892
      - succes is a boolean denoting the success/failure of the operation
893
      - msg is a string with details in case of failure
894

895
  """
896
  hyper = hypervisor.GetHypervisor(instance.hypervisor_name)
897

    
898
  try:
899
    hyper.MigrateInstance(instance.name, target, live)
900
  except errors.HypervisorError, err:
901
    msg = "Failed to migrate instance: %s" % str(err)
902
    logging.error(msg)
903
    return (False, msg)
904
  return (True, "Migration successfull")
905

    
906

    
907
def CreateBlockDevice(disk, size, owner, on_primary, info):
908
  """Creates a block device for an instance.
909

910
  @type disk: L{objects.Disk}
911
  @param disk: the object describing the disk we should create
912
  @type size: int
913
  @param size: the size of the physical underlying device, in MiB
914
  @type owner: str
915
  @param owner: the name of the instance for which disk is created,
916
      used for device cache data
917
  @type on_primary: boolean
918
  @param on_primary:  indicates if it is the primary node or not
919
  @type info: string
920
  @param info: string that will be sent to the physical device
921
      creation, used for example to set (LVM) tags on LVs
922

923
  @return: the new unique_id of the device (this can sometime be
924
      computed only after creation), or None. On secondary nodes,
925
      it's not required to return anything.
926

927
  """
928
  clist = []
929
  if disk.children:
930
    for child in disk.children:
931
      crdev = _RecursiveAssembleBD(child, owner, on_primary)
932
      if on_primary or disk.AssembleOnSecondary():
933
        # we need the children open in case the device itself has to
934
        # be assembled
935
        crdev.Open()
936
      clist.append(crdev)
937
  try:
938
    device = bdev.FindDevice(disk.dev_type, disk.physical_id, clist)
939
    if device is not None:
940
      logging.info("removing existing device %s", disk)
941
      device.Remove()
942
  except errors.BlockDeviceError, err:
943
    pass
944

    
945
  device = bdev.Create(disk.dev_type, disk.physical_id,
946
                       clist, size)
947
  if device is None:
948
    raise ValueError("Can't create child device for %s, %s" %
949
                     (disk, size))
950
  if on_primary or disk.AssembleOnSecondary():
951
    if not device.Assemble():
952
      errorstring = "Can't assemble device after creation"
953
      logging.error(errorstring)
954
      raise errors.BlockDeviceError("%s, very unusual event - check the node"
955
                                    " daemon logs" % errorstring)
956
    device.SetSyncSpeed(constants.SYNC_SPEED)
957
    if on_primary or disk.OpenOnSecondary():
958
      device.Open(force=True)
959
    DevCacheManager.UpdateCache(device.dev_path, owner,
960
                                on_primary, disk.iv_name)
961

    
962
  device.SetInfo(info)
963

    
964
  physical_id = device.unique_id
965
  return physical_id
966

    
967

    
968
def RemoveBlockDevice(disk):
969
  """Remove a block device.
970

971
  @note: This is intended to be called recursively.
972

973
  @type disk: L{objects.disk}
974
  @param disk: the disk object we should remove
975
  @rtype: boolean
976
  @return: the success of the operation
977

978
  """
979
  try:
980
    # since we are removing the device, allow a partial match
981
    # this allows removal of broken mirrors
982
    rdev = _RecursiveFindBD(disk, allow_partial=True)
983
  except errors.BlockDeviceError, err:
984
    # probably can't attach
985
    logging.info("Can't attach to device %s in remove", disk)
986
    rdev = None
987
  if rdev is not None:
988
    r_path = rdev.dev_path
989
    result = rdev.Remove()
990
    if result:
991
      DevCacheManager.RemoveCache(r_path)
992
  else:
993
    result = True
994
  if disk.children:
995
    for child in disk.children:
996
      result = result and RemoveBlockDevice(child)
997
  return result
998

    
999

    
1000
def _RecursiveAssembleBD(disk, owner, as_primary):
1001
  """Activate a block device for an instance.
1002

1003
  This is run on the primary and secondary nodes for an instance.
1004

1005
  @note: this function is called recursively.
1006

1007
  @type disk: L{objects.Disk}
1008
  @param disk: the disk we try to assemble
1009
  @type owner: str
1010
  @param owner: the name of the instance which owns the disk
1011
  @type as_primary: boolean
1012
  @param as_primary: if we should make the block device
1013
      read/write
1014

1015
  @return: the assembled device or None (in case no device
1016
      was assembled)
1017
  @raise errors.BlockDeviceError: in case there is an error
1018
      during the activation of the children or the device
1019
      itself
1020

1021
  """
1022
  children = []
1023
  if disk.children:
1024
    mcn = disk.ChildrenNeeded()
1025
    if mcn == -1:
1026
      mcn = 0 # max number of Nones allowed
1027
    else:
1028
      mcn = len(disk.children) - mcn # max number of Nones
1029
    for chld_disk in disk.children:
1030
      try:
1031
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1032
      except errors.BlockDeviceError, err:
1033
        if children.count(None) >= mcn:
1034
          raise
1035
        cdev = None
1036
        logging.debug("Error in child activation: %s", str(err))
1037
      children.append(cdev)
1038

    
1039
  if as_primary or disk.AssembleOnSecondary():
1040
    r_dev = bdev.AttachOrAssemble(disk.dev_type, disk.physical_id, children)
1041
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1042
    result = r_dev
1043
    if as_primary or disk.OpenOnSecondary():
1044
      r_dev.Open()
1045
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1046
                                as_primary, disk.iv_name)
1047

    
1048
  else:
1049
    result = True
1050
  return result
1051

    
1052

    
1053
def AssembleBlockDevice(disk, owner, as_primary):
1054
  """Activate a block device for an instance.
1055

1056
  This is a wrapper over _RecursiveAssembleBD.
1057

1058
  @rtype: str or boolean
1059
  @return: a C{/dev/...} path for primary nodes, and
1060
      C{True} for secondary nodes
1061

1062
  """
1063
  result = _RecursiveAssembleBD(disk, owner, as_primary)
1064
  if isinstance(result, bdev.BlockDev):
1065
    result = result.dev_path
1066
  return result
1067

    
1068

    
1069
def ShutdownBlockDevice(disk):
1070
  """Shut down a block device.
1071

1072
  First, if the device is assembled (can L{Attach()}), then the device
1073
  is shutdown. Then the children of the device are shutdown.
1074

1075
  This function is called recursively. Note that we don't cache the
1076
  children or such, as oppossed to assemble, shutdown of different
1077
  devices doesn't require that the upper device was active.
1078

1079
  @type disk: L{objects.Disk}
1080
  @param disk: the description of the disk we should
1081
      shutdown
1082
  @rtype: boolean
1083
  @return: the success of the operation
1084

1085
  """
1086
  r_dev = _RecursiveFindBD(disk)
1087
  if r_dev is not None:
1088
    r_path = r_dev.dev_path
1089
    result = r_dev.Shutdown()
1090
    if result:
1091
      DevCacheManager.RemoveCache(r_path)
1092
  else:
1093
    result = True
1094
  if disk.children:
1095
    for child in disk.children:
1096
      result = result and ShutdownBlockDevice(child)
1097
  return result
1098

    
1099

    
1100
def MirrorAddChildren(parent_cdev, new_cdevs):
1101
  """Extend a mirrored block device.
1102

1103
  @type parent_cdev: L{objects.Disk}
1104
  @param parent_cdev: the disk to which we should add children
1105
  @type new_cdevs: list of L{objects.Disk}
1106
  @param new_cdevs: the list of children which we should add
1107
  @rtype: boolean
1108
  @return: the success of the operation
1109

1110
  """
1111
  parent_bdev = _RecursiveFindBD(parent_cdev, allow_partial=True)
1112
  if parent_bdev is None:
1113
    logging.error("Can't find parent device")
1114
    return False
1115
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1116
  if new_bdevs.count(None) > 0:
1117
    logging.error("Can't find new device(s) to add: %s:%s",
1118
                  new_bdevs, new_cdevs)
1119
    return False
1120
  parent_bdev.AddChildren(new_bdevs)
1121
  return True
1122

    
1123

    
1124
def MirrorRemoveChildren(parent_cdev, new_cdevs):
1125
  """Shrink a mirrored block device.
1126

1127
  @type parent_cdev: L{objects.Disk}
1128
  @param parent_cdev: the disk from which we should remove children
1129
  @type new_cdevs: list of L{objects.Disk}
1130
  @param new_cdevs: the list of children which we should remove
1131
  @rtype: boolean
1132
  @return: the success of the operation
1133

1134
  """
1135
  parent_bdev = _RecursiveFindBD(parent_cdev)
1136
  if parent_bdev is None:
1137
    logging.error("Can't find parent in remove children: %s", parent_cdev)
1138
    return False
1139
  devs = []
1140
  for disk in new_cdevs:
1141
    rpath = disk.StaticDevPath()
1142
    if rpath is None:
1143
      bd = _RecursiveFindBD(disk)
1144
      if bd is None:
1145
        logging.error("Can't find dynamic device %s while removing children",
1146
                      disk)
1147
        return False
1148
      else:
1149
        devs.append(bd.dev_path)
1150
    else:
1151
      devs.append(rpath)
1152
  parent_bdev.RemoveChildren(devs)
1153
  return True
1154

    
1155

    
1156
def GetMirrorStatus(disks):
1157
  """Get the mirroring status of a list of devices.
1158

1159
  @type disks: list of L{objects.Disk}
1160
  @param disks: the list of disks which we should query
1161
  @rtype: disk
1162
  @return:
1163
      a list of (mirror_done, estimated_time) tuples, which
1164
      are the result of L{bdev.BlockDevice.CombinedSyncStatus}
1165
  @raise errors.BlockDeviceError: if any of the disks cannot be
1166
      found
1167

1168
  """
1169
  stats = []
1170
  for dsk in disks:
1171
    rbd = _RecursiveFindBD(dsk)
1172
    if rbd is None:
1173
      raise errors.BlockDeviceError("Can't find device %s" % str(dsk))
1174
    stats.append(rbd.CombinedSyncStatus())
1175
  return stats
1176

    
1177

    
1178
def _RecursiveFindBD(disk, allow_partial=False):
1179
  """Check if a device is activated.
1180

1181
  If so, return informations about the real device.
1182

1183
  @type disk: L{objects.Disk}
1184
  @param disk: the disk object we need to find
1185
  @type allow_partial: boolean
1186
  @param allow_partial: if true, don't abort the find if a
1187
      child of the device can't be found; this is intended
1188
      to be used when repairing mirrors
1189

1190
  @return: None if the device can't be found,
1191
      otherwise the device instance
1192

1193
  """
1194
  children = []
1195
  if disk.children:
1196
    for chdisk in disk.children:
1197
      children.append(_RecursiveFindBD(chdisk))
1198

    
1199
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children)
1200

    
1201

    
1202
def FindBlockDevice(disk):
1203
  """Check if a device is activated.
1204

1205
  If it is, return informations about the real device.
1206

1207
  @type disk: L{objects.Disk}
1208
  @param disk: the disk to find
1209
  @rtype: None or tuple
1210
  @return: None if the disk cannot be found, otherwise a
1211
      tuple (device_path, major, minor, sync_percent,
1212
      estimated_time, is_degraded)
1213

1214
  """
1215
  rbd = _RecursiveFindBD(disk)
1216
  if rbd is None:
1217
    return rbd
1218
  return (rbd.dev_path, rbd.major, rbd.minor) + rbd.GetSyncStatus()
1219

    
1220

    
1221
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1222
  """Write a file to the filesystem.
1223

1224
  This allows the master to overwrite(!) a file. It will only perform
1225
  the operation if the file belongs to a list of configuration files.
1226

1227
  @type file_name: str
1228
  @param file_name: the target file name
1229
  @type data: str
1230
  @param data: the new contents of the file
1231
  @type mode: int
1232
  @param mode: the mode to give the file (can be None)
1233
  @type uid: int
1234
  @param uid: the owner of the file (can be -1 for default)
1235
  @type gid: int
1236
  @param gid: the group of the file (can be -1 for default)
1237
  @type atime: float
1238
  @param atime: the atime to set on the file (can be None)
1239
  @type mtime: float
1240
  @param mtime: the mtime to set on the file (can be None)
1241
  @rtype: boolean
1242
  @return: the success of the operation; errors are logged
1243
      in the node daemon log
1244

1245
  """
1246
  if not os.path.isabs(file_name):
1247
    logging.error("Filename passed to UploadFile is not absolute: '%s'",
1248
                  file_name)
1249
    return False
1250

    
1251
  allowed_files = [
1252
    constants.CLUSTER_CONF_FILE,
1253
    constants.ETC_HOSTS,
1254
    constants.SSH_KNOWN_HOSTS_FILE,
1255
    constants.VNC_PASSWORD_FILE,
1256
    ]
1257

    
1258
  if file_name not in allowed_files:
1259
    logging.error("Filename passed to UploadFile not in allowed"
1260
                 " upload targets: '%s'", file_name)
1261
    return False
1262

    
1263
  raw_data = _Decompress(data)
1264

    
1265
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1266
                  atime=atime, mtime=mtime)
1267
  return True
1268

    
1269

    
1270
def WriteSsconfFiles(values):
1271
  """Update all ssconf files.
1272

1273
  Wrapper around the SimpleStore.WriteFiles.
1274

1275
  """
1276
  ssconf.SimpleStore().WriteFiles(values)
1277

    
1278

    
1279
def _ErrnoOrStr(err):
1280
  """Format an EnvironmentError exception.
1281

1282
  If the L{err} argument has an errno attribute, it will be looked up
1283
  and converted into a textual C{E...} description. Otherwise the
1284
  string representation of the error will be returned.
1285

1286
  @type err: L{EnvironmentError}
1287
  @param err: the exception to format
1288

1289
  """
1290
  if hasattr(err, 'errno'):
1291
    detail = errno.errorcode[err.errno]
1292
  else:
1293
    detail = str(err)
1294
  return detail
1295

    
1296

    
1297
def _OSOndiskVersion(name, os_dir):
1298
  """Compute and return the API version of a given OS.
1299

1300
  This function will try to read the API version of the OS given by
1301
  the 'name' parameter and residing in the 'os_dir' directory.
1302

1303
  @type name: str
1304
  @param name: the OS name we should look for
1305
  @type os_dir: str
1306
  @param os_dir: the directory inwhich we should look for the OS
1307
  @rtype: int or None
1308
  @return:
1309
      Either an integer denoting the version or None in the
1310
      case when this is not a valid OS name.
1311
  @raise errors.InvalidOS: if the OS cannot be found
1312

1313
  """
1314
  api_file = os.path.sep.join([os_dir, "ganeti_api_version"])
1315

    
1316
  try:
1317
    st = os.stat(api_file)
1318
  except EnvironmentError, err:
1319
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file not"
1320
                           " found (%s)" % _ErrnoOrStr(err))
1321

    
1322
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1323
    raise errors.InvalidOS(name, os_dir, "'ganeti_api_version' file is not"
1324
                           " a regular file")
1325

    
1326
  try:
1327
    f = open(api_file)
1328
    try:
1329
      api_versions = f.readlines()
1330
    finally:
1331
      f.close()
1332
  except EnvironmentError, err:
1333
    raise errors.InvalidOS(name, os_dir, "error while reading the"
1334
                           " API version (%s)" % _ErrnoOrStr(err))
1335

    
1336
  api_versions = [version.strip() for version in api_versions]
1337
  try:
1338
    api_versions = [int(version) for version in api_versions]
1339
  except (TypeError, ValueError), err:
1340
    raise errors.InvalidOS(name, os_dir,
1341
                           "API version is not integer (%s)" % str(err))
1342

    
1343
  return api_versions
1344

    
1345

    
1346
def DiagnoseOS(top_dirs=None):
1347
  """Compute the validity for all OSes.
1348

1349
  @type top_dirs: list
1350
  @param top_dirs: the list of directories in which to
1351
      search (if not given defaults to
1352
      L{constants.OS_SEARCH_PATH})
1353
  @rtype: list of L{objects.OS}
1354
  @return: an OS object for each name in all the given
1355
      directories
1356

1357
  """
1358
  if top_dirs is None:
1359
    top_dirs = constants.OS_SEARCH_PATH
1360

    
1361
  result = []
1362
  for dir_name in top_dirs:
1363
    if os.path.isdir(dir_name):
1364
      try:
1365
        f_names = utils.ListVisibleFiles(dir_name)
1366
      except EnvironmentError, err:
1367
        logging.exception("Can't list the OS directory %s", dir_name)
1368
        break
1369
      for name in f_names:
1370
        try:
1371
          os_inst = OSFromDisk(name, base_dir=dir_name)
1372
          result.append(os_inst)
1373
        except errors.InvalidOS, err:
1374
          result.append(objects.OS.FromInvalidOS(err))
1375

    
1376
  return result
1377

    
1378

    
1379
def OSFromDisk(name, base_dir=None):
1380
  """Create an OS instance from disk.
1381

1382
  This function will return an OS instance if the given name is a
1383
  valid OS name. Otherwise, it will raise an appropriate
1384
  L{errors.InvalidOS} exception, detailing why this is not a valid OS.
1385

1386
  @type base_dir: string
1387
  @keyword base_dir: Base directory containing OS installations.
1388
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1389
  @rtype: L{objects.OS}
1390
  @return: the OS instance if we find a valid one
1391
  @raise errors.InvalidOS: if we don't find a valid OS
1392

1393
  """
1394
  if base_dir is None:
1395
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1396
    if os_dir is None:
1397
      raise errors.InvalidOS(name, None, "OS dir not found in search path")
1398
  else:
1399
    os_dir = os.path.sep.join([base_dir, name])
1400

    
1401
  api_versions = _OSOndiskVersion(name, os_dir)
1402

    
1403
  if constants.OS_API_VERSION not in api_versions:
1404
    raise errors.InvalidOS(name, os_dir, "API version mismatch"
1405
                           " (found %s want %s)"
1406
                           % (api_versions, constants.OS_API_VERSION))
1407

    
1408
  # OS Scripts dictionary, we will populate it with the actual script names
1409
  os_scripts = dict.fromkeys(constants.OS_SCRIPTS)
1410

    
1411
  for script in os_scripts:
1412
    os_scripts[script] = os.path.sep.join([os_dir, script])
1413

    
1414
    try:
1415
      st = os.stat(os_scripts[script])
1416
    except EnvironmentError, err:
1417
      raise errors.InvalidOS(name, os_dir, "'%s' script missing (%s)" %
1418
                             (script, _ErrnoOrStr(err)))
1419

    
1420
    if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1421
      raise errors.InvalidOS(name, os_dir, "'%s' script not executable" %
1422
                             script)
1423

    
1424
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1425
      raise errors.InvalidOS(name, os_dir, "'%s' is not a regular file" %
1426
                             script)
1427

    
1428

    
1429
  return objects.OS(name=name, path=os_dir, status=constants.OS_VALID_STATUS,
1430
                    create_script=os_scripts[constants.OS_SCRIPT_CREATE],
1431
                    export_script=os_scripts[constants.OS_SCRIPT_EXPORT],
1432
                    import_script=os_scripts[constants.OS_SCRIPT_IMPORT],
1433
                    rename_script=os_scripts[constants.OS_SCRIPT_RENAME],
1434
                    api_versions=api_versions)
1435

    
1436
def OSEnvironment(instance, debug=0):
1437
  """Calculate the environment for an os script.
1438

1439
  @type instance: L{objects.Instance}
1440
  @param instance: target instance for the os script run
1441
  @type debug: integer
1442
  @param debug: debug level (0 or 1, for OS Api 10)
1443
  @rtype: dict
1444
  @return: dict of environment variables
1445
  @raise errors.BlockDeviceError: if the block device
1446
      cannot be found
1447

1448
  """
1449
  result = {}
1450
  result['OS_API_VERSION'] = '%d' % constants.OS_API_VERSION
1451
  result['INSTANCE_NAME'] = instance.name
1452
  result['HYPERVISOR'] = instance.hypervisor
1453
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1454
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1455
  result['DEBUG_LEVEL'] = '%d' % debug
1456
  for idx, disk in enumerate(instance.disks):
1457
    real_disk = _RecursiveFindBD(disk)
1458
    if real_disk is None:
1459
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1460
                                    str(disk))
1461
    real_disk.Open()
1462
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1463
    # FIXME: When disks will have read-only mode, populate this
1464
    result['DISK_%d_ACCESS' % idx] = 'W'
1465
    if constants.HV_DISK_TYPE in instance.hvparams:
1466
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1467
        instance.hvparams[constants.HV_DISK_TYPE]
1468
    if disk.dev_type in constants.LDS_BLOCK:
1469
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1470
    elif disk.dev_type == constants.LD_FILE:
1471
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1472
        'file:%s' % disk.physical_id[0]
1473
  for idx, nic in enumerate(instance.nics):
1474
    result['NIC_%d_MAC' % idx] = nic.mac
1475
    if nic.ip:
1476
      result['NIC_%d_IP' % idx] = nic.ip
1477
    result['NIC_%d_BRIDGE' % idx] = nic.bridge
1478
    if constants.HV_NIC_TYPE in instance.hvparams:
1479
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1480
        instance.hvparams[constants.HV_NIC_TYPE]
1481

    
1482
  return result
1483

    
1484
def GrowBlockDevice(disk, amount):
1485
  """Grow a stack of block devices.
1486

1487
  This function is called recursively, with the childrens being the
1488
  first ones to resize.
1489

1490
  @type disk: L{objects.Disk}
1491
  @param disk: the disk to be grown
1492
  @rtype: (status, result)
1493
  @return: a tuple with the status of the operation
1494
      (True/False), and the errors message if status
1495
      is False
1496

1497
  """
1498
  r_dev = _RecursiveFindBD(disk)
1499
  if r_dev is None:
1500
    return False, "Cannot find block device %s" % (disk,)
1501

    
1502
  try:
1503
    r_dev.Grow(amount)
1504
  except errors.BlockDeviceError, err:
1505
    return False, str(err)
1506

    
1507
  return True, None
1508

    
1509

    
1510
def SnapshotBlockDevice(disk):
1511
  """Create a snapshot copy of a block device.
1512

1513
  This function is called recursively, and the snapshot is actually created
1514
  just for the leaf lvm backend device.
1515

1516
  @type disk: L{objects.Disk}
1517
  @param disk: the disk to be snapshotted
1518
  @rtype: string
1519
  @return: snapshot disk path
1520

1521
  """
1522
  if disk.children:
1523
    if len(disk.children) == 1:
1524
      # only one child, let's recurse on it
1525
      return SnapshotBlockDevice(disk.children[0])
1526
    else:
1527
      # more than one child, choose one that matches
1528
      for child in disk.children:
1529
        if child.size == disk.size:
1530
          # return implies breaking the loop
1531
          return SnapshotBlockDevice(child)
1532
  elif disk.dev_type == constants.LD_LV:
1533
    r_dev = _RecursiveFindBD(disk)
1534
    if r_dev is not None:
1535
      # let's stay on the safe side and ask for the full size, for now
1536
      return r_dev.Snapshot(disk.size)
1537
    else:
1538
      return None
1539
  else:
1540
    raise errors.ProgrammerError("Cannot snapshot non-lvm block device"
1541
                                 " '%s' of type '%s'" %
1542
                                 (disk.unique_id, disk.dev_type))
1543

    
1544

    
1545
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to which the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: boolean
  @return: the success of the operation

  """
  export_env = OSEnvironment(instance)

  inst_os = OSFromDisk(instance.os)
  export_script = inst_os.export_script

  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                  str(disk))
  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("cd %s; %s 2>%s", inst_os.path,
                               export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(command, env=export_env)

  if result.failed:
    logging.error("os snapshot export command '%s' returned error: %s"
                  " output: %s", command, result.fail_reason, result.output)
    return False

  return True


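# For illustration only: with hypothetical paths and node names, the pipeline
# assembled by ExportSnapshot above ends up looking roughly like
#
#   cd /path/to/os-def; ./export 2>/path/to/log/exp-....log \
#     | gzip \
#     | ssh <options> node2.example.com \
#         'mkdir -p /path/to/exports/inst1.new && \
#          cat > /path/to/exports/inst1.new/<snapshot name>'
#
# i.e. the OS export script writes the image to its stdout, gzip compresses
# it in transit, and the destination node stores it in the temporary ".new"
# export directory; each piece is shell-quoted before the pipes are joined.

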
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: boolean
  @return: the success of the operation

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % nic_total)

  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)

  return True


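# Illustrative sketch of the file written by FinalizeExport (all values and
# the section names shown are examples; the real section names come from
# constants.INISECT_EXP and constants.INISECT_INS):
#
#   [export]
#   version = 0
#   timestamp = 1199145600
#   source = node1.example.com
#   os = debootstrap
#   compression = gzip
#
#   [instance]
#   name = inst1.example.com
#   memory = 512
#   vcpus = 1
#   disk_template = drbd
#   nic0_mac = aa:00:00:11:22:33
#   nic0_ip = None
#   nic0_bridge = xen-br0
#   nic_count = 1
#   disk0_ivname = <instance-visible disk name>
#   disk0_dump = <name of the snapshot dump file>
#   disk0_size = 10240
#   disk_count = 1

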
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config object containing the
      export info

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    return None

  return config


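# Minimal usage sketch for ExportInfo (not called by the daemon; the export
# name below is hypothetical): the returned object supports the standard
# ConfigParser interface.
def _ExampleDescribeExport(export_name="inst1.example.com"):
  """Example only: summarize one export via L{ExportInfo}.

  """
  config = ExportInfo(os.path.join(constants.EXPORT_DIR, export_name))
  if config is None:
    return None
  return {
    "os": config.get(constants.INISECT_EXP, 'os'),
    "instance": config.get(constants.INISECT_INS, 'name'),
    "disk_count": config.getint(constants.INISECT_INS, 'disk_count'),
    }

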
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
  """Import an os image into an instance.

  @type instance: L{objects.Instance}
  @param instance: instance to import the disks into
  @type src_node: string
  @param src_node: source node for the disk images
  @type src_images: list of string
  @param src_images: absolute paths of the disk images
  @type cluster_name: string
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: list of boolean
  @return: each boolean represents the success of importing the n-th disk

  """
  import_env = OSEnvironment(instance)
  inst_os = OSFromDisk(instance.os)
  import_script = inst_os.import_script

  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
                                        instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)

  comprcmd = "gunzip"
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
                               import_script, logfile)

  final_result = []
  for idx, image in enumerate(src_images):
    if image:
      destcmd = utils.BuildShellCmd('cat %s', image)
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
                                                       constants.GANETI_RUNAS,
                                                       destcmd)
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
      import_env['IMPORT_INDEX'] = str(idx)
      result = utils.RunCmd(command, env=import_env)
      if result.failed:
        logging.error("Disk import command '%s' returned error: %s"
                      " output: %s", command, result.fail_reason,
                      result.output)
        final_result.append(False)
      else:
        final_result.append(True)
    else:
      final_result.append(True)

  return final_result


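# For illustration only (hypothetical paths and node name): for each disk the
# loop above runs a pipeline of roughly this shape
#
#   ssh <options> src_node 'cat /path/to/exports/inst1/<dump file>' \
#     | gunzip \
#     | (cd /path/to/os-def; ./import >/path/to/log/import-....log 2>&1)
#
# with IMPORT_DEVICE and IMPORT_INDEX in the environment telling the OS
# import script which block device the decompressed image belongs to.

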
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports

  """
  if os.path.isdir(constants.EXPORT_DIR):
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
  else:
    return []


def RemoveExport(export):
  """Remove an existing export from the node.

  @type export: str
  @param export: the name of the export to remove
  @rtype: boolean
  @return: the success of the operation

  """
  target = os.path.join(constants.EXPORT_DIR, export)

  shutil.rmtree(target)
  # TODO: catch some of the relevant exceptions and provide a pretty
  # error message if rmtree fails.

  return True


def RenameBlockDevices(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the new logical/physical id we rename
      it to
  @rtype: boolean
  @return: True if all renames succeeded, False otherwise

  """
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  return result


def _TransformFileStorageDir(file_storage_dir):
  """Checks whether the given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
      base_file_storage_dir):
    logging.error("file storage directory '%s' is not under base file"
                  " storage directory '%s'",
                  file_storage_dir, base_file_storage_dir)
    return None
  return file_storage_dir


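# Example of the check above, with a hypothetical base directory of
# /srv/ganeti/file-storage:
#
#   /srv/ganeti/file-storage/inst1   -> accepted (returned in normalized form)
#   /srv/ganeti/other/inst1          -> rejected (None)
#
# Note that os.path.commonprefix compares strings character by character
# rather than path components, so this is a plain prefix test against the
# base directory string, not a full path-containment check.

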
def CreateFileStorageDir(file_storage_dir):
  """Create file storage directory.

  @type file_storage_dir: str
  @param file_storage_dir: directory to create

  @rtype: tuple
  @return: tuple with first element a boolean indicating whether dir
      creation was successful or not

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
    else:
      try:
        os.makedirs(file_storage_dir, 0750)
      except OSError, err:
        logging.error("Cannot create file storage directory '%s': %s",
                      file_storage_dir, err)
        result = False,
  return result


def RemoveFileStorageDir(file_storage_dir):
  """Remove file storage directory.

  Remove it only if it's empty. If not, log an error and return.

  @type file_storage_dir: str
  @param file_storage_dir: the directory we should clean up
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
  result = True,
  if not file_storage_dir:
    result = False,
  else:
    if os.path.exists(file_storage_dir):
      if not os.path.isdir(file_storage_dir):
        logging.error("'%s' is not a directory", file_storage_dir)
        result = False,
      # deletes dir only if empty, otherwise we want to return False
      try:
        os.rmdir(file_storage_dir)
      except OSError, err:
        logging.exception("Cannot remove file storage directory '%s'",
                          file_storage_dir)
        result = False,
  return result


def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
  """Rename the file storage directory.

  @type old_file_storage_dir: str
  @param old_file_storage_dir: the current path
  @type new_file_storage_dir: str
  @param new_file_storage_dir: the name we should rename to
  @rtype: tuple (success,)
  @return: tuple of one element, C{success}, denoting
      whether the operation was successful

  """
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
  result = True,
  if not old_file_storage_dir or not new_file_storage_dir:
    result = False,
  else:
    if not os.path.exists(new_file_storage_dir):
      if os.path.isdir(old_file_storage_dir):
        try:
          os.rename(old_file_storage_dir, new_file_storage_dir)
        except OSError, err:
          logging.exception("Cannot rename '%s' to '%s'",
                            old_file_storage_dir, new_file_storage_dir)
          result = False,
      else:
        logging.error("'%s' is not a directory", old_file_storage_dir)
        result = False,
    else:
      if os.path.exists(old_file_storage_dir):
        logging.error("Cannot rename '%s' to '%s'. Both locations exist.",
                      old_file_storage_dir, new_file_storage_dir)
        result = False,
  return result


def _IsJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: boolean
  @return: whether the file is under the queue directory

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)

  if not result:
    logging.error("'%s' is not a file in the queue directory",
                  file_name)

  return result


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: boolean
  @return: the success of the operation

  """
  if not _IsJobQueueFile(file_name):
    return False

  # Write and replace the file atomically
  utils.WriteFile(file_name, data=_Decompress(content))

  return True


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over L{os.rename} with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: boolean
  @return: the success of the operation

  """
  if not (_IsJobQueueFile(old) and _IsJobQueueFile(new)):
    return False

  os.rename(old, new)

  return True


def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: boolean
  @return: always True
  @warning: the function always returns True

  """
  if drain_flag:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
  else:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)

  return True


def CloseBlockDevices(disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates whether the operation succeeded and message
      contains the error details if it failed

  """
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      return (False, "Can't find device %s" % cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    return (False, "Can't make devices secondary: %s" % ",".join(msg))
  else:
    return (True, "All devices secondary")


def ValidateHVParams(hvname, hvparams):
  """Validates the given hypervisor parameters.

  @type hvname: string
  @param hvname: the hypervisor name
  @type hvparams: dict
  @param hvparams: the hypervisor parameters to be validated
  @rtype: tuple (success, message)
  @return: a tuple of success and message, where success
      indicates whether the operation succeeded and message
      contains the error details if it failed

  """
  try:
    hv_type = hypervisor.GetHypervisor(hvname)
    hv_type.ValidateParameters(hvparams)
    return (True, "Validation passed")
  except errors.HypervisorError, err:
    return (False, str(err))


def DemoteFromMC():
  """Demotes the current node from master candidate role.

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    return (False, "ssconf status shows I'm the master node, will not demote")
  pid_file = utils.DaemonPidFileName(constants.MASTERD_PID)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    return (False, "The master daemon is running, will not demote")
  try:
    utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    if err.errno != errno.ENOENT:
      return (False, "Error while backing up cluster file: %s" % str(err))
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
  return (True, "Done")


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates whether the operation succeeded and message
        contains the error details if it failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, output

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, one of L{constants.HKR_SUCCESS},
        L{constants.HKR_FAIL} or L{constants.HKR_SKIP}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      raise errors.ProgrammerError("Unknown hooks phase: '%s'" % phase)
    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError, err:
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr


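# For illustration only (hypothetical hook path and script names): a call
# like RunHooks("cluster-verify", constants.HOOKS_PHASE_POST, env) looks for
# executables under
#
#   <hooks_base_dir>/cluster-verify-post.d/
#       00-check-something
#       10-notify-admin
#
# runs them in sort order and returns tuples such as
# ("cluster-verify-post.d/00-check-something", constants.HKR_SUCCESS, output);
# entries that are not executable or whose names do not match RE_MASK are
# reported with constants.HKR_SKIP.

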
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: tuple
    @return: four element tuple of:
       - run status (one of the IARUN_ constants)
       - stdout
       - stderr
       - fail reason (as from L{utils.RunResult})

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      return (constants.IARUN_NOTFOUND, None, None, None)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        return (constants.IARUN_FAILURE, result.stdout, result.stderr,
                result.fail_reason)
    finally:
      os.unlink(fin_name)

    return (constants.IARUN_SUCCESS, result.stdout, result.stderr, None)


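# Minimal usage sketch for IAllocatorRunner (the allocator name and the
# pre-serialized request string are hypothetical; this helper is not part of
# the daemon's RPC surface):
def _ExampleRunIAllocator(request_text, name="dumb"):
  """Example only: run an iallocator script and return its stdout.

  """
  runner = IAllocatorRunner()
  status, stdout, stderr, fail_reason = runner.Run(name, request_text)
  if status != constants.IARUN_SUCCESS:
    logging.error("iallocator %s failed: %s %s", name, fail_reason, stderr)
    return None
  return stdout

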
class DevCacheManager(object):
  """Simple class for managing a cache of block device information.

  """
  _DEV_PREFIX = "/dev/"
  _ROOT_DIR = constants.BDEV_CACHE_DIR

  @classmethod
  def _ConvertPath(cls, dev_path):
    """Converts a /dev/name path to the cache file name.

    This replaces slashes with underscores and strips the /dev
    prefix. It then returns the full path to the cache file.

    @type dev_path: str
    @param dev_path: the C{/dev/} path name
    @rtype: str
    @return: the converted path name

    """
    if dev_path.startswith(cls._DEV_PREFIX):
      dev_path = dev_path[len(cls._DEV_PREFIX):]
    dev_path = dev_path.replace("/", "_")
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
    return fpath

  @classmethod
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
    """Updates the cache information for a given device.

    @type dev_path: str
    @param dev_path: the pathname of the device
    @type owner: str
    @param owner: the owner (instance name) of the device
    @type on_primary: bool
    @param on_primary: whether this is the primary
        node or not
    @type iv_name: str
    @param iv_name: the instance-visible name of the
        device, as in L{objects.Disk.iv_name}

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    if on_primary:
      state = "primary"
    else:
      state = "secondary"
    if iv_name is None:
      iv_name = "not_visible"
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
    try:
      utils.WriteFile(fpath, data=fdata)
    except EnvironmentError, err:
      logging.exception("Can't update bdev cache for %s", dev_path)

  @classmethod
  def RemoveCache(cls, dev_path):
    """Remove data for a dev_path.

    This is just a wrapper over L{utils.RemoveFile} with a converted
    path name and logging.

    @type dev_path: str
    @param dev_path: the pathname of the device

    @rtype: None

    """
    if dev_path is None:
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
      return
    fpath = cls._ConvertPath(dev_path)
    try:
      utils.RemoveFile(fpath)
    except EnvironmentError, err:
      logging.exception("Can't remove bdev cache file for %s", dev_path)
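

# For reference (device and instance names are illustrative):
# DevCacheManager._ConvertPath maps
#
#   /dev/xenvg/disk0  ->  <BDEV_CACHE_DIR>/bdev_xenvg_disk0
#
# and UpdateCache writes a single line of the form "<owner> <state> <iv_name>"
# into that file, e.g. "inst1.example.com primary sda", while RemoveCache
# simply deletes it.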