Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 7734de0a

History | View | Annotate | Download (85.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
58
  """Class denoting RPC failure.
59

60
  Its argument is the error message.
61

62
  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
66
  """Log an error and the raise an RPCFail exception.
67

68
  This exception is then handled specially in the ganeti daemon and
69
  turned into a 'failed' return type. As such, this function is a
70
  useful shortcut for logging the error and returning it to the master
71
  daemon.
72

73
  @type msg: string
74
  @param msg: the text of the exception
75
  @raise RPCFail
76

77
  """
78
  if args:
79
    msg = msg % args
80
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
81
    if "exc" in kwargs and kwargs["exc"]:
82
      logging.exception(msg)
83
    else:
84
      logging.error(msg)
85
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
89
  """Simple wrapper to return a SimpleStore.
90

91
  @rtype: L{ssconf.SimpleStore}
92
  @return: a SimpleStore instance
93

94
  """
95
  return ssconf.SimpleStore()
96

    
97

    
98
def _GetSshRunner(cluster_name):
99
  """Simple wrapper to return an SshRunner.
100

101
  @type cluster_name: str
102
  @param cluster_name: the cluster name, which is needed
103
      by the SshRunner constructor
104
  @rtype: L{ssh.SshRunner}
105
  @return: an SshRunner instance
106

107
  """
108
  return ssh.SshRunner(cluster_name)
109

    
110

    
111
def _Decompress(data):
112
  """Unpacks data compressed by the RPC client.
113

114
  @type data: list or tuple
115
  @param data: Data sent by RPC client
116
  @rtype: str
117
  @return: Decompressed data
118

119
  """
120
  assert isinstance(data, (list, tuple))
121
  assert len(data) == 2
122
  (encoding, content) = data
123
  if encoding == constants.RPC_ENCODING_NONE:
124
    return content
125
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
126
    return zlib.decompress(base64.b64decode(content))
127
  else:
128
    raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
132
  """Removes all regular files in a directory.
133

134
  @type path: str
135
  @param path: the directory to clean
136
  @type exclude: list
137
  @param exclude: list of files to be excluded, defaults
138
      to the empty list
139

140
  """
141
  if not os.path.isdir(path):
142
    return
143
  if exclude is None:
144
    exclude = []
145
  else:
146
    # Normalize excluded paths
147
    exclude = [os.path.normpath(i) for i in exclude]
148

    
149
  for rel_name in utils.ListVisibleFiles(path):
150
    full_name = os.path.normpath(os.path.join(path, rel_name))
151
    if full_name in exclude:
152
      continue
153
    if os.path.isfile(full_name) and not os.path.islink(full_name):
154
      utils.RemoveFile(full_name)
155

    
156

    
157
def _BuildUploadFileList():
158
  """Build the list of allowed upload files.
159

160
  This is abstracted so that it's built only once at module import time.
161

162
  """
163
  allowed_files = set([
164
    constants.CLUSTER_CONF_FILE,
165
    constants.ETC_HOSTS,
166
    constants.SSH_KNOWN_HOSTS_FILE,
167
    constants.VNC_PASSWORD_FILE,
168
    constants.RAPI_CERT_FILE,
169
    constants.RAPI_USERS_FILE,
170
    constants.HMAC_CLUSTER_KEY,
171
    ])
172

    
173
  for hv_name in constants.HYPER_TYPES:
174
    hv_class = hypervisor.GetHypervisorClass(hv_name)
175
    allowed_files.update(hv_class.GetAncillaryFiles())
176

    
177
  return frozenset(allowed_files)
178

    
179

    
180
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
184
  """Removes job queue files and archived jobs.
185

186
  @rtype: tuple
187
  @return: True, None
188

189
  """
190
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
191
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
195
  """Returns master information.
196

197
  This is an utility function to compute master information, either
198
  for consumption here or from the node daemon.
199

200
  @rtype: tuple
201
  @return: master_netdev, master_ip, master_name
202
  @raise RPCFail: in case of errors
203

204
  """
205
  try:
206
    cfg = _GetConfig()
207
    master_netdev = cfg.GetMasterNetdev()
208
    master_ip = cfg.GetMasterIP()
209
    master_node = cfg.GetMasterNode()
210
  except errors.ConfigurationError, err:
211
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
212
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
216
  """Activate local node as master node.
217

218
  The function will always try activate the IP address of the master
219
  (unless someone else has it). It will also start the master daemons,
220
  based on the start_daemons parameter.
221

222
  @type start_daemons: boolean
223
  @param start_daemons: whether to also start the master
224
      daemons (ganeti-masterd and ganeti-rapi)
225
  @type no_voting: boolean
226
  @param no_voting: whether to start ganeti-masterd without a node vote
227
      (if start_daemons is True), but still non-interactively
228
  @rtype: None
229

230
  """
231
  # GetMasterInfo will raise an exception if not able to return data
232
  master_netdev, master_ip, _ = GetMasterInfo()
233

    
234
  err_msgs = []
235
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
236
    if utils.OwnIpAddress(master_ip):
237
      # we already have the ip:
238
      logging.debug("Master IP already configured, doing nothing")
239
    else:
240
      msg = "Someone else has the master ip, not activating"
241
      logging.error(msg)
242
      err_msgs.append(msg)
243
  else:
244
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
245
                           "dev", master_netdev, "label",
246
                           "%s:0" % master_netdev])
247
    if result.failed:
248
      msg = "Can't activate master IP: %s" % result.output
249
      logging.error(msg)
250
      err_msgs.append(msg)
251

    
252
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
253
                           "-s", master_ip, master_ip])
254
    # we'll ignore the exit code of arping
255

    
256
  # and now start the master and rapi daemons
257
  if start_daemons:
258
    daemons_params = {
259
        'ganeti-masterd': [],
260
        'ganeti-rapi': [],
261
        }
262
    if no_voting:
263
      daemons_params['ganeti-masterd'].append('--no-voting')
264
      daemons_params['ganeti-masterd'].append('--yes-do-it')
265
    for daemon in daemons_params:
266
      cmd = [daemon]
267
      cmd.extend(daemons_params[daemon])
268
      result = utils.RunCmd(cmd)
269
      if result.failed:
270
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
271
        logging.error(msg)
272
        err_msgs.append(msg)
273

    
274
  if err_msgs:
275
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
279
  """Deactivate this node as master.
280

281
  The function will always try to deactivate the IP address of the
282
  master. It will also stop the master daemons depending on the
283
  stop_daemons parameter.
284

285
  @type stop_daemons: boolean
286
  @param stop_daemons: whether to also stop the master daemons
287
      (ganeti-masterd and ganeti-rapi)
288
  @rtype: None
289

290
  """
291
  # TODO: log and report back to the caller the error failures; we
292
  # need to decide in which case we fail the RPC for this
293

    
294
  # GetMasterInfo will raise an exception if not able to return data
295
  master_netdev, master_ip, _ = GetMasterInfo()
296

    
297
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
298
                         "dev", master_netdev])
299
  if result.failed:
300
    logging.error("Can't remove the master IP, error: %s", result.output)
301
    # but otherwise ignore the failure
302

    
303
  if stop_daemons:
304
    # stop/kill the rapi and the master daemon
305
    for daemon in constants.RAPI, constants.MASTERD:
306
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310
  """Joins this node to the cluster.
311

312
  This does the following:
313
      - updates the hostkeys of the machine (rsa and dsa)
314
      - adds the ssh private key to the user
315
      - adds the ssh public key to the users' authorized_keys file
316

317
  @type dsa: str
318
  @param dsa: the DSA private key to write
319
  @type dsapub: str
320
  @param dsapub: the DSA public key to write
321
  @type rsa: str
322
  @param rsa: the RSA private key to write
323
  @type rsapub: str
324
  @param rsapub: the RSA public key to write
325
  @type sshkey: str
326
  @param sshkey: the SSH private key to write
327
  @type sshpub: str
328
  @param sshpub: the SSH public key to write
329
  @rtype: boolean
330
  @return: the success of the operation
331

332
  """
333
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337
  for name, content, mode in sshd_keys:
338
    utils.WriteFile(name, data=content, mode=mode)
339

    
340
  try:
341
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342
                                                    mkdir=True)
343
  except errors.OpExecError, err:
344
    _Fail("Error while processing user ssh files: %s", err, exc=True)
345

    
346
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347
    utils.WriteFile(name, data=content, mode=0600)
348

    
349
  utils.AddAuthorizedKey(auth_keys, sshpub)
350

    
351
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster():
355
  """Cleans up and remove the current node.
356

357
  This function cleans up and prepares the current node to be removed
358
  from the cluster.
359

360
  If processing is successful, then it raises an
361
  L{errors.QuitGanetiException} which is used as a special case to
362
  shutdown the node daemon.
363

364
  """
365
  _CleanDirectory(constants.DATA_DIR)
366
  JobQueuePurge()
367

    
368
  try:
369
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
370

    
371
    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
372

    
373
    utils.RemoveFile(priv_key)
374
    utils.RemoveFile(pub_key)
375
  except errors.OpExecError:
376
    logging.exception("Error while processing ssh files")
377

    
378
  try:
379
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
380
    utils.RemoveFile(constants.RAPI_CERT_FILE)
381
    utils.RemoveFile(constants.SSL_CERT_FILE)
382
  except:
383
    logging.exception("Error while removing cluster secrets")
384

    
385
  confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))
386

    
387
  if confd_pid:
388
    utils.KillProcess(confd_pid, timeout=2)
389

    
390
  # Raise a custom exception (handled in ganeti-noded)
391
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392

    
393

    
394
def GetNodeInfo(vgname, hypervisor_type):
395
  """Gives back a hash with different information about the node.
396

397
  @type vgname: C{string}
398
  @param vgname: the name of the volume group to ask for disk space information
399
  @type hypervisor_type: C{str}
400
  @param hypervisor_type: the name of the hypervisor to ask for
401
      memory information
402
  @rtype: C{dict}
403
  @return: dictionary with the following keys:
404
      - vg_size is the size of the configured volume group in MiB
405
      - vg_free is the free size of the volume group in MiB
406
      - memory_dom0 is the memory allocated for domain0 in MiB
407
      - memory_free is the currently available (free) ram in MiB
408
      - memory_total is the total number of ram in MiB
409

410
  """
411
  outputarray = {}
412
  vginfo = _GetVGInfo(vgname)
413
  outputarray['vg_size'] = vginfo['vg_size']
414
  outputarray['vg_free'] = vginfo['vg_free']
415

    
416
  hyper = hypervisor.GetHypervisor(hypervisor_type)
417
  hyp_info = hyper.GetNodeInfo()
418
  if hyp_info is not None:
419
    outputarray.update(hyp_info)
420

    
421
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
422

    
423
  return outputarray
424

    
425

    
426
def VerifyNode(what, cluster_name):
427
  """Verify the status of the local node.
428

429
  Based on the input L{what} parameter, various checks are done on the
430
  local node.
431

432
  If the I{filelist} key is present, this list of
433
  files is checksummed and the file/checksum pairs are returned.
434

435
  If the I{nodelist} key is present, we check that we have
436
  connectivity via ssh with the target nodes (and check the hostname
437
  report).
438

439
  If the I{node-net-test} key is present, we check that we have
440
  connectivity to the given nodes via both primary IP and, if
441
  applicable, secondary IPs.
442

443
  @type what: C{dict}
444
  @param what: a dictionary of things to check:
445
      - filelist: list of files for which to compute checksums
446
      - nodelist: list of nodes we should check ssh communication with
447
      - node-net-test: list of nodes we should check node daemon port
448
        connectivity with
449
      - hypervisor: list with hypervisors to run the verify for
450
  @rtype: dict
451
  @return: a dictionary with the same keys as the input dict, and
452
      values representing the result of the checks
453

454
  """
455
  result = {}
456

    
457
  if constants.NV_HYPERVISOR in what:
458
    result[constants.NV_HYPERVISOR] = tmp = {}
459
    for hv_name in what[constants.NV_HYPERVISOR]:
460
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
461

    
462
  if constants.NV_FILELIST in what:
463
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
464
      what[constants.NV_FILELIST])
465

    
466
  if constants.NV_NODELIST in what:
467
    result[constants.NV_NODELIST] = tmp = {}
468
    random.shuffle(what[constants.NV_NODELIST])
469
    for node in what[constants.NV_NODELIST]:
470
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
471
      if not success:
472
        tmp[node] = message
473

    
474
  if constants.NV_NODENETTEST in what:
475
    result[constants.NV_NODENETTEST] = tmp = {}
476
    my_name = utils.HostInfo().name
477
    my_pip = my_sip = None
478
    for name, pip, sip in what[constants.NV_NODENETTEST]:
479
      if name == my_name:
480
        my_pip = pip
481
        my_sip = sip
482
        break
483
    if not my_pip:
484
      tmp[my_name] = ("Can't find my own primary/secondary IP"
485
                      " in the node list")
486
    else:
487
      port = utils.GetDaemonPort(constants.NODED)
488
      for name, pip, sip in what[constants.NV_NODENETTEST]:
489
        fail = []
490
        if not utils.TcpPing(pip, port, source=my_pip):
491
          fail.append("primary")
492
        if sip != pip:
493
          if not utils.TcpPing(sip, port, source=my_sip):
494
            fail.append("secondary")
495
        if fail:
496
          tmp[name] = ("failure using the %s interface(s)" %
497
                       " and ".join(fail))
498

    
499
  if constants.NV_LVLIST in what:
500
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
501

    
502
  if constants.NV_INSTANCELIST in what:
503
    result[constants.NV_INSTANCELIST] = GetInstanceList(
504
      what[constants.NV_INSTANCELIST])
505

    
506
  if constants.NV_VGLIST in what:
507
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
508

    
509
  if constants.NV_VERSION in what:
510
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
511
                                    constants.RELEASE_VERSION)
512

    
513
  if constants.NV_HVINFO in what:
514
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
515
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
516

    
517
  if constants.NV_DRBDLIST in what:
518
    try:
519
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
520
    except errors.BlockDeviceError, err:
521
      logging.warning("Can't get used minors list", exc_info=True)
522
      used_minors = str(err)
523
    result[constants.NV_DRBDLIST] = used_minors
524

    
525
  if constants.NV_NODESETUP in what:
526
    result[constants.NV_NODESETUP] = tmpr = []
527
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
528
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
529
                  " under /sys, missing required directories /sys/block"
530
                  " and /sys/class/net")
531
    if (not os.path.isdir("/proc/sys") or
532
        not os.path.isfile("/proc/sysrq-trigger")):
533
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
534
                  " under /proc, missing required directory /proc/sys and"
535
                  " the file /proc/sysrq-trigger")
536
  return result
537

    
538

    
539
def GetVolumeList(vg_name):
540
  """Compute list of logical volumes and their size.
541

542
  @type vg_name: str
543
  @param vg_name: the volume group whose LVs we should list
544
  @rtype: dict
545
  @return:
546
      dictionary of all partions (key) with value being a tuple of
547
      their size (in MiB), inactive and online status::
548

549
        {'test1': ('20.06', True, True)}
550

551
      in case of errors, a string is returned with the error
552
      details.
553

554
  """
555
  lvs = {}
556
  sep = '|'
557
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
558
                         "--separator=%s" % sep,
559
                         "-olv_name,lv_size,lv_attr", vg_name])
560
  if result.failed:
561
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
562

    
563
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
564
  for line in result.stdout.splitlines():
565
    line = line.strip()
566
    match = valid_line_re.match(line)
567
    if not match:
568
      logging.error("Invalid line returned from lvs output: '%s'", line)
569
      continue
570
    name, size, attr = match.groups()
571
    inactive = attr[4] == '-'
572
    online = attr[5] == 'o'
573
    virtual = attr[0] == 'v'
574
    if virtual:
575
      # we don't want to report such volumes as existing, since they
576
      # don't really hold data
577
      continue
578
    lvs[name] = (size, inactive, online)
579

    
580
  return lvs
581

    
582

    
583
def ListVolumeGroups():
584
  """List the volume groups and their size.
585

586
  @rtype: dict
587
  @return: dictionary with keys volume name and values the
588
      size of the volume
589

590
  """
591
  return utils.ListVolumeGroups()
592

    
593

    
594
def NodeVolumes():
595
  """List all volumes on this node.
596

597
  @rtype: list
598
  @return:
599
    A list of dictionaries, each having four keys:
600
      - name: the logical volume name,
601
      - size: the size of the logical volume
602
      - dev: the physical device on which the LV lives
603
      - vg: the volume group to which it belongs
604

605
    In case of errors, we return an empty list and log the
606
    error.
607

608
    Note that since a logical volume can live on multiple physical
609
    volumes, the resulting list might include a logical volume
610
    multiple times.
611

612
  """
613
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
614
                         "--separator=|",
615
                         "--options=lv_name,lv_size,devices,vg_name"])
616
  if result.failed:
617
    _Fail("Failed to list logical volumes, lvs output: %s",
618
          result.output)
619

    
620
  def parse_dev(dev):
621
    if '(' in dev:
622
      return dev.split('(')[0]
623
    else:
624
      return dev
625

    
626
  def map_line(line):
627
    return {
628
      'name': line[0].strip(),
629
      'size': line[1].strip(),
630
      'dev': parse_dev(line[2].strip()),
631
      'vg': line[3].strip(),
632
    }
633

    
634
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
635
          if line.count('|') >= 3]
636

    
637

    
638
def BridgesExist(bridges_list):
639
  """Check if a list of bridges exist on the current node.
640

641
  @rtype: boolean
642
  @return: C{True} if all of them exist, C{False} otherwise
643

644
  """
645
  missing = []
646
  for bridge in bridges_list:
647
    if not utils.BridgeExists(bridge):
648
      missing.append(bridge)
649

    
650
  if missing:
651
    _Fail("Missing bridges %s", ", ".join(missing))
652

    
653

    
654
def GetInstanceList(hypervisor_list):
655
  """Provides a list of instances.
656

657
  @type hypervisor_list: list
658
  @param hypervisor_list: the list of hypervisors to query information
659

660
  @rtype: list
661
  @return: a list of all running instances on the current node
662
    - instance1.example.com
663
    - instance2.example.com
664

665
  """
666
  results = []
667
  for hname in hypervisor_list:
668
    try:
669
      names = hypervisor.GetHypervisor(hname).ListInstances()
670
      results.extend(names)
671
    except errors.HypervisorError, err:
672
      _Fail("Error enumerating instances (hypervisor %s): %s",
673
            hname, err, exc=True)
674

    
675
  return results
676

    
677

    
678
def GetInstanceInfo(instance, hname):
679
  """Gives back the information about an instance as a dictionary.
680

681
  @type instance: string
682
  @param instance: the instance name
683
  @type hname: string
684
  @param hname: the hypervisor type of the instance
685

686
  @rtype: dict
687
  @return: dictionary with the following keys:
688
      - memory: memory size of instance (int)
689
      - state: xen state of instance (string)
690
      - time: cpu time of instance (float)
691

692
  """
693
  output = {}
694

    
695
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
696
  if iinfo is not None:
697
    output['memory'] = iinfo[2]
698
    output['state'] = iinfo[4]
699
    output['time'] = iinfo[5]
700

    
701
  return output
702

    
703

    
704
def GetInstanceMigratable(instance):
705
  """Gives whether an instance can be migrated.
706

707
  @type instance: L{objects.Instance}
708
  @param instance: object representing the instance to be checked.
709

710
  @rtype: tuple
711
  @return: tuple of (result, description) where:
712
      - result: whether the instance can be migrated or not
713
      - description: a description of the issue, if relevant
714

715
  """
716
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
717
  iname = instance.name
718
  if iname not in hyper.ListInstances():
719
    _Fail("Instance %s is not running", iname)
720

    
721
  for idx in range(len(instance.disks)):
722
    link_name = _GetBlockDevSymlinkPath(iname, idx)
723
    if not os.path.islink(link_name):
724
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
725

    
726

    
727
def GetAllInstancesInfo(hypervisor_list):
728
  """Gather data about all instances.
729

730
  This is the equivalent of L{GetInstanceInfo}, except that it
731
  computes data for all instances at once, thus being faster if one
732
  needs data about more than one instance.
733

734
  @type hypervisor_list: list
735
  @param hypervisor_list: list of hypervisors to query for instance data
736

737
  @rtype: dict
738
  @return: dictionary of instance: data, with data having the following keys:
739
      - memory: memory size of instance (int)
740
      - state: xen state of instance (string)
741
      - time: cpu time of instance (float)
742
      - vcpus: the number of vcpus
743

744
  """
745
  output = {}
746

    
747
  for hname in hypervisor_list:
748
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
749
    if iinfo:
750
      for name, _, memory, vcpus, state, times in iinfo:
751
        value = {
752
          'memory': memory,
753
          'vcpus': vcpus,
754
          'state': state,
755
          'time': times,
756
          }
757
        if name in output:
758
          # we only check static parameters, like memory and vcpus,
759
          # and not state and time which can change between the
760
          # invocations of the different hypervisors
761
          for key in 'memory', 'vcpus':
762
            if value[key] != output[name][key]:
763
              _Fail("Instance %s is running twice"
764
                    " with different parameters", name)
765
        output[name] = value
766

    
767
  return output
768

    
769

    
770
def InstanceOsAdd(instance, reinstall):
771
  """Add an OS to an instance.
772

773
  @type instance: L{objects.Instance}
774
  @param instance: Instance whose OS is to be installed
775
  @type reinstall: boolean
776
  @param reinstall: whether this is an instance reinstall
777
  @rtype: None
778

779
  """
780
  inst_os = OSFromDisk(instance.os)
781

    
782
  create_env = OSEnvironment(instance, inst_os)
783
  if reinstall:
784
    create_env['INSTANCE_REINSTALL'] = "1"
785

    
786
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
787
                                     instance.name, int(time.time()))
788

    
789
  result = utils.RunCmd([inst_os.create_script], env=create_env,
790
                        cwd=inst_os.path, output=logfile,)
791
  if result.failed:
792
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
793
                  " output: %s", result.cmd, result.fail_reason, logfile,
794
                  result.output)
795
    lines = [utils.SafeEncode(val)
796
             for val in utils.TailFile(logfile, lines=20)]
797
    _Fail("OS create script failed (%s), last lines in the"
798
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
799

    
800

    
801
def RunRenameInstance(instance, old_name):
802
  """Run the OS rename script for an instance.
803

804
  @type instance: L{objects.Instance}
805
  @param instance: Instance whose OS is to be installed
806
  @type old_name: string
807
  @param old_name: previous instance name
808
  @rtype: boolean
809
  @return: the success of the operation
810

811
  """
812
  inst_os = OSFromDisk(instance.os)
813

    
814
  rename_env = OSEnvironment(instance, inst_os)
815
  rename_env['OLD_INSTANCE_NAME'] = old_name
816

    
817
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
818
                                           old_name,
819
                                           instance.name, int(time.time()))
820

    
821
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
822
                        cwd=inst_os.path, output=logfile)
823

    
824
  if result.failed:
825
    logging.error("os create command '%s' returned error: %s output: %s",
826
                  result.cmd, result.fail_reason, result.output)
827
    lines = [utils.SafeEncode(val)
828
             for val in utils.TailFile(logfile, lines=20)]
829
    _Fail("OS rename script failed (%s), last lines in the"
830
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
831

    
832

    
833
def _GetVGInfo(vg_name):
834
  """Get information about the volume group.
835

836
  @type vg_name: str
837
  @param vg_name: the volume group which we query
838
  @rtype: dict
839
  @return:
840
    A dictionary with the following keys:
841
      - C{vg_size} is the total size of the volume group in MiB
842
      - C{vg_free} is the free size of the volume group in MiB
843
      - C{pv_count} are the number of physical disks in that VG
844

845
    If an error occurs during gathering of data, we return the same dict
846
    with keys all set to None.
847

848
  """
849
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
850

    
851
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
852
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
853

    
854
  if retval.failed:
855
    logging.error("volume group %s not present", vg_name)
856
    return retdic
857
  valarr = retval.stdout.strip().rstrip(':').split(':')
858
  if len(valarr) == 3:
859
    try:
860
      retdic = {
861
        "vg_size": int(round(float(valarr[0]), 0)),
862
        "vg_free": int(round(float(valarr[1]), 0)),
863
        "pv_count": int(valarr[2]),
864
        }
865
    except ValueError, err:
866
      logging.exception("Fail to parse vgs output: %s", err)
867
  else:
868
    logging.error("vgs output has the wrong number of fields (expected"
869
                  " three): %s", str(valarr))
870
  return retdic
871

    
872

    
873
def _GetBlockDevSymlinkPath(instance_name, idx):
874
  return os.path.join(constants.DISK_LINKS_DIR,
875
                      "%s:%d" % (instance_name, idx))
876

    
877

    
878
def _SymlinkBlockDev(instance_name, device_path, idx):
879
  """Set up symlinks to a instance's block device.
880

881
  This is an auxiliary function run when an instance is start (on the primary
882
  node) or when an instance is migrated (on the target node).
883

884

885
  @param instance_name: the name of the target instance
886
  @param device_path: path of the physical block device, on the node
887
  @param idx: the disk index
888
  @return: absolute path to the disk's symlink
889

890
  """
891
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
892
  try:
893
    os.symlink(device_path, link_name)
894
  except OSError, err:
895
    if err.errno == errno.EEXIST:
896
      if (not os.path.islink(link_name) or
897
          os.readlink(link_name) != device_path):
898
        os.remove(link_name)
899
        os.symlink(device_path, link_name)
900
    else:
901
      raise
902

    
903
  return link_name
904

    
905

    
906
def _RemoveBlockDevLinks(instance_name, disks):
907
  """Remove the block device symlinks belonging to the given instance.
908

909
  """
910
  for idx, _ in enumerate(disks):
911
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
912
    if os.path.islink(link_name):
913
      try:
914
        os.remove(link_name)
915
      except OSError:
916
        logging.exception("Can't remove symlink '%s'", link_name)
917

    
918

    
919
def _GatherAndLinkBlockDevs(instance):
920
  """Set up an instance's block device(s).
921

922
  This is run on the primary node at instance startup. The block
923
  devices must be already assembled.
924

925
  @type instance: L{objects.Instance}
926
  @param instance: the instance whose disks we shoul assemble
927
  @rtype: list
928
  @return: list of (disk_object, device_path)
929

930
  """
931
  block_devices = []
932
  for idx, disk in enumerate(instance.disks):
933
    device = _RecursiveFindBD(disk)
934
    if device is None:
935
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
936
                                    str(disk))
937
    device.Open()
938
    try:
939
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
940
    except OSError, e:
941
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
942
                                    e.strerror)
943

    
944
    block_devices.append((disk, link_name))
945

    
946
  return block_devices
947

    
948

    
949
def StartInstance(instance):
950
  """Start an instance.
951

952
  @type instance: L{objects.Instance}
953
  @param instance: the instance object
954
  @rtype: None
955

956
  """
957
  running_instances = GetInstanceList([instance.hypervisor])
958

    
959
  if instance.name in running_instances:
960
    logging.info("Instance %s already running, not starting", instance.name)
961
    return
962

    
963
  try:
964
    block_devices = _GatherAndLinkBlockDevs(instance)
965
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
966
    hyper.StartInstance(instance, block_devices)
967
  except errors.BlockDeviceError, err:
968
    _Fail("Block device error: %s", err, exc=True)
969
  except errors.HypervisorError, err:
970
    _RemoveBlockDevLinks(instance.name, instance.disks)
971
    _Fail("Hypervisor error: %s", err, exc=True)
972

    
973

    
974
def InstanceShutdown(instance, timeout):
975
  """Shut an instance down.
976

977
  @note: this functions uses polling with a hardcoded timeout.
978

979
  @type instance: L{objects.Instance}
980
  @param instance: the instance object
981
  @type timeout: integer
982
  @param timeout: maximum timeout for soft shutdown
983
  @rtype: None
984

985
  """
986
  hv_name = instance.hypervisor
987
  hyper = hypervisor.GetHypervisor(hv_name)
988
  running_instances = hyper.ListInstances()
989
  iname = instance.name
990

    
991
  if iname not in running_instances:
992
    logging.info("Instance %s not running, doing nothing", iname)
993
    return
994

    
995
  start = time.time()
996
  end = start + timeout
997
  sleep_time = 5
998

    
999
  tried_once = False
1000
  while time.time() < end:
1001
    try:
1002
      hyper.StopInstance(instance, retry=tried_once)
1003
    except errors.HypervisorError, err:
1004
      _Fail("Failed to stop instance %s: %s", iname, err)
1005
    tried_once = True
1006
    time.sleep(sleep_time)
1007
    if instance.name not in hyper.ListInstances():
1008
      break
1009
  else:
1010
    # the shutdown did not succeed
1011
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1012

    
1013
    try:
1014
      hyper.StopInstance(instance, force=True)
1015
    except errors.HypervisorError, err:
1016
      _Fail("Failed to force stop instance %s: %s", iname, err)
1017

    
1018
    time.sleep(1)
1019
    if instance.name in GetInstanceList([hv_name]):
1020
      _Fail("Could not shutdown instance %s even by destroy", iname)
1021

    
1022
  _RemoveBlockDevLinks(iname, instance.disks)
1023

    
1024

    
1025
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1026
  """Reboot an instance.
1027

1028
  @type instance: L{objects.Instance}
1029
  @param instance: the instance object to reboot
1030
  @type reboot_type: str
1031
  @param reboot_type: the type of reboot, one the following
1032
    constants:
1033
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1034
        instance OS, do not recreate the VM
1035
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1036
        restart the VM (at the hypervisor level)
1037
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1038
        not accepted here, since that mode is handled differently, in
1039
        cmdlib, and translates into full stop and start of the
1040
        instance (instead of a call_instance_reboot RPC)
1041
  @type timeout: integer
1042
  @param timeout: maximum timeout for soft shutdown
1043
  @rtype: None
1044

1045
  """
1046
  running_instances = GetInstanceList([instance.hypervisor])
1047

    
1048
  if instance.name not in running_instances:
1049
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1050

    
1051
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1052
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1053
    try:
1054
      hyper.RebootInstance(instance)
1055
    except errors.HypervisorError, err:
1056
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1057
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1058
    try:
1059
      InstanceShutdown(instance, shutdown_timeout)
1060
      return StartInstance(instance)
1061
    except errors.HypervisorError, err:
1062
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1063
  else:
1064
    _Fail("Invalid reboot_type received: %s", reboot_type)
1065

    
1066

    
1067
def MigrationInfo(instance):
1068
  """Gather information about an instance to be migrated.
1069

1070
  @type instance: L{objects.Instance}
1071
  @param instance: the instance definition
1072

1073
  """
1074
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1075
  try:
1076
    info = hyper.MigrationInfo(instance)
1077
  except errors.HypervisorError, err:
1078
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1079
  return info
1080

    
1081

    
1082
def AcceptInstance(instance, info, target):
1083
  """Prepare the node to accept an instance.
1084

1085
  @type instance: L{objects.Instance}
1086
  @param instance: the instance definition
1087
  @type info: string/data (opaque)
1088
  @param info: migration information, from the source node
1089
  @type target: string
1090
  @param target: target host (usually ip), on this node
1091

1092
  """
1093
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1094
  try:
1095
    hyper.AcceptInstance(instance, info, target)
1096
  except errors.HypervisorError, err:
1097
    _Fail("Failed to accept instance: %s", err, exc=True)
1098

    
1099

    
1100
def FinalizeMigration(instance, info, success):
1101
  """Finalize any preparation to accept an instance.
1102

1103
  @type instance: L{objects.Instance}
1104
  @param instance: the instance definition
1105
  @type info: string/data (opaque)
1106
  @param info: migration information, from the source node
1107
  @type success: boolean
1108
  @param success: whether the migration was a success or a failure
1109

1110
  """
1111
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1112
  try:
1113
    hyper.FinalizeMigration(instance, info, success)
1114
  except errors.HypervisorError, err:
1115
    _Fail("Failed to finalize migration: %s", err, exc=True)
1116

    
1117

    
1118
def MigrateInstance(instance, target, live):
1119
  """Migrates an instance to another node.
1120

1121
  @type instance: L{objects.Instance}
1122
  @param instance: the instance definition
1123
  @type target: string
1124
  @param target: the target node name
1125
  @type live: boolean
1126
  @param live: whether the migration should be done live or not (the
1127
      interpretation of this parameter is left to the hypervisor)
1128
  @rtype: tuple
1129
  @return: a tuple of (success, msg) where:
1130
      - succes is a boolean denoting the success/failure of the operation
1131
      - msg is a string with details in case of failure
1132

1133
  """
1134
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1135

    
1136
  try:
1137
    hyper.MigrateInstance(instance.name, target, live)
1138
  except errors.HypervisorError, err:
1139
    _Fail("Failed to migrate instance: %s", err, exc=True)
1140

    
1141

    
1142
def BlockdevCreate(disk, size, owner, on_primary, info):
1143
  """Creates a block device for an instance.
1144

1145
  @type disk: L{objects.Disk}
1146
  @param disk: the object describing the disk we should create
1147
  @type size: int
1148
  @param size: the size of the physical underlying device, in MiB
1149
  @type owner: str
1150
  @param owner: the name of the instance for which disk is created,
1151
      used for device cache data
1152
  @type on_primary: boolean
1153
  @param on_primary:  indicates if it is the primary node or not
1154
  @type info: string
1155
  @param info: string that will be sent to the physical device
1156
      creation, used for example to set (LVM) tags on LVs
1157

1158
  @return: the new unique_id of the device (this can sometime be
1159
      computed only after creation), or None. On secondary nodes,
1160
      it's not required to return anything.
1161

1162
  """
1163
  clist = []
1164
  if disk.children:
1165
    for child in disk.children:
1166
      try:
1167
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1168
      except errors.BlockDeviceError, err:
1169
        _Fail("Can't assemble device %s: %s", child, err)
1170
      if on_primary or disk.AssembleOnSecondary():
1171
        # we need the children open in case the device itself has to
1172
        # be assembled
1173
        try:
1174
          crdev.Open()
1175
        except errors.BlockDeviceError, err:
1176
          _Fail("Can't make child '%s' read-write: %s", child, err)
1177
      clist.append(crdev)
1178

    
1179
  try:
1180
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1181
  except errors.BlockDeviceError, err:
1182
    _Fail("Can't create block device: %s", err)
1183

    
1184
  if on_primary or disk.AssembleOnSecondary():
1185
    try:
1186
      device.Assemble()
1187
    except errors.BlockDeviceError, err:
1188
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1189
    device.SetSyncSpeed(constants.SYNC_SPEED)
1190
    if on_primary or disk.OpenOnSecondary():
1191
      try:
1192
        device.Open(force=True)
1193
      except errors.BlockDeviceError, err:
1194
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1195
    DevCacheManager.UpdateCache(device.dev_path, owner,
1196
                                on_primary, disk.iv_name)
1197

    
1198
  device.SetInfo(info)
1199

    
1200
  return device.unique_id
1201

    
1202

    
1203
def BlockdevRemove(disk):
1204
  """Remove a block device.
1205

1206
  @note: This is intended to be called recursively.
1207

1208
  @type disk: L{objects.Disk}
1209
  @param disk: the disk object we should remove
1210
  @rtype: boolean
1211
  @return: the success of the operation
1212

1213
  """
1214
  msgs = []
1215
  try:
1216
    rdev = _RecursiveFindBD(disk)
1217
  except errors.BlockDeviceError, err:
1218
    # probably can't attach
1219
    logging.info("Can't attach to device %s in remove", disk)
1220
    rdev = None
1221
  if rdev is not None:
1222
    r_path = rdev.dev_path
1223
    try:
1224
      rdev.Remove()
1225
    except errors.BlockDeviceError, err:
1226
      msgs.append(str(err))
1227
    if not msgs:
1228
      DevCacheManager.RemoveCache(r_path)
1229

    
1230
  if disk.children:
1231
    for child in disk.children:
1232
      try:
1233
        BlockdevRemove(child)
1234
      except RPCFail, err:
1235
        msgs.append(str(err))
1236

    
1237
  if msgs:
1238
    _Fail("; ".join(msgs))
1239

    
1240

    
1241
def _RecursiveAssembleBD(disk, owner, as_primary):
1242
  """Activate a block device for an instance.
1243

1244
  This is run on the primary and secondary nodes for an instance.
1245

1246
  @note: this function is called recursively.
1247

1248
  @type disk: L{objects.Disk}
1249
  @param disk: the disk we try to assemble
1250
  @type owner: str
1251
  @param owner: the name of the instance which owns the disk
1252
  @type as_primary: boolean
1253
  @param as_primary: if we should make the block device
1254
      read/write
1255

1256
  @return: the assembled device or None (in case no device
1257
      was assembled)
1258
  @raise errors.BlockDeviceError: in case there is an error
1259
      during the activation of the children or the device
1260
      itself
1261

1262
  """
1263
  children = []
1264
  if disk.children:
1265
    mcn = disk.ChildrenNeeded()
1266
    if mcn == -1:
1267
      mcn = 0 # max number of Nones allowed
1268
    else:
1269
      mcn = len(disk.children) - mcn # max number of Nones
1270
    for chld_disk in disk.children:
1271
      try:
1272
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1273
      except errors.BlockDeviceError, err:
1274
        if children.count(None) >= mcn:
1275
          raise
1276
        cdev = None
1277
        logging.error("Error in child activation (but continuing): %s",
1278
                      str(err))
1279
      children.append(cdev)
1280

    
1281
  if as_primary or disk.AssembleOnSecondary():
1282
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1283
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1284
    result = r_dev
1285
    if as_primary or disk.OpenOnSecondary():
1286
      r_dev.Open()
1287
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1288
                                as_primary, disk.iv_name)
1289

    
1290
  else:
1291
    result = True
1292
  return result
1293

    
1294

    
1295
def BlockdevAssemble(disk, owner, as_primary):
1296
  """Activate a block device for an instance.
1297

1298
  This is a wrapper over _RecursiveAssembleBD.
1299

1300
  @rtype: str or boolean
1301
  @return: a C{/dev/...} path for primary nodes, and
1302
      C{True} for secondary nodes
1303

1304
  """
1305
  try:
1306
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1307
    if isinstance(result, bdev.BlockDev):
1308
      result = result.dev_path
1309
  except errors.BlockDeviceError, err:
1310
    _Fail("Error while assembling disk: %s", err, exc=True)
1311

    
1312
  return result
1313

    
1314

    
1315
def BlockdevShutdown(disk):
1316
  """Shut down a block device.
1317

1318
  First, if the device is assembled (Attach() is successful), then
1319
  the device is shutdown. Then the children of the device are
1320
  shutdown.
1321

1322
  This function is called recursively. Note that we don't cache the
1323
  children or such, as oppossed to assemble, shutdown of different
1324
  devices doesn't require that the upper device was active.
1325

1326
  @type disk: L{objects.Disk}
1327
  @param disk: the description of the disk we should
1328
      shutdown
1329
  @rtype: None
1330

1331
  """
1332
  msgs = []
1333
  r_dev = _RecursiveFindBD(disk)
1334
  if r_dev is not None:
1335
    r_path = r_dev.dev_path
1336
    try:
1337
      r_dev.Shutdown()
1338
      DevCacheManager.RemoveCache(r_path)
1339
    except errors.BlockDeviceError, err:
1340
      msgs.append(str(err))
1341

    
1342
  if disk.children:
1343
    for child in disk.children:
1344
      try:
1345
        BlockdevShutdown(child)
1346
      except RPCFail, err:
1347
        msgs.append(str(err))
1348

    
1349
  if msgs:
1350
    _Fail("; ".join(msgs))
1351

    
1352

    
1353
def BlockdevAddchildren(parent_cdev, new_cdevs):
1354
  """Extend a mirrored block device.
1355

1356
  @type parent_cdev: L{objects.Disk}
1357
  @param parent_cdev: the disk to which we should add children
1358
  @type new_cdevs: list of L{objects.Disk}
1359
  @param new_cdevs: the list of children which we should add
1360
  @rtype: None
1361

1362
  """
1363
  parent_bdev = _RecursiveFindBD(parent_cdev)
1364
  if parent_bdev is None:
1365
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1366
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1367
  if new_bdevs.count(None) > 0:
1368
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1369
  parent_bdev.AddChildren(new_bdevs)
1370

    
1371

    
1372
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1373
  """Shrink a mirrored block device.
1374

1375
  @type parent_cdev: L{objects.Disk}
1376
  @param parent_cdev: the disk from which we should remove children
1377
  @type new_cdevs: list of L{objects.Disk}
1378
  @param new_cdevs: the list of children which we should remove
1379
  @rtype: None
1380

1381
  """
1382
  parent_bdev = _RecursiveFindBD(parent_cdev)
1383
  if parent_bdev is None:
1384
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1385
  devs = []
1386
  for disk in new_cdevs:
1387
    rpath = disk.StaticDevPath()
1388
    if rpath is None:
1389
      bd = _RecursiveFindBD(disk)
1390
      if bd is None:
1391
        _Fail("Can't find device %s while removing children", disk)
1392
      else:
1393
        devs.append(bd.dev_path)
1394
    else:
1395
      devs.append(rpath)
1396
  parent_bdev.RemoveChildren(devs)
1397

    
1398

    
1399
def BlockdevGetmirrorstatus(disks):
1400
  """Get the mirroring status of a list of devices.
1401

1402
  @type disks: list of L{objects.Disk}
1403
  @param disks: the list of disks which we should query
1404
  @rtype: disk
1405
  @return:
1406
      a list of (mirror_done, estimated_time) tuples, which
1407
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1408
  @raise errors.BlockDeviceError: if any of the disks cannot be
1409
      found
1410

1411
  """
1412
  stats = []
1413
  for dsk in disks:
1414
    rbd = _RecursiveFindBD(dsk)
1415
    if rbd is None:
1416
      _Fail("Can't find device %s", dsk)
1417

    
1418
    stats.append(rbd.CombinedSyncStatus())
1419

    
1420
  return stats
1421

    
1422

    
1423
def _RecursiveFindBD(disk):
1424
  """Check if a device is activated.
1425

1426
  If so, return information about the real device.
1427

1428
  @type disk: L{objects.Disk}
1429
  @param disk: the disk object we need to find
1430

1431
  @return: None if the device can't be found,
1432
      otherwise the device instance
1433

1434
  """
1435
  children = []
1436
  if disk.children:
1437
    for chdisk in disk.children:
1438
      children.append(_RecursiveFindBD(chdisk))
1439

    
1440
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1441

    
1442

    
1443
def BlockdevFind(disk):
1444
  """Check if a device is activated.
1445

1446
  If it is, return information about the real device.
1447

1448
  @type disk: L{objects.Disk}
1449
  @param disk: the disk to find
1450
  @rtype: None or objects.BlockDevStatus
1451
  @return: None if the disk cannot be found, otherwise a the current
1452
           information
1453

1454
  """
1455
  try:
1456
    rbd = _RecursiveFindBD(disk)
1457
  except errors.BlockDeviceError, err:
1458
    _Fail("Failed to find device: %s", err, exc=True)
1459

    
1460
  if rbd is None:
1461
    return None
1462

    
1463
  return rbd.GetSyncStatus()
1464

    
1465

    
1466
def BlockdevGetsize(disks):
1467
  """Computes the size of the given disks.
1468

1469
  If a disk is not found, returns None instead.
1470

1471
  @type disks: list of L{objects.Disk}
1472
  @param disks: the list of disk to compute the size for
1473
  @rtype: list
1474
  @return: list with elements None if the disk cannot be found,
1475
      otherwise the size
1476

1477
  """
1478
  result = []
1479
  for cf in disks:
1480
    try:
1481
      rbd = _RecursiveFindBD(cf)
1482
    except errors.BlockDeviceError, err:
1483
      result.append(None)
1484
      continue
1485
    if rbd is None:
1486
      result.append(None)
1487
    else:
1488
      result.append(rbd.GetActualSize())
1489
  return result
1490

    
1491

    
1492
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1493
  """Export a block device to a remote node.
1494

1495
  @type disk: L{objects.Disk}
1496
  @param disk: the description of the disk to export
1497
  @type dest_node: str
1498
  @param dest_node: the destination node to export to
1499
  @type dest_path: str
1500
  @param dest_path: the destination path on the target node
1501
  @type cluster_name: str
1502
  @param cluster_name: the cluster name, needed for SSH hostalias
1503
  @rtype: None
1504

1505
  """
1506
  real_disk = _RecursiveFindBD(disk)
1507
  if real_disk is None:
1508
    _Fail("Block device '%s' is not set up", disk)
1509

    
1510
  real_disk.Open()
1511

    
1512
  # the block size on the read dd is 1MiB to match our units
1513
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1514
                               "dd if=%s bs=1048576 count=%s",
1515
                               real_disk.dev_path, str(disk.size))
1516

    
1517
  # we set here a smaller block size as, due to ssh buffering, more
1518
  # than 64-128k will mostly ignored; we use nocreat to fail if the
1519
  # device is not already there or we pass a wrong path; we use
1520
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1521
  # to not buffer too much memory; this means that at best, we flush
1522
  # every 64k, which will not be very fast
1523
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1524
                                " oflag=dsync", dest_path)
1525

    
1526
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1527
                                                   constants.GANETI_RUNAS,
1528
                                                   destcmd)
1529

    
1530
  # all commands have been checked, so we're safe to combine them
1531
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1532

    
1533
  result = utils.RunCmd(["bash", "-c", command])
1534

    
1535
  if result.failed:
1536
    _Fail("Disk copy command '%s' returned error: %s"
1537
          " output: %s", command, result.fail_reason, result.output)
1538

    
1539

    
1540
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1541
  """Write a file to the filesystem.
1542

1543
  This allows the master to overwrite(!) a file. It will only perform
1544
  the operation if the file belongs to a list of configuration files.
1545

1546
  @type file_name: str
1547
  @param file_name: the target file name
1548
  @type data: str
1549
  @param data: the new contents of the file
1550
  @type mode: int
1551
  @param mode: the mode to give the file (can be None)
1552
  @type uid: int
1553
  @param uid: the owner of the file (can be -1 for default)
1554
  @type gid: int
1555
  @param gid: the group of the file (can be -1 for default)
1556
  @type atime: float
1557
  @param atime: the atime to set on the file (can be None)
1558
  @type mtime: float
1559
  @param mtime: the mtime to set on the file (can be None)
1560
  @rtype: None
1561

1562
  """
1563
  if not os.path.isabs(file_name):
1564
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1565

    
1566
  if file_name not in _ALLOWED_UPLOAD_FILES:
1567
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1568
          file_name)
1569

    
1570
  raw_data = _Decompress(data)
1571

    
1572
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1573
                  atime=atime, mtime=mtime)
1574

    
1575

    
1576
def WriteSsconfFiles(values):
1577
  """Update all ssconf files.
1578

1579
  Wrapper around the SimpleStore.WriteFiles.
1580

1581
  """
1582
  ssconf.SimpleStore().WriteFiles(values)
1583

    
1584

    
1585
def _ErrnoOrStr(err):
1586
  """Format an EnvironmentError exception.
1587

1588
  If the L{err} argument has an errno attribute, it will be looked up
1589
  and converted into a textual C{E...} description. Otherwise the
1590
  string representation of the error will be returned.
1591

1592
  @type err: L{EnvironmentError}
1593
  @param err: the exception to format
1594

1595
  """
1596
  if hasattr(err, 'errno'):
1597
    detail = errno.errorcode[err.errno]
1598
  else:
1599
    detail = str(err)
1600
  return detail
1601

    
1602

    
1603
def _OSOndiskAPIVersion(name, os_dir):
1604
  """Compute and return the API version of a given OS.
1605

1606
  This function will try to read the API version of the OS given by
1607
  the 'name' parameter and residing in the 'os_dir' directory.
1608

1609
  @type name: str
1610
  @param name: the OS name we should look for
1611
  @type os_dir: str
1612
  @param os_dir: the directory inwhich we should look for the OS
1613
  @rtype: tuple
1614
  @return: tuple (status, data) with status denoting the validity and
1615
      data holding either the vaid versions or an error message
1616

1617
  """
1618
  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1619

    
1620
  try:
1621
    st = os.stat(api_file)
1622
  except EnvironmentError, err:
1623
    return False, ("Required file '%s' not found under path %s: %s" %
1624
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1625

    
1626
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1627
    return False, ("File '%s' in %s is not a regular file" %
1628
                   (constants.OS_API_FILE, os_dir))
1629

    
1630
  try:
1631
    api_versions = utils.ReadFile(api_file).splitlines()
1632
  except EnvironmentError, err:
1633
    return False, ("Error while reading the API version file at %s: %s" %
1634
                   (api_file, _ErrnoOrStr(err)))
1635

    
1636
  try:
1637
    api_versions = [int(version.strip()) for version in api_versions]
1638
  except (TypeError, ValueError), err:
1639
    return False, ("API version(s) can't be converted to integer: %s" %
1640
                   str(err))
1641

    
1642
  return True, api_versions
1643

    
1644

    
1645
def DiagnoseOS(top_dirs=None):
1646
  """Compute the validity for all OSes.
1647

1648
  @type top_dirs: list
1649
  @param top_dirs: the list of directories in which to
1650
      search (if not given defaults to
1651
      L{constants.OS_SEARCH_PATH})
1652
  @rtype: list of L{objects.OS}
1653
  @return: a list of tuples (name, path, status, diagnose, variants)
1654
      for all (potential) OSes under all search paths, where:
1655
          - name is the (potential) OS name
1656
          - path is the full path to the OS
1657
          - status True/False is the validity of the OS
1658
          - diagnose is the error message for an invalid OS, otherwise empty
1659
          - variants is a list of supported OS variants, if any
1660

1661
  """
1662
  if top_dirs is None:
1663
    top_dirs = constants.OS_SEARCH_PATH
1664

    
1665
  result = []
1666
  for dir_name in top_dirs:
1667
    if os.path.isdir(dir_name):
1668
      try:
1669
        f_names = utils.ListVisibleFiles(dir_name)
1670
      except EnvironmentError, err:
1671
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1672
        break
1673
      for name in f_names:
1674
        os_path = os.path.sep.join([dir_name, name])
1675
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1676
        if status:
1677
          diagnose = ""
1678
          variants = os_inst.supported_variants
1679
        else:
1680
          diagnose = os_inst
1681
          variants = []
1682
        result.append((name, os_path, status, diagnose, variants))
1683

    
1684
  return result
1685

    
1686

    
1687
def _TryOSFromDisk(name, base_dir=None):
1688
  """Create an OS instance from disk.
1689

1690
  This function will return an OS instance if the given name is a
1691
  valid OS name.
1692

1693
  @type base_dir: string
1694
  @keyword base_dir: Base directory containing OS installations.
1695
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1696
  @rtype: tuple
1697
  @return: success and either the OS instance if we find a valid one,
1698
      or error message
1699

1700
  """
1701
  if base_dir is None:
1702
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1703
    if os_dir is None:
1704
      return False, "Directory for OS %s not found in search path" % name
1705
  else:
1706
    os_dir = os.path.sep.join([base_dir, name])
1707

    
1708
  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1709
  if not status:
1710
    # push the error up
1711
    return status, api_versions
1712

    
1713
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1714
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1715
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1716

    
1717
  # OS Files dictionary, we will populate it with the absolute path names
1718
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1719

    
1720
  if max(api_versions) >= constants.OS_API_V15:
1721
    os_files[constants.OS_VARIANTS_FILE] = ''
1722

    
1723
  for name in os_files:
1724
    os_files[name] = os.path.sep.join([os_dir, name])
1725

    
1726
    try:
1727
      st = os.stat(os_files[name])
1728
    except EnvironmentError, err:
1729
      return False, ("File '%s' under path '%s' is missing (%s)" %
1730
                     (name, os_dir, _ErrnoOrStr(err)))
1731

    
1732
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1733
      return False, ("File '%s' under path '%s' is not a regular file" %
1734
                     (name, os_dir))
1735

    
1736
    if name in constants.OS_SCRIPTS:
1737
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1738
        return False, ("File '%s' under path '%s' is not executable" %
1739
                       (name, os_dir))
1740

    
1741
  variants = None
1742
  if constants.OS_VARIANTS_FILE in os_files:
1743
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1744
    try:
1745
      variants = utils.ReadFile(variants_file).splitlines()
1746
    except EnvironmentError, err:
1747
      return False, ("Error while reading the OS variants file at %s: %s" %
1748
                     (variants_file, _ErrnoOrStr(err)))
1749
    if not variants:
1750
      return False, ("No supported os variant found")
1751

    
1752
  os_obj = objects.OS(name=name, path=os_dir,
1753
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1754
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1755
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1756
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1757
                      supported_variants=variants,
1758
                      api_versions=api_versions)
1759
  return True, os_obj
1760

    
1761

    
1762
def OSFromDisk(name, base_dir=None):
1763
  """Create an OS instance from disk.
1764

1765
  This function will return an OS instance if the given name is a
1766
  valid OS name. Otherwise, it will raise an appropriate
1767
  L{RPCFail} exception, detailing why this is not a valid OS.
1768

1769
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1770
  an exception but returns true/false status data.
1771

1772
  @type base_dir: string
1773
  @keyword base_dir: Base directory containing OS installations.
1774
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1775
  @rtype: L{objects.OS}
1776
  @return: the OS instance if we find a valid one
1777
  @raise RPCFail: if we don't find a valid OS
1778

1779
  """
1780
  name_only = name.split("+", 1)[0]
1781
  status, payload = _TryOSFromDisk(name_only, base_dir)
1782

    
1783
  if not status:
1784
    _Fail(payload)
1785

    
1786
  return payload
1787

    
1788

    
1789
def OSEnvironment(instance, os, debug=0):
1790
  """Calculate the environment for an os script.
1791

1792
  @type instance: L{objects.Instance}
1793
  @param instance: target instance for the os script run
1794
  @type os: L{objects.OS}
1795
  @param os: operating system for which the environment is being built
1796
  @type debug: integer
1797
  @param debug: debug level (0 or 1, for OS Api 10)
1798
  @rtype: dict
1799
  @return: dict of environment variables
1800
  @raise errors.BlockDeviceError: if the block device
1801
      cannot be found
1802

1803
  """
1804
  result = {}
1805
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
1806
  result['OS_API_VERSION'] = '%d' % api_version
1807
  result['INSTANCE_NAME'] = instance.name
1808
  result['INSTANCE_OS'] = instance.os
1809
  result['HYPERVISOR'] = instance.hypervisor
1810
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1811
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1812
  result['DEBUG_LEVEL'] = '%d' % debug
1813
  if api_version >= constants.OS_API_V15:
1814
    try:
1815
      variant = instance.os.split('+', 1)[1]
1816
    except IndexError:
1817
      variant = os.supported_variants[0]
1818
    result['OS_VARIANT'] = variant
1819
  for idx, disk in enumerate(instance.disks):
1820
    real_disk = _RecursiveFindBD(disk)
1821
    if real_disk is None:
1822
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1823
                                    str(disk))
1824
    real_disk.Open()
1825
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1826
    result['DISK_%d_ACCESS' % idx] = disk.mode
1827
    if constants.HV_DISK_TYPE in instance.hvparams:
1828
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1829
        instance.hvparams[constants.HV_DISK_TYPE]
1830
    if disk.dev_type in constants.LDS_BLOCK:
1831
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1832
    elif disk.dev_type == constants.LD_FILE:
1833
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1834
        'file:%s' % disk.physical_id[0]
1835
  for idx, nic in enumerate(instance.nics):
1836
    result['NIC_%d_MAC' % idx] = nic.mac
1837
    if nic.ip:
1838
      result['NIC_%d_IP' % idx] = nic.ip
1839
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1840
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1841
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1842
    if nic.nicparams[constants.NIC_LINK]:
1843
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1844
    if constants.HV_NIC_TYPE in instance.hvparams:
1845
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1846
        instance.hvparams[constants.HV_NIC_TYPE]
1847

    
1848
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1849
    for key, value in source.items():
1850
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1851

    
1852
  return result
1853

    
1854
def BlockdevGrow(disk, amount):
1855
  """Grow a stack of block devices.
1856

1857
  This function is called recursively, with the childrens being the
1858
  first ones to resize.
1859

1860
  @type disk: L{objects.Disk}
1861
  @param disk: the disk to be grown
1862
  @rtype: (status, result)
1863
  @return: a tuple with the status of the operation
1864
      (True/False), and the errors message if status
1865
      is False
1866

1867
  """
1868
  r_dev = _RecursiveFindBD(disk)
1869
  if r_dev is None:
1870
    _Fail("Cannot find block device %s", disk)
1871

    
1872
  try:
1873
    r_dev.Grow(amount)
1874
  except errors.BlockDeviceError, err:
1875
    _Fail("Failed to grow block device: %s", err, exc=True)
1876

    
1877

    
1878
def BlockdevSnapshot(disk):
1879
  """Create a snapshot copy of a block device.
1880

1881
  This function is called recursively, and the snapshot is actually created
1882
  just for the leaf lvm backend device.
1883

1884
  @type disk: L{objects.Disk}
1885
  @param disk: the disk to be snapshotted
1886
  @rtype: string
1887
  @return: snapshot disk path
1888

1889
  """
1890
  if disk.children:
1891
    if len(disk.children) == 1:
1892
      # only one child, let's recurse on it
1893
      return BlockdevSnapshot(disk.children[0])
1894
    else:
1895
      # more than one child, choose one that matches
1896
      for child in disk.children:
1897
        if child.size == disk.size:
1898
          # return implies breaking the loop
1899
          return BlockdevSnapshot(child)
1900
  elif disk.dev_type == constants.LD_LV:
1901
    r_dev = _RecursiveFindBD(disk)
1902
    if r_dev is not None:
1903
      # let's stay on the safe side and ask for the full size, for now
1904
      return r_dev.Snapshot(disk.size)
1905
    else:
1906
      _Fail("Cannot find block device %s", disk)
1907
  else:
1908
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1909
          disk.unique_id, disk.dev_type)
1910

    
1911

    
1912
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1913
  """Export a block device snapshot to a remote node.
1914

1915
  @type disk: L{objects.Disk}
1916
  @param disk: the description of the disk to export
1917
  @type dest_node: str
1918
  @param dest_node: the destination node to export to
1919
  @type instance: L{objects.Instance}
1920
  @param instance: the instance object to whom the disk belongs
1921
  @type cluster_name: str
1922
  @param cluster_name: the cluster name, needed for SSH hostalias
1923
  @type idx: int
1924
  @param idx: the index of the disk in the instance's disk list,
1925
      used to export to the OS scripts environment
1926
  @rtype: None
1927

1928
  """
1929
  inst_os = OSFromDisk(instance.os)
1930
  export_env = OSEnvironment(instance, inst_os)
1931

    
1932
  export_script = inst_os.export_script
1933

    
1934
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1935
                                     instance.name, int(time.time()))
1936
  if not os.path.exists(constants.LOG_OS_DIR):
1937
    os.mkdir(constants.LOG_OS_DIR, 0750)
1938
  real_disk = _RecursiveFindBD(disk)
1939
  if real_disk is None:
1940
    _Fail("Block device '%s' is not set up", disk)
1941

    
1942
  real_disk.Open()
1943

    
1944
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1945
  export_env['EXPORT_INDEX'] = str(idx)
1946

    
1947
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1948
  destfile = disk.physical_id[1]
1949

    
1950
  # the target command is built out of three individual commands,
1951
  # which are joined by pipes; we check each individual command for
1952
  # valid parameters
1953
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1954
                               inst_os.path, export_script, logfile)
1955

    
1956
  comprcmd = "gzip"
1957

    
1958
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1959
                                destdir, destdir, destfile)
1960
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1961
                                                   constants.GANETI_RUNAS,
1962
                                                   destcmd)
1963

    
1964
  # all commands have been checked, so we're safe to combine them
1965
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1966

    
1967
  result = utils.RunCmd(["bash", "-c", command], env=export_env)
1968

    
1969
  if result.failed:
1970
    _Fail("OS snapshot export command '%s' returned error: %s"
1971
          " output: %s", command, result.fail_reason, result.output)
1972

    
1973

    
1974
def FinalizeExport(instance, snap_disks):
1975
  """Write out the export configuration information.
1976

1977
  @type instance: L{objects.Instance}
1978
  @param instance: the instance which we export, used for
1979
      saving configuration
1980
  @type snap_disks: list of L{objects.Disk}
1981
  @param snap_disks: list of snapshot block devices, which
1982
      will be used to get the actual name of the dump file
1983

1984
  @rtype: None
1985

1986
  """
1987
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1988
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
1989

    
1990
  config = objects.SerializableConfigParser()
1991

    
1992
  config.add_section(constants.INISECT_EXP)
1993
  config.set(constants.INISECT_EXP, 'version', '0')
1994
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
1995
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
1996
  config.set(constants.INISECT_EXP, 'os', instance.os)
1997
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
1998

    
1999
  config.add_section(constants.INISECT_INS)
2000
  config.set(constants.INISECT_INS, 'name', instance.name)
2001
  config.set(constants.INISECT_INS, 'memory', '%d' %
2002
             instance.beparams[constants.BE_MEMORY])
2003
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
2004
             instance.beparams[constants.BE_VCPUS])
2005
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2006

    
2007
  nic_total = 0
2008
  for nic_count, nic in enumerate(instance.nics):
2009
    nic_total += 1
2010
    config.set(constants.INISECT_INS, 'nic%d_mac' %
2011
               nic_count, '%s' % nic.mac)
2012
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2013
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2014
               '%s' % nic.bridge)
2015
  # TODO: redundant: on load can read nics until it doesn't exist
2016
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2017

    
2018
  disk_total = 0
2019
  for disk_count, disk in enumerate(snap_disks):
2020
    if disk:
2021
      disk_total += 1
2022
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2023
                 ('%s' % disk.iv_name))
2024
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2025
                 ('%s' % disk.physical_id[1]))
2026
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2027
                 ('%d' % disk.size))
2028

    
2029
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2030

    
2031
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
2032
                  data=config.Dumps())
2033
  shutil.rmtree(finaldestdir, True)
2034
  shutil.move(destdir, finaldestdir)
2035

    
2036

    
2037
def ExportInfo(dest):
2038
  """Get export configuration information.
2039

2040
  @type dest: str
2041
  @param dest: directory containing the export
2042

2043
  @rtype: L{objects.SerializableConfigParser}
2044
  @return: a serializable config file containing the
2045
      export info
2046

2047
  """
2048
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2049

    
2050
  config = objects.SerializableConfigParser()
2051
  config.read(cff)
2052

    
2053
  if (not config.has_section(constants.INISECT_EXP) or
2054
      not config.has_section(constants.INISECT_INS)):
2055
    _Fail("Export info file doesn't have the required fields")
2056

    
2057
  return config.Dumps()
2058

    
2059

    
2060
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2061
  """Import an os image into an instance.
2062

2063
  @type instance: L{objects.Instance}
2064
  @param instance: instance to import the disks into
2065
  @type src_node: string
2066
  @param src_node: source node for the disk images
2067
  @type src_images: list of string
2068
  @param src_images: absolute paths of the disk images
2069
  @rtype: list of boolean
2070
  @return: each boolean represent the success of importing the n-th disk
2071

2072
  """
2073
  inst_os = OSFromDisk(instance.os)
2074
  import_env = OSEnvironment(instance, inst_os)
2075
  import_script = inst_os.import_script
2076

    
2077
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2078
                                        instance.name, int(time.time()))
2079
  if not os.path.exists(constants.LOG_OS_DIR):
2080
    os.mkdir(constants.LOG_OS_DIR, 0750)
2081

    
2082
  comprcmd = "gunzip"
2083
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2084
                               import_script, logfile)
2085

    
2086
  final_result = []
2087
  for idx, image in enumerate(src_images):
2088
    if image:
2089
      destcmd = utils.BuildShellCmd('cat %s', image)
2090
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2091
                                                       constants.GANETI_RUNAS,
2092
                                                       destcmd)
2093
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2094
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2095
      import_env['IMPORT_INDEX'] = str(idx)
2096
      result = utils.RunCmd(command, env=import_env)
2097
      if result.failed:
2098
        logging.error("Disk import command '%s' returned error: %s"
2099
                      " output: %s", command, result.fail_reason,
2100
                      result.output)
2101
        final_result.append("error importing disk %d: %s, %s" %
2102
                            (idx, result.fail_reason, result.output[-100]))
2103

    
2104
  if final_result:
2105
    _Fail("; ".join(final_result), log=False)
2106

    
2107

    
2108
def ListExports():
2109
  """Return a list of exports currently available on this machine.
2110

2111
  @rtype: list
2112
  @return: list of the exports
2113

2114
  """
2115
  if os.path.isdir(constants.EXPORT_DIR):
2116
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
2117
  else:
2118
    _Fail("No exports directory")
2119

    
2120

    
2121
def RemoveExport(export):
2122
  """Remove an existing export from the node.
2123

2124
  @type export: str
2125
  @param export: the name of the export to remove
2126
  @rtype: None
2127

2128
  """
2129
  target = os.path.join(constants.EXPORT_DIR, export)
2130

    
2131
  try:
2132
    shutil.rmtree(target)
2133
  except EnvironmentError, err:
2134
    _Fail("Error while removing the export: %s", err, exc=True)
2135

    
2136

    
2137
def BlockdevRename(devlist):
2138
  """Rename a list of block devices.
2139

2140
  @type devlist: list of tuples
2141
  @param devlist: list of tuples of the form  (disk,
2142
      new_logical_id, new_physical_id); disk is an
2143
      L{objects.Disk} object describing the current disk,
2144
      and new logical_id/physical_id is the name we
2145
      rename it to
2146
  @rtype: boolean
2147
  @return: True if all renames succeeded, False otherwise
2148

2149
  """
2150
  msgs = []
2151
  result = True
2152
  for disk, unique_id in devlist:
2153
    dev = _RecursiveFindBD(disk)
2154
    if dev is None:
2155
      msgs.append("Can't find device %s in rename" % str(disk))
2156
      result = False
2157
      continue
2158
    try:
2159
      old_rpath = dev.dev_path
2160
      dev.Rename(unique_id)
2161
      new_rpath = dev.dev_path
2162
      if old_rpath != new_rpath:
2163
        DevCacheManager.RemoveCache(old_rpath)
2164
        # FIXME: we should add the new cache information here, like:
2165
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2166
        # but we don't have the owner here - maybe parse from existing
2167
        # cache? for now, we only lose lvm data when we rename, which
2168
        # is less critical than DRBD or MD
2169
    except errors.BlockDeviceError, err:
2170
      msgs.append("Can't rename device '%s' to '%s': %s" %
2171
                  (dev, unique_id, err))
2172
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2173
      result = False
2174
  if not result:
2175
    _Fail("; ".join(msgs))
2176

    
2177

    
2178
def _TransformFileStorageDir(file_storage_dir):
2179
  """Checks whether given file_storage_dir is valid.
2180

2181
  Checks wheter the given file_storage_dir is within the cluster-wide
2182
  default file_storage_dir stored in SimpleStore. Only paths under that
2183
  directory are allowed.
2184

2185
  @type file_storage_dir: str
2186
  @param file_storage_dir: the path to check
2187

2188
  @return: the normalized path if valid, None otherwise
2189

2190
  """
2191
  cfg = _GetConfig()
2192
  file_storage_dir = os.path.normpath(file_storage_dir)
2193
  base_file_storage_dir = cfg.GetFileStorageDir()
2194
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2195
      base_file_storage_dir):
2196
    _Fail("File storage directory '%s' is not under base file"
2197
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2198
  return file_storage_dir
2199

    
2200

    
2201
def CreateFileStorageDir(file_storage_dir):
2202
  """Create file storage directory.
2203

2204
  @type file_storage_dir: str
2205
  @param file_storage_dir: directory to create
2206

2207
  @rtype: tuple
2208
  @return: tuple with first element a boolean indicating wheter dir
2209
      creation was successful or not
2210

2211
  """
2212
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2213
  if os.path.exists(file_storage_dir):
2214
    if not os.path.isdir(file_storage_dir):
2215
      _Fail("Specified storage dir '%s' is not a directory",
2216
            file_storage_dir)
2217
  else:
2218
    try:
2219
      os.makedirs(file_storage_dir, 0750)
2220
    except OSError, err:
2221
      _Fail("Cannot create file storage directory '%s': %s",
2222
            file_storage_dir, err, exc=True)
2223

    
2224

    
2225
def RemoveFileStorageDir(file_storage_dir):
2226
  """Remove file storage directory.
2227

2228
  Remove it only if it's empty. If not log an error and return.
2229

2230
  @type file_storage_dir: str
2231
  @param file_storage_dir: the directory we should cleanup
2232
  @rtype: tuple (success,)
2233
  @return: tuple of one element, C{success}, denoting
2234
      whether the operation was successful
2235

2236
  """
2237
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2238
  if os.path.exists(file_storage_dir):
2239
    if not os.path.isdir(file_storage_dir):
2240
      _Fail("Specified Storage directory '%s' is not a directory",
2241
            file_storage_dir)
2242
    # deletes dir only if empty, otherwise we want to fail the rpc call
2243
    try:
2244
      os.rmdir(file_storage_dir)
2245
    except OSError, err:
2246
      _Fail("Cannot remove file storage directory '%s': %s",
2247
            file_storage_dir, err)
2248

    
2249

    
2250
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2251
  """Rename the file storage directory.
2252

2253
  @type old_file_storage_dir: str
2254
  @param old_file_storage_dir: the current path
2255
  @type new_file_storage_dir: str
2256
  @param new_file_storage_dir: the name we should rename to
2257
  @rtype: tuple (success,)
2258
  @return: tuple of one element, C{success}, denoting
2259
      whether the operation was successful
2260

2261
  """
2262
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2263
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2264
  if not os.path.exists(new_file_storage_dir):
2265
    if os.path.isdir(old_file_storage_dir):
2266
      try:
2267
        os.rename(old_file_storage_dir, new_file_storage_dir)
2268
      except OSError, err:
2269
        _Fail("Cannot rename '%s' to '%s': %s",
2270
              old_file_storage_dir, new_file_storage_dir, err)
2271
    else:
2272
      _Fail("Specified storage dir '%s' is not a directory",
2273
            old_file_storage_dir)
2274
  else:
2275
    if os.path.exists(old_file_storage_dir):
2276
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2277
            old_file_storage_dir, new_file_storage_dir)
2278

    
2279

    
2280
def _EnsureJobQueueFile(file_name):
2281
  """Checks whether the given filename is in the queue directory.
2282

2283
  @type file_name: str
2284
  @param file_name: the file name we should check
2285
  @rtype: None
2286
  @raises RPCFail: if the file is not valid
2287

2288
  """
2289
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2290
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2291

    
2292
  if not result:
2293
    _Fail("Passed job queue file '%s' does not belong to"
2294
          " the queue directory '%s'", file_name, queue_dir)
2295

    
2296

    
2297
def JobQueueUpdate(file_name, content):
2298
  """Updates a file in the queue directory.
2299

2300
  This is just a wrapper over L{utils.WriteFile}, with proper
2301
  checking.
2302

2303
  @type file_name: str
2304
  @param file_name: the job file name
2305
  @type content: str
2306
  @param content: the new job contents
2307
  @rtype: boolean
2308
  @return: the success of the operation
2309

2310
  """
2311
  _EnsureJobQueueFile(file_name)
2312

    
2313
  # Write and replace the file atomically
2314
  utils.WriteFile(file_name, data=_Decompress(content))
2315

    
2316

    
2317
def JobQueueRename(old, new):
2318
  """Renames a job queue file.
2319

2320
  This is just a wrapper over os.rename with proper checking.
2321

2322
  @type old: str
2323
  @param old: the old (actual) file name
2324
  @type new: str
2325
  @param new: the desired file name
2326
  @rtype: tuple
2327
  @return: the success of the operation and payload
2328

2329
  """
2330
  _EnsureJobQueueFile(old)
2331
  _EnsureJobQueueFile(new)
2332

    
2333
  utils.RenameFile(old, new, mkdir=True)
2334

    
2335

    
2336
def JobQueueSetDrainFlag(drain_flag):
2337
  """Set the drain flag for the queue.
2338

2339
  This will set or unset the queue drain flag.
2340

2341
  @type drain_flag: boolean
2342
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2343
  @rtype: truple
2344
  @return: always True, None
2345
  @warning: the function always returns True
2346

2347
  """
2348
  if drain_flag:
2349
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2350
  else:
2351
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2352

    
2353

    
2354
def BlockdevClose(instance_name, disks):
2355
  """Closes the given block devices.
2356

2357
  This means they will be switched to secondary mode (in case of
2358
  DRBD).
2359

2360
  @param instance_name: if the argument is not empty, the symlinks
2361
      of this instance will be removed
2362
  @type disks: list of L{objects.Disk}
2363
  @param disks: the list of disks to be closed
2364
  @rtype: tuple (success, message)
2365
  @return: a tuple of success and message, where success
2366
      indicates the succes of the operation, and message
2367
      which will contain the error details in case we
2368
      failed
2369

2370
  """
2371
  bdevs = []
2372
  for cf in disks:
2373
    rd = _RecursiveFindBD(cf)
2374
    if rd is None:
2375
      _Fail("Can't find device %s", cf)
2376
    bdevs.append(rd)
2377

    
2378
  msg = []
2379
  for rd in bdevs:
2380
    try:
2381
      rd.Close()
2382
    except errors.BlockDeviceError, err:
2383
      msg.append(str(err))
2384
  if msg:
2385
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2386
  else:
2387
    if instance_name:
2388
      _RemoveBlockDevLinks(instance_name, disks)
2389

    
2390

    
2391
def ValidateHVParams(hvname, hvparams):
2392
  """Validates the given hypervisor parameters.
2393

2394
  @type hvname: string
2395
  @param hvname: the hypervisor name
2396
  @type hvparams: dict
2397
  @param hvparams: the hypervisor parameters to be validated
2398
  @rtype: None
2399

2400
  """
2401
  try:
2402
    hv_type = hypervisor.GetHypervisor(hvname)
2403
    hv_type.ValidateParameters(hvparams)
2404
  except errors.HypervisorError, err:
2405
    _Fail(str(err), log=False)
2406

    
2407

    
2408
def DemoteFromMC():
2409
  """Demotes the current node from master candidate role.
2410

2411
  """
2412
  # try to ensure we're not the master by mistake
2413
  master, myself = ssconf.GetMasterAndMyself()
2414
  if master == myself:
2415
    _Fail("ssconf status shows I'm the master node, will not demote")
2416
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
2417
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
2418
    _Fail("The master daemon is running, will not demote")
2419
  try:
2420
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2421
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2422
  except EnvironmentError, err:
2423
    if err.errno != errno.ENOENT:
2424
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2425
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2426

    
2427

    
2428
def _FindDisks(nodes_ip, disks):
2429
  """Sets the physical ID on disks and returns the block devices.
2430

2431
  """
2432
  # set the correct physical ID
2433
  my_name = utils.HostInfo().name
2434
  for cf in disks:
2435
    cf.SetPhysicalID(my_name, nodes_ip)
2436

    
2437
  bdevs = []
2438

    
2439
  for cf in disks:
2440
    rd = _RecursiveFindBD(cf)
2441
    if rd is None:
2442
      _Fail("Can't find device %s", cf)
2443
    bdevs.append(rd)
2444
  return bdevs
2445

    
2446

    
2447
def DrbdDisconnectNet(nodes_ip, disks):
2448
  """Disconnects the network on a list of drbd devices.
2449

2450
  """
2451
  bdevs = _FindDisks(nodes_ip, disks)
2452

    
2453
  # disconnect disks
2454
  for rd in bdevs:
2455
    try:
2456
      rd.DisconnectNet()
2457
    except errors.BlockDeviceError, err:
2458
      _Fail("Can't change network configuration to standalone mode: %s",
2459
            err, exc=True)
2460

    
2461

    
2462
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2463
  """Attaches the network on a list of drbd devices.
2464

2465
  """
2466
  bdevs = _FindDisks(nodes_ip, disks)
2467

    
2468
  if multimaster:
2469
    for idx, rd in enumerate(bdevs):
2470
      try:
2471
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2472
      except EnvironmentError, err:
2473
        _Fail("Can't create symlink: %s", err)
2474
  # reconnect disks, switch to new master configuration and if
2475
  # needed primary mode
2476
  for rd in bdevs:
2477
    try:
2478
      rd.AttachNet(multimaster)
2479
    except errors.BlockDeviceError, err:
2480
      _Fail("Can't change network configuration: %s", err)
2481
  # wait until the disks are connected; we need to retry the re-attach
2482
  # if the device becomes standalone, as this might happen if the one
2483
  # node disconnects and reconnects in a different mode before the
2484
  # other node reconnects; in this case, one or both of the nodes will
2485
  # decide it has wrong configuration and switch to standalone
2486
  RECONNECT_TIMEOUT = 2 * 60
2487
  sleep_time = 0.100 # start with 100 miliseconds
2488
  timeout_limit = time.time() + RECONNECT_TIMEOUT
2489
  while time.time() < timeout_limit:
2490
    all_connected = True
2491
    for rd in bdevs:
2492
      stats = rd.GetProcStatus()
2493
      if not (stats.is_connected or stats.is_in_resync):
2494
        all_connected = False
2495
      if stats.is_standalone:
2496
        # peer had different config info and this node became
2497
        # standalone, even though this should not happen with the
2498
        # new staged way of changing disk configs
2499
        try:
2500
          rd.AttachNet(multimaster)
2501
        except errors.BlockDeviceError, err:
2502
          _Fail("Can't change network configuration: %s", err)
2503
    if all_connected:
2504
      break
2505
    time.sleep(sleep_time)
2506
    sleep_time = min(5, sleep_time * 1.5)
2507
  if not all_connected:
2508
    _Fail("Timeout in disk reconnecting")
2509
  if multimaster:
2510
    # change to primary mode
2511
    for rd in bdevs:
2512
      try:
2513
        rd.Open()
2514
      except errors.BlockDeviceError, err:
2515
        _Fail("Can't change to primary mode: %s", err)
2516

    
2517

    
2518
def DrbdWaitSync(nodes_ip, disks):
2519
  """Wait until DRBDs have synchronized.
2520

2521
  """
2522
  bdevs = _FindDisks(nodes_ip, disks)
2523

    
2524
  min_resync = 100
2525
  alldone = True
2526
  for rd in bdevs:
2527
    stats = rd.GetProcStatus()
2528
    if not (stats.is_connected or stats.is_in_resync):
2529
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2530
    alldone = alldone and (not stats.is_in_resync)
2531
    if stats.sync_percent is not None:
2532
      min_resync = min(min_resync, stats.sync_percent)
2533

    
2534
  return (alldone, min_resync)
2535

    
2536

    
2537
def PowercycleNode(hypervisor_type):
2538
  """Hard-powercycle the node.
2539

2540
  Because we need to return first, and schedule the powercycle in the
2541
  background, we won't be able to report failures nicely.
2542

2543
  """
2544
  hyper = hypervisor.GetHypervisor(hypervisor_type)
2545
  try:
2546
    pid = os.fork()
2547
  except OSError:
2548
    # if we can't fork, we'll pretend that we're in the child process
2549
    pid = 0
2550
  if pid > 0:
2551
    return "Reboot scheduled in 5 seconds"
2552
  time.sleep(5)
2553
  hyper.PowercycleNode()
2554

    
2555

    
2556
class HooksRunner(object):
2557
  """Hook runner.
2558

2559
  This class is instantiated on the node side (ganeti-noded) and not
2560
  on the master side.
2561

2562
  """
2563
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2564

    
2565
  def __init__(self, hooks_base_dir=None):
2566
    """Constructor for hooks runner.
2567

2568
    @type hooks_base_dir: str or None
2569
    @param hooks_base_dir: if not None, this overrides the
2570
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2571

2572
    """
2573
    if hooks_base_dir is None:
2574
      hooks_base_dir = constants.HOOKS_BASE_DIR
2575
    self._BASE_DIR = hooks_base_dir
2576

    
2577
  @staticmethod
2578
  def ExecHook(script, env):
2579
    """Exec one hook script.
2580

2581
    @type script: str
2582
    @param script: the full path to the script
2583
    @type env: dict
2584
    @param env: the environment with which to exec the script
2585
    @rtype: tuple (success, message)
2586
    @return: a tuple of success and message, where success
2587
        indicates the succes of the operation, and message
2588
        which will contain the error details in case we
2589
        failed
2590

2591
    """
2592
    # exec the process using subprocess and log the output
2593
    fdstdin = None
2594
    try:
2595
      fdstdin = open("/dev/null", "r")
2596
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2597
                               stderr=subprocess.STDOUT, close_fds=True,
2598
                               shell=False, cwd="/", env=env)
2599
      output = ""
2600
      try:
2601
        output = child.stdout.read(4096)
2602
        child.stdout.close()
2603
      except EnvironmentError, err:
2604
        output += "Hook script error: %s" % str(err)
2605

    
2606
      while True:
2607
        try:
2608
          result = child.wait()
2609
          break
2610
        except EnvironmentError, err:
2611
          if err.errno == errno.EINTR:
2612
            continue
2613
          raise
2614
    finally:
2615
      # try not to leak fds
2616
      for fd in (fdstdin, ):
2617
        if fd is not None:
2618
          try:
2619
            fd.close()
2620
          except EnvironmentError, err:
2621
            # just log the error
2622
            #logging.exception("Error while closing fd %s", fd)
2623
            pass
2624

    
2625
    return result == 0, utils.SafeEncode(output.strip())
2626

    
2627
  def RunHooks(self, hpath, phase, env):
2628
    """Run the scripts in the hooks directory.
2629

2630
    @type hpath: str
2631
    @param hpath: the path to the hooks directory which
2632
        holds the scripts
2633
    @type phase: str
2634
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2635
        L{constants.HOOKS_PHASE_POST}
2636
    @type env: dict
2637
    @param env: dictionary with the environment for the hook
2638
    @rtype: list
2639
    @return: list of 3-element tuples:
2640
      - script path
2641
      - script result, either L{constants.HKR_SUCCESS} or
2642
        L{constants.HKR_FAIL}
2643
      - output of the script
2644

2645
    @raise errors.ProgrammerError: for invalid input
2646
        parameters
2647

2648
    """
2649
    if phase == constants.HOOKS_PHASE_PRE:
2650
      suffix = "pre"
2651
    elif phase == constants.HOOKS_PHASE_POST:
2652
      suffix = "post"
2653
    else:
2654
      _Fail("Unknown hooks phase '%s'", phase)
2655

    
2656
    rr = []
2657

    
2658
    subdir = "%s-%s.d" % (hpath, suffix)
2659
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2660
    try:
2661
      dir_contents = utils.ListVisibleFiles(dir_name)
2662
    except OSError:
2663
      # FIXME: must log output in case of failures
2664
      return rr
2665

    
2666
    # we use the standard python sort order,
2667
    # so 00name is the recommended naming scheme
2668
    dir_contents.sort()
2669
    for relname in dir_contents:
2670
      fname = os.path.join(dir_name, relname)
2671
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2672
          self.RE_MASK.match(relname) is not None):
2673
        rrval = constants.HKR_SKIP
2674
        output = ""
2675
      else:
2676
        result, output = self.ExecHook(fname, env)
2677
        if not result:
2678
          rrval = constants.HKR_FAIL
2679
        else:
2680
          rrval = constants.HKR_SUCCESS
2681
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2682

    
2683
    return rr
2684

    
2685

    
2686
class IAllocatorRunner(object):
2687
  """IAllocator runner.
2688

2689
  This class is instantiated on the node side (ganeti-noded) and not on
2690
  the master side.
2691

2692
  """
2693
  def Run(self, name, idata):
2694
    """Run an iallocator script.
2695

2696
    @type name: str
2697
    @param name: the iallocator script name
2698
    @type idata: str
2699
    @param idata: the allocator input data
2700

2701
    @rtype: tuple
2702
    @return: two element tuple of:
2703
       - status
2704
       - either error message or stdout of allocator (for success)
2705

2706
    """
2707
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2708
                                  os.path.isfile)
2709
    if alloc_script is None:
2710
      _Fail("iallocator module '%s' not found in the search path", name)
2711

    
2712
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2713
    try:
2714
      os.write(fd, idata)
2715
      os.close(fd)
2716
      result = utils.RunCmd([alloc_script, fin_name])
2717
      if result.failed:
2718
        _Fail("iallocator module '%s' failed: %s, output '%s'",
2719
              name, result.fail_reason, result.output)
2720
    finally:
2721
      os.unlink(fin_name)
2722

    
2723
    return result.stdout
2724

    
2725

    
2726
class DevCacheManager(object):
2727
  """Simple class for managing a cache of block device information.
2728

2729
  """
2730
  _DEV_PREFIX = "/dev/"
2731
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2732

    
2733
  @classmethod
2734
  def _ConvertPath(cls, dev_path):
2735
    """Converts a /dev/name path to the cache file name.
2736

2737
    This replaces slashes with underscores and strips the /dev
2738
    prefix. It then returns the full path to the cache file.
2739

2740
    @type dev_path: str
2741
    @param dev_path: the C{/dev/} path name
2742
    @rtype: str
2743
    @return: the converted path name
2744

2745
    """
2746
    if dev_path.startswith(cls._DEV_PREFIX):
2747
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2748
    dev_path = dev_path.replace("/", "_")
2749
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2750
    return fpath
2751

    
2752
  @classmethod
2753
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2754
    """Updates the cache information for a given device.
2755

2756
    @type dev_path: str
2757
    @param dev_path: the pathname of the device
2758
    @type owner: str
2759
    @param owner: the owner (instance name) of the device
2760
    @type on_primary: bool
2761
    @param on_primary: whether this is the primary
2762
        node nor not
2763
    @type iv_name: str
2764
    @param iv_name: the instance-visible name of the
2765
        device, as in objects.Disk.iv_name
2766

2767
    @rtype: None
2768

2769
    """
2770
    if dev_path is None:
2771
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2772
      return
2773
    fpath = cls._ConvertPath(dev_path)
2774
    if on_primary:
2775
      state = "primary"
2776
    else:
2777
      state = "secondary"
2778
    if iv_name is None:
2779
      iv_name = "not_visible"
2780
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2781
    try:
2782
      utils.WriteFile(fpath, data=fdata)
2783
    except EnvironmentError, err:
2784
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2785

    
2786
  @classmethod
2787
  def RemoveCache(cls, dev_path):
2788
    """Remove data for a dev_path.
2789

2790
    This is just a wrapper over L{utils.RemoveFile} with a converted
2791
    path name and logging.
2792

2793
    @type dev_path: str
2794
    @param dev_path: the pathname of the device
2795

2796
    @rtype: None
2797

2798
    """
2799
    if dev_path is None:
2800
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2801
      return
2802
    fpath = cls._ConvertPath(dev_path)
2803
    try:
2804
      utils.RemoveFile(fpath)
2805
    except EnvironmentError, err:
2806
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)