Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ db8667b7

History | View | Annotate | Download (86.3 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
  """Exception signalling a failed remote procedure call.

  The single argument carries the error message which is reported
  back to the caller of the RPC.

  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an L{RPCFail} exception.

  The exception is handled specially in the ganeti daemon and turned
  into a 'failed' return type, so this is a shortcut for logging an
  error and reporting it back to the master daemon.

  @type msg: string
  @param msg: the text of the exception; %-formatted with C{args} if
      any positional arguments are given
  @keyword log: if False, skip logging entirely (default: log)
  @keyword exc: if true, log with traceback via C{logging.exception}
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  # log unless the caller explicitly passed log=False
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
  """Return a freshly constructed simple configuration store.

  @rtype: L{ssconf.SimpleStore}
  @return: a new SimpleStore instance

  """
  return ssconf.SimpleStore()
96

    
97

    
98
def _GetSshRunner(cluster_name):
  """Build an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the SshRunner
      constructor
  @rtype: L{ssh.SshRunner}
  @return: a new SshRunner instance

  """
  return ssh.SshRunner(cluster_name)
109

    
110

    
111
def _Decompress(data):
  """Unpack data that was compressed by the RPC client.

  @type data: list or tuple
  @param data: two-element sequence of (encoding, payload) as sent by
      the RPC client
  @rtype: str
  @return: the decompressed payload

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  (encoding, content) = data
  # plain payloads pass through untouched
  if encoding == constants.RPC_ENCODING_NONE:
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
  """Remove all regular files from a directory.

  @type path: str
  @param path: the directory whose visible files should be removed
  @type exclude: list
  @param exclude: optional list of file paths to keep, defaults to
      keeping nothing

  """
  if not os.path.isdir(path):
    return
  if exclude is None:
    keep = []
  else:
    # normalize the excluded paths so the comparison below works
    keep = [os.path.normpath(entry) for entry in exclude]

  for entry in utils.ListVisibleFiles(path):
    candidate = os.path.normpath(os.path.join(path, entry))
    if candidate in keep:
      continue
    # only plain files are removed; symlinks and directories stay
    if os.path.isfile(candidate) and not os.path.islink(candidate):
      utils.RemoveFile(candidate)
155

    
156

    
157
def _BuildUploadFileList():
  """Compute the set of files that may be uploaded to this node.

  Abstracted into a function so the set is built a single time, at
  module import time.

  """
  files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ])

  # every configured hypervisor may contribute its own ancillary files
  for hv_name in constants.HYPER_TYPES:
    hv_class = hypervisor.GetHypervisorClass(hv_name)
    files.update(hv_class.GetAncillaryFiles())

  return frozenset(files)


_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  Cleans the queue directory (keeping the queue lock file) and the
  job archive directory.

  @rtype: None

  """
  # FIX: the old docstring claimed a "True, None" tuple return; the
  # function has no return value
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
  """Returns master information.

  This is an utility function to compute master information, either
  for consumption here or from the node daemon.

  @rtype: tuple
  @return: master_netdev, master_ip, master_name
  @raise RPCFail: in case of errors

  """
  try:
    cfg = _GetConfig()
    master_netdev = cfg.GetMasterNetdev()
    master_ip = cfg.GetMasterIP()
    master_node = cfg.GetMasterNode()
  except errors.ConfigurationError, err:
    # _Fail raises RPCFail, so the return below is only reached on success
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the steps below fail; all steps are
      attempted and the accumulated errors reported at the end

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  err_msgs = []
  # first check whether the master IP already answers on the network
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      # another host answers on the master IP: do not steal it
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # nobody has the IP: add it as a /32 label on the master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    # gratuitous ARP so neighbours update their caches quickly
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      # both flags are needed for a non-interactive no-vote start
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  Always tries to remove the master IP from the master netdev; if
  requested, also stops the master daemons (ganeti-masterd and
  ganeti-rapi).

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  removal = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                          "dev", master_netdev])
  if removal.failed:
    # a stale address is not fatal for deactivation, so only log it
    logging.error("Can't remove the master IP, error: %s", removal.output)

  if stop_daemons:
    # stop/kill the rapi and the master daemon
    for daemon in constants.RAPI, constants.MASTERD:
      utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
310
  """Joins this node to the cluster.
311

312
  This does the following:
313
      - updates the hostkeys of the machine (rsa and dsa)
314
      - adds the ssh private key to the user
315
      - adds the ssh public key to the users' authorized_keys file
316

317
  @type dsa: str
318
  @param dsa: the DSA private key to write
319
  @type dsapub: str
320
  @param dsapub: the DSA public key to write
321
  @type rsa: str
322
  @param rsa: the RSA private key to write
323
  @type rsapub: str
324
  @param rsapub: the RSA public key to write
325
  @type sshkey: str
326
  @param sshkey: the SSH private key to write
327
  @type sshpub: str
328
  @param sshpub: the SSH public key to write
329
  @rtype: boolean
330
  @return: the success of the operation
331

332
  """
333
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
334
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
335
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
336
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
337
  for name, content, mode in sshd_keys:
338
    utils.WriteFile(name, data=content, mode=mode)
339

    
340
  try:
341
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
342
                                                    mkdir=True)
343
  except errors.OpExecError, err:
344
    _Fail("Error while processing user ssh files: %s", err, exc=True)
345

    
346
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
347
    utils.WriteFile(name, data=content, mode=0600)
348

    
349
  utils.AddAuthorizedKey(auth_keys, sshpub)
350

    
351
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster(modify_ssh_setup):
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @param modify_ssh_setup: boolean
  @raise errors.QuitGanetiException: always, on success

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  if modify_ssh_setup:
    try:
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

      utils.RemoveFile(priv_key)
      utils.RemoveFile(pub_key)
    except errors.OpExecError:
      # best-effort: log and continue with the rest of the cleanup
      logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SSL_CERT_FILE)
  except Exception: # pylint: disable-msg=W0703
    # FIX: was a bare "except:", which also swallowed SystemExit and
    # KeyboardInterrupt; removal stays best-effort but only for real errors
    logging.exception("Error while removing cluster secrets")

  confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))

  if confd_pid:
    utils.KillProcess(confd_pid, timeout=2)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
395

    
396

    
397
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  node_info = {}

  # disk space data from the volume group
  vg_data = _GetVGInfo(vgname)
  node_info['vg_size'] = vg_data['vg_size']
  node_info['vg_free'] = vg_data['vg_free']

  # memory data from the hypervisor, merged in if available
  hv_data = hypervisor.GetHypervisor(hypervisor_type).GetNodeInfo()
  if hv_data is not None:
    node_info.update(hv_data)

  # boot id lets the master detect node reboots
  node_info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return node_info
427

    
428

    
429
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name, used to build the SshRunner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  # each check below is independent and only runs if its key is present

  if constants.NV_HYPERVISOR in what:
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # NOTE(review): shuffle mutates the caller-supplied list in place;
    # presumably to spread the ssh load across nodes — confirm intended
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failed nodes are reported
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the supplied node list
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetDaemonPort(constants.NODED)
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        # a secondary check only makes sense when the IPs differ
        if sip != pip:
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_PVLIST in what:
    result[constants.NV_PVLIST] = \
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
                                   filter_allocatable=False)

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # report the error string instead of a minor list on failure
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")
  return result
545

    
546

    
547
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.
  @raise RPCFail: if the C{lvs} command fails

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # FIX: use a raw string so "\|" is a regex escape, not an invalid
  # string-literal escape; the pattern itself is unchanged
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr positions: [0] volume type, [4] state, [5] device open
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
589

    
590

    
591
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin RPC-facing wrapper over L{utils.ListVolumeGroups}.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
600

    
601

    
602
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def parse_dev(dev):
    # drop the "(extent)" suffix lvs appends to the device column;
    # a device without "(" passes through unchanged
    return dev.split('(')[0]

  def map_line(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': parse_dev(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  # keep only lines with at least four fields
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
644

    
645

    
646
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: bridge names to check for
  @rtype: None
  @raise RPCFail: if one or more of the bridges is missing; the
      message names all missing bridges

  """
  # FIX: the old docstring claimed a boolean return, but the function
  # reports missing bridges via RPCFail and otherwise returns None
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", ", ".join(missing))
660

    
661

    
662
def GetInstanceList(hypervisor_list):
663
  """Provides a list of instances.
664

665
  @type hypervisor_list: list
666
  @param hypervisor_list: the list of hypervisors to query information
667

668
  @rtype: list
669
  @return: a list of all running instances on the current node
670
    - instance1.example.com
671
    - instance2.example.com
672

673
  """
674
  results = []
675
  for hname in hypervisor_list:
676
    try:
677
      names = hypervisor.GetHypervisor(hname).ListInstances()
678
      results.extend(names)
679
    except errors.HypervisorError, err:
680
      _Fail("Error enumerating instances (hypervisor %s): %s",
681
            hname, err, exc=True)
682

    
683
  return results
684

    
685

    
686
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: dictionary with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  info = {}

  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    # unknown instance: an empty dict is returned
    return info

  info['memory'] = iinfo[2]
  info['state'] = iinfo[4]
  info['time'] = iinfo[5]
  return info
710

    
711

    
712
def GetInstanceMigratable(instance):
  """Check whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.
  @rtype: None
  @raise RPCFail: if the instance is not running, or if any of its
      disks misses the block device symlink (instance was not
      restarted since ganeti 1.2.5)

  """
  # FIX: the old docstring claimed a (result, description) tuple
  # return; problems are actually reported via RPCFail
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
733

    
734

    
735
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for name, _, memory, vcpus, state, times in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      if name in output:
        # we only check static parameters, like memory and vcpus,
        # and not state and time which can change between the
        # invocations of the different hypervisors
        for key in 'memory', 'vcpus':
          if value[key] != output[name][key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
776

    
777

    
778
def InstanceOsAdd(instance, reinstall):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @rtype: None

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os)
  if reinstall:
    # the create script may behave differently on reinstalls
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if not result.failed:
    return

  # failure path: log the full details, report the log tail upstream
  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
807

    
808

    
809
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: None
  @raise RPCFail: if the rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # FIX: the error message said "os create command" before, which
    # was copy-pasted from InstanceOsAdd and misleading in logs
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
839

    
840

    
841
def _GetVGInfo(vg_name):
842
  """Get information about the volume group.
843

844
  @type vg_name: str
845
  @param vg_name: the volume group which we query
846
  @rtype: dict
847
  @return:
848
    A dictionary with the following keys:
849
      - C{vg_size} is the total size of the volume group in MiB
850
      - C{vg_free} is the free size of the volume group in MiB
851
      - C{pv_count} are the number of physical disks in that VG
852

853
    If an error occurs during gathering of data, we return the same dict
854
    with keys all set to None.
855

856
  """
857
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
858

    
859
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
860
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
861

    
862
  if retval.failed:
863
    logging.error("volume group %s not present", vg_name)
864
    return retdic
865
  valarr = retval.stdout.strip().rstrip(':').split(':')
866
  if len(valarr) == 3:
867
    try:
868
      retdic = {
869
        "vg_size": int(round(float(valarr[0]), 0)),
870
        "vg_free": int(round(float(valarr[1]), 0)),
871
        "pv_count": int(valarr[2]),
872
        }
873
    except ValueError, err:
874
      logging.exception("Fail to parse vgs output: %s", err)
875
  else:
876
    logging.error("vgs output has the wrong number of fields (expected"
877
                  " three): %s", str(valarr))
878
  return retdic
879

    
880

    
881
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the path of the symlink for an instance's disk index.

  """
  link_basename = "%s:%d" % (instance_name, idx)
  return os.path.join(constants.DISK_LINKS_DIR, link_basename)
884

    
885

    
886
def _SymlinkBlockDev(instance_name, device_path, idx):
887
  """Set up symlinks to a instance's block device.
888

889
  This is an auxiliary function run when an instance is start (on the primary
890
  node) or when an instance is migrated (on the target node).
891

892

893
  @param instance_name: the name of the target instance
894
  @param device_path: path of the physical block device, on the node
895
  @param idx: the disk index
896
  @return: absolute path to the disk's symlink
897

898
  """
899
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
900
  try:
901
    os.symlink(device_path, link_name)
902
  except OSError, err:
903
    if err.errno == errno.EEXIST:
904
      if (not os.path.islink(link_name) or
905
          os.readlink(link_name) != device_path):
906
        os.remove(link_name)
907
        os.symlink(device_path, link_name)
908
    else:
909
      raise
910

    
911
  return link_name
912

    
913

    
914
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Removal failures are logged but never raised, so cleanup is
  best-effort.

  """
  for idx, _ in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
928
  """Set up an instance's block device(s).
929

930
  This is run on the primary node at instance startup. The block
931
  devices must be already assembled.
932

933
  @type instance: L{objects.Instance}
934
  @param instance: the instance whose disks we shoul assemble
935
  @rtype: list
936
  @return: list of (disk_object, device_path)
937

938
  """
939
  block_devices = []
940
  for idx, disk in enumerate(instance.disks):
941
    device = _RecursiveFindBD(disk)
942
    if device is None:
943
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
944
                                    str(disk))
945
    device.Open()
946
    try:
947
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
948
    except OSError, e:
949
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
950
                                    e.strerror)
951

    
952
    block_devices.append((disk, link_name))
953

    
954
  return block_devices
955

    
956

    
957
def StartInstance(instance):
958
  """Start an instance.
959

960
  @type instance: L{objects.Instance}
961
  @param instance: the instance object
962
  @rtype: None
963

964
  """
965
  running_instances = GetInstanceList([instance.hypervisor])
966

    
967
  if instance.name in running_instances:
968
    logging.info("Instance %s already running, not starting", instance.name)
969
    return
970

    
971
  try:
972
    block_devices = _GatherAndLinkBlockDevs(instance)
973
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
974
    hyper.StartInstance(instance, block_devices)
975
  except errors.BlockDeviceError, err:
976
    _Fail("Block device error: %s", err, exc=True)
977
  except errors.HypervisorError, err:
978
    _RemoveBlockDevLinks(instance.name, instance.disks)
979
    _Fail("Hypervisor error: %s", err, exc=True)
980

    
981

    
982
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this functions uses polling with a hardcoded timeout.

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  iname = instance.name

  if instance.name not in hyper.ListInstances():
    logging.info("Instance %s not running, doing nothing", iname)
    return

  class _TryShutdown:
    # Callable passed to utils.Retry below; it remembers whether a soft
    # stop was already attempted so the hypervisor can be told to retry
    # on subsequent attempts.
    def __init__(self):
      self.tried_once = False

    def __call__(self):
      # instance already gone: shutdown succeeded
      if iname not in hyper.ListInstances():
        return

      try:
        hyper.StopInstance(instance, retry=self.tried_once)
      except errors.HypervisorError, err:
        if iname not in hyper.ListInstances():
          # if the instance is no longer existing, consider this a
          # success and go to cleanup
          return

        _Fail("Failed to stop instance %s: %s", iname, err)

      self.tried_once = True

      # still running; ask utils.Retry to call us again
      raise utils.RetryAgain()

  try:
    # poll every 5 seconds until the instance disappears or we time out
    utils.Retry(_TryShutdown(), 5, timeout)
  except utils.RetryTimeout:
    # the shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if iname in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    # give the hypervisor a moment to process the destroy
    time.sleep(1)

    if iname in hyper.ListInstances():
      _Fail("Could not shutdown instance %s even by destroy", iname)

  # instance is down: remove its disk symlinks
  _RemoveBlockDevLinks(iname, instance.disks)
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1047
  """Reboot an instance.
1048

1049
  @type instance: L{objects.Instance}
1050
  @param instance: the instance object to reboot
1051
  @type reboot_type: str
1052
  @param reboot_type: the type of reboot, one the following
1053
    constants:
1054
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1055
        instance OS, do not recreate the VM
1056
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1057
        restart the VM (at the hypervisor level)
1058
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1059
        not accepted here, since that mode is handled differently, in
1060
        cmdlib, and translates into full stop and start of the
1061
        instance (instead of a call_instance_reboot RPC)
1062
  @type shutdown_timeout: integer
1063
  @param shutdown_timeout: maximum timeout for soft shutdown
1064
  @rtype: None
1065

1066
  """
1067
  running_instances = GetInstanceList([instance.hypervisor])
1068

    
1069
  if instance.name not in running_instances:
1070
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1071

    
1072
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1073
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1074
    try:
1075
      hyper.RebootInstance(instance)
1076
    except errors.HypervisorError, err:
1077
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1078
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1079
    try:
1080
      InstanceShutdown(instance, shutdown_timeout)
1081
      return StartInstance(instance)
1082
    except errors.HypervisorError, err:
1083
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1084
  else:
1085
    _Fail("Invalid reboot_type received: %s", reboot_type)
1086

    
1087

    
1088
def MigrationInfo(instance):
1089
  """Gather information about an instance to be migrated.
1090

1091
  @type instance: L{objects.Instance}
1092
  @param instance: the instance definition
1093

1094
  """
1095
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1096
  try:
1097
    info = hyper.MigrationInfo(instance)
1098
  except errors.HypervisorError, err:
1099
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1100
  return info
1101

    
1102

    
1103
def AcceptInstance(instance, info, target):
1104
  """Prepare the node to accept an instance.
1105

1106
  @type instance: L{objects.Instance}
1107
  @param instance: the instance definition
1108
  @type info: string/data (opaque)
1109
  @param info: migration information, from the source node
1110
  @type target: string
1111
  @param target: target host (usually ip), on this node
1112

1113
  """
1114
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1115
  try:
1116
    hyper.AcceptInstance(instance, info, target)
1117
  except errors.HypervisorError, err:
1118
    _Fail("Failed to accept instance: %s", err, exc=True)
1119

    
1120

    
1121
def FinalizeMigration(instance, info, success):
1122
  """Finalize any preparation to accept an instance.
1123

1124
  @type instance: L{objects.Instance}
1125
  @param instance: the instance definition
1126
  @type info: string/data (opaque)
1127
  @param info: migration information, from the source node
1128
  @type success: boolean
1129
  @param success: whether the migration was a success or a failure
1130

1131
  """
1132
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1133
  try:
1134
    hyper.FinalizeMigration(instance, info, success)
1135
  except errors.HypervisorError, err:
1136
    _Fail("Failed to finalize migration: %s", err, exc=True)
1137

    
1138

    
1139
def MigrateInstance(instance, target, live):
1140
  """Migrates an instance to another node.
1141

1142
  @type instance: L{objects.Instance}
1143
  @param instance: the instance definition
1144
  @type target: string
1145
  @param target: the target node name
1146
  @type live: boolean
1147
  @param live: whether the migration should be done live or not (the
1148
      interpretation of this parameter is left to the hypervisor)
1149
  @rtype: tuple
1150
  @return: a tuple of (success, msg) where:
1151
      - succes is a boolean denoting the success/failure of the operation
1152
      - msg is a string with details in case of failure
1153

1154
  """
1155
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1156

    
1157
  try:
1158
    hyper.MigrateInstance(instance.name, target, live)
1159
  except errors.HypervisorError, err:
1160
    _Fail("Failed to migrate instance: %s", err, exc=True)
1161

    
1162

    
1163
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # first assemble (and open, if needed) all the child devices, so the
  # device itself can be created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # register the assembled device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  # pass the 'info' text (e.g. LVM tags, see docstring) to the new device
  device.SetInfo(info)

  return device.unique_id
def BlockdevRemove(disk):
  """Remove a block device.

  @note: This is intended to be called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk object we should remove
  @rtype: boolean
  @return: the success of the operation

  """
  # errors from the device itself and from its children are collected
  # here and reported together in a single _Fail at the end
  msgs = []
  try:
    rdev = _RecursiveFindBD(disk)
  except errors.BlockDeviceError, err:
    # probably can't attach
    logging.info("Can't attach to device %s in remove", disk)
    rdev = None
  if rdev is not None:
    r_path = rdev.dev_path
    try:
      rdev.Remove()
    except errors.BlockDeviceError, err:
      msgs.append(str(err))
    if not msgs:
      # only drop the cache entry if the removal itself succeeded
      DevCacheManager.RemoveCache(r_path)

  # children are removed even if the device itself failed (or was not
  # found); their failures are added to the same message list
  if disk.children:
    for child in disk.children:
      try:
        BlockdevRemove(child)
      except RPCFail, err:
        msgs.append(str(err))

  if msgs:
    _Fail("; ".join(msgs))
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # mcn ends up as the maximum number of children that may fail to
    # assemble (appear as None); ChildrenNeeded() == -1 means all
    # children are required
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # re-raise only once more children failed than the device can
        # tolerate; otherwise record the failure as a None child
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    # nothing to assemble on this node: just report success
    result = True
  return result
def BlockdevAssemble(disk, owner, as_primary):
1317
  """Activate a block device for an instance.
1318

1319
  This is a wrapper over _RecursiveAssembleBD.
1320

1321
  @rtype: str or boolean
1322
  @return: a C{/dev/...} path for primary nodes, and
1323
      C{True} for secondary nodes
1324

1325
  """
1326
  try:
1327
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1328
    if isinstance(result, bdev.BlockDev):
1329
      result = result.dev_path
1330
  except errors.BlockDeviceError, err:
1331
    _Fail("Error while assembling disk: %s", err, exc=True)
1332

    
1333
  return result
1334

    
1335

    
1336
def BlockdevShutdown(disk):
1337
  """Shut down a block device.
1338

1339
  First, if the device is assembled (Attach() is successful), then
1340
  the device is shutdown. Then the children of the device are
1341
  shutdown.
1342

1343
  This function is called recursively. Note that we don't cache the
1344
  children or such, as oppossed to assemble, shutdown of different
1345
  devices doesn't require that the upper device was active.
1346

1347
  @type disk: L{objects.Disk}
1348
  @param disk: the description of the disk we should
1349
      shutdown
1350
  @rtype: None
1351

1352
  """
1353
  msgs = []
1354
  r_dev = _RecursiveFindBD(disk)
1355
  if r_dev is not None:
1356
    r_path = r_dev.dev_path
1357
    try:
1358
      r_dev.Shutdown()
1359
      DevCacheManager.RemoveCache(r_path)
1360
    except errors.BlockDeviceError, err:
1361
      msgs.append(str(err))
1362

    
1363
  if disk.children:
1364
    for child in disk.children:
1365
      try:
1366
        BlockdevShutdown(child)
1367
      except RPCFail, err:
1368
        msgs.append(str(err))
1369

    
1370
  if msgs:
1371
    _Fail("; ".join(msgs))
1372

    
1373

    
1374
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)

  # collect the device paths of the children to detach; prefer the
  # static path when the disk type provides one
  devs = []
  for disk in new_cdevs:
    rpath = disk.StaticDevPath()
    if rpath is not None:
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      _Fail("Can't find device %s while removing children", disk)
    devs.append(bd.dev_path)

  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise RPCFail: if any of the disks cannot be found

  """
  sync_status = []
  for dsk in disks:
    found = _RecursiveFindBD(dsk)
    if found is None:
      _Fail("Can't find device %s", dsk)
    sync_status.append(found.CombinedSyncStatus())

  return sync_status
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def BlockdevFind(disk):
1465
  """Check if a device is activated.
1466

1467
  If it is, return information about the real device.
1468

1469
  @type disk: L{objects.Disk}
1470
  @param disk: the disk to find
1471
  @rtype: None or objects.BlockDevStatus
1472
  @return: None if the disk cannot be found, otherwise a the current
1473
           information
1474

1475
  """
1476
  try:
1477
    rbd = _RecursiveFindBD(disk)
1478
  except errors.BlockDeviceError, err:
1479
    _Fail("Failed to find device: %s", err, exc=True)
1480

    
1481
  if rbd is None:
1482
    return None
1483

    
1484
  return rbd.GetSyncStatus()
1485

    
1486

    
1487
def BlockdevGetsize(disks):
1488
  """Computes the size of the given disks.
1489

1490
  If a disk is not found, returns None instead.
1491

1492
  @type disks: list of L{objects.Disk}
1493
  @param disks: the list of disk to compute the size for
1494
  @rtype: list
1495
  @return: list with elements None if the disk cannot be found,
1496
      otherwise the size
1497

1498
  """
1499
  result = []
1500
  for cf in disks:
1501
    try:
1502
      rbd = _RecursiveFindBD(cf)
1503
    except errors.BlockDeviceError, err:
1504
      result.append(None)
1505
      continue
1506
    if rbd is None:
1507
      result.append(None)
1508
    else:
1509
      result.append(rbd.GetActualSize())
1510
  return result
1511

    
1512

    
1513
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  The device contents are streamed with dd through an SSH pipe into a
  dd process writing to C{dest_path} on C{dest_node}.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # the block size on the read dd is 1MiB to match our units; pipefail
  # makes sure a failing reader dd is not masked by the ssh exit code
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                               "dd if=%s bs=1048576 count=%s",
                               real_disk.dev_path, str(disk.size))

  # we set here a smaller block size as, due to ssh buffering, more
  # than 64-128k will mostly ignored; we use nocreat to fail if the
  # device is not already there or we pass a wrong path; we use
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
  # to not buffer too much memory; this means that at best, we flush
  # every 64k, which will not be very fast
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  # wrap the writer dd in an ssh invocation to the destination node
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])

  result = utils.RunCmd(["bash", "-c", command])

  if result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  # reject anything that is not an absolute, whitelisted path
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  utils.WriteFile(file_name, data=_Decompress(data), mode=mode, uid=uid,
                  gid=gid, atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around L{ssconf.SimpleStore.WriteFiles}.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
1607
  """Format an EnvironmentError exception.
1608

1609
  If the L{err} argument has an errno attribute, it will be looked up
1610
  and converted into a textual C{E...} description. Otherwise the
1611
  string representation of the error will be returned.
1612

1613
  @type err: L{EnvironmentError}
1614
  @param err: the exception to format
1615

1616
  """
1617
  if hasattr(err, 'errno'):
1618
    detail = errno.errorcode[err.errno]
1619
  else:
1620
    detail = str(err)
1621
  return detail
1622

    
1623

    
1624
def _OSOndiskAPIVersion(name, os_dir):
1625
  """Compute and return the API version of a given OS.
1626

1627
  This function will try to read the API version of the OS given by
1628
  the 'name' parameter and residing in the 'os_dir' directory.
1629

1630
  @type name: str
1631
  @param name: the OS name we should look for
1632
  @type os_dir: str
1633
  @param os_dir: the directory inwhich we should look for the OS
1634
  @rtype: tuple
1635
  @return: tuple (status, data) with status denoting the validity and
1636
      data holding either the vaid versions or an error message
1637

1638
  """
1639
  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1640

    
1641
  try:
1642
    st = os.stat(api_file)
1643
  except EnvironmentError, err:
1644
    return False, ("Required file '%s' not found under path %s: %s" %
1645
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1646

    
1647
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1648
    return False, ("File '%s' in %s is not a regular file" %
1649
                   (constants.OS_API_FILE, os_dir))
1650

    
1651
  try:
1652
    api_versions = utils.ReadFile(api_file).splitlines()
1653
  except EnvironmentError, err:
1654
    return False, ("Error while reading the API version file at %s: %s" %
1655
                   (api_file, _ErrnoOrStr(err)))
1656

    
1657
  try:
1658
    api_versions = [int(version.strip()) for version in api_versions]
1659
  except (TypeError, ValueError), err:
1660
    return False, ("API version(s) can't be converted to integer: %s" %
1661
                   str(err))
1662

    
1663
  return True, api_versions
1664

    
1665

    
1666
def DiagnoseOS(top_dirs=None):
  """Compute the validity for all OSes.

  @type top_dirs: list
  @param top_dirs: the list of directories in which to
      search (if not given defaults to
      L{constants.OS_SEARCH_PATH})
  @rtype: list of L{objects.OS}
  @return: a list of tuples (name, path, status, diagnose, variants)
      for all (potential) OSes under all search paths, where:
          - name is the (potential) OS name
          - path is the full path to the OS
          - status True/False is the validity of the OS
          - diagnose is the error message for an invalid OS, otherwise empty
          - variants is a list of supported OS variants, if any

  """
  if top_dirs is None:
    top_dirs = constants.OS_SEARCH_PATH

  result = []
  for dir_name in top_dirs:
    # search directories that don't exist are silently skipped
    if os.path.isdir(dir_name):
      try:
        f_names = utils.ListVisibleFiles(dir_name)
      except EnvironmentError, err:
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
        # NOTE(review): 'break' aborts the scan of all remaining search
        # directories when a single one fails to list; 'continue' looks
        # like the intended behaviour here -- confirm before changing
        break
      for name in f_names:
        # probe every visible entry as a candidate OS
        os_path = os.path.sep.join([dir_name, name])
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
        if status:
          diagnose = ""
          variants = os_inst.supported_variants
        else:
          # on failure os_inst holds the error message, not an OS object
          diagnose = os_inst
          variants = []
        result.append((name, os_path, status, diagnose, variants))

  return result
def _TryOSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name.

  @type name: str
  @param name: the OS name to look up
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: tuple
  @return: success and either the OS instance if we find a valid one,
      or error message

  """
  if base_dir is None:
    # no explicit directory: search all configured OS paths
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
    if os_dir is None:
      return False, "Directory for OS %s not found in search path" % name
  else:
    os_dir = os.path.sep.join([base_dir, name])

  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
  if not status:
    # push the error up
    return status, api_versions

  # the OS must support at least one API version this node speaks
  if not constants.OS_API_VERSIONS.intersection(api_versions):
    return False, ("API version mismatch for path '%s': found %s, want %s." %
                   (os_dir, api_versions, constants.OS_API_VERSIONS))

  # OS Files dictionary, we will populate it with the absolute path names
  os_files = dict.fromkeys(constants.OS_SCRIPTS)

  if max(api_versions) >= constants.OS_API_V15:
    # from API v15 on, the variants file is also mandatory
    os_files[constants.OS_VARIANTS_FILE] = ''

  for filename in os_files:
    os_files[filename] = os.path.sep.join([os_dir, filename])

    try:
      st = os.stat(os_files[filename])
    except EnvironmentError, err:
      return False, ("File '%s' under path '%s' is missing (%s)" %
                     (filename, os_dir, _ErrnoOrStr(err)))

    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
      return False, ("File '%s' under path '%s' is not a regular file" %
                     (filename, os_dir))

    if filename in constants.OS_SCRIPTS:
      # the OS scripts must be executable by their owner
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
        return False, ("File '%s' under path '%s' is not executable" %
                       (filename, os_dir))

  variants = None
  if constants.OS_VARIANTS_FILE in os_files:
    variants_file = os_files[constants.OS_VARIANTS_FILE]
    try:
      variants = utils.ReadFile(variants_file).splitlines()
    except EnvironmentError, err:
      return False, ("Error while reading the OS variants file at %s: %s" %
                     (variants_file, _ErrnoOrStr(err)))
    # an empty variants file makes the OS invalid
    if not variants:
      return False, ("No supported os variant found")

  os_obj = objects.OS(name=name, path=os_dir,
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
                      supported_variants=variants,
                      api_versions=api_versions)
  return True, os_obj
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  Thin wrapper over L{_TryOSFromDisk}: that helper reports problems via
  a status flag, while this function turns a failure into an L{RPCFail}
  exception.

  @type name: string
  @param name: the OS name, possibly including a variant ("os+variant")
  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip a possible "+variant" suffix before looking on disk
  pure_name = name.split("+", 1)[0]
  ok, payload = _TryOSFromDisk(pure_name, base_dir)
  if ok:
    return payload
  _Fail(payload)
1808

    
1809

    
1810
def OSEnvironment(instance, inst_os, debug=0):
  """Calculate the environment for an os script.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type inst_os: L{objects.OS}
  @param inst_os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # use the highest API version supported by both this node and the OS
  api_version = \
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  if api_version >= constants.OS_API_V15:
    try:
      # the variant is encoded in the instance os name as "os+variant"
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      # no explicit variant given: use the OS' first supported one
      variant = inst_os.supported_variants[0]
    result['OS_VARIANT'] = variant
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    # make sure the device is opened before exposing its path
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # expose all backend/hypervisor parameters as INSTANCE_BE_*/INSTANCE_HV_*
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result
1875

    
1876
def BlockdevGrow(disk, amount):
1877
  """Grow a stack of block devices.
1878

1879
  This function is called recursively, with the childrens being the
1880
  first ones to resize.
1881

1882
  @type disk: L{objects.Disk}
1883
  @param disk: the disk to be grown
1884
  @rtype: (status, result)
1885
  @return: a tuple with the status of the operation
1886
      (True/False), and the errors message if status
1887
      is False
1888

1889
  """
1890
  r_dev = _RecursiveFindBD(disk)
1891
  if r_dev is None:
1892
    _Fail("Cannot find block device %s", disk)
1893

    
1894
  try:
1895
    r_dev.Grow(amount)
1896
  except errors.BlockDeviceError, err:
1897
    _Fail("Failed to grow block device: %s", err, exc=True)
1898

    
1899

    
1900
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  This function is called recursively, and the snapshot is actually created
  just for the leaf lvm backend device.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # only one child, let's recurse on it
      return BlockdevSnapshot(disk.children[0])
    else:
      # more than one child, choose one that matches
      for child in disk.children:
        if child.size == disk.size:
          # return implies breaking the loop
          return BlockdevSnapshot(child)
      # NOTE(review): if no child matches the parent's size we fall
      # through and implicitly return None instead of failing -- confirm
      # whether that case can actually occur for supported disk layouts
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is not None:
      # let's stay on the safe side and ask for the full size, for now
      return r_dev.Snapshot(disk.size)
    else:
      _Fail("Cannot find block device %s", disk)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)
1932

    
1933

    
1934
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: None
  @raise RPCFail: if the device is not set up or the export pipeline fails

  """
  inst_os = OSFromDisk(instance.os)
  export_env = OSEnvironment(instance, inst_os)

  export_script = inst_os.export_script

  # per-export log file, named after OS, instance and timestamp
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # the export lands in a ".new" staging dir; FinalizeExport moves it
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  # bash is required for the "set -o pipefail" used in expcmd above
  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    _Fail("OS snapshot export command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)
1994

    
1995

    
1996
def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  # the data was streamed into the ".new" staging dir; once the config
  # file is written we atomically swap it into its final place
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  # export-level metadata
  config.add_section(constants.INISECT_EXP)
  config.set(constants.INISECT_EXP, 'version', '0')
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
  config.set(constants.INISECT_EXP, 'os', instance.os)
  config.set(constants.INISECT_EXP, 'compression', 'gzip')

  # instance-level metadata
  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  nic_total = 0
  for nic_count, nic in enumerate(instance.nics):
    nic_total += 1
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)

  # only non-None entries of snap_disks are counted and written out
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if disk:
      disk_total += 1
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
                 ('%s' % disk.iv_name))
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
                 ('%s' % disk.physical_id[1]))
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
                 ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export of the same instance with the new one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)
2057

    
2058

    
2059
def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: string
  @return: the serialized contents (L{objects.SerializableConfigParser.Dumps})
      of the export's config file
  @raise RPCFail: if the config file misses a required section

  """
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)

  config = objects.SerializableConfigParser()
  config.read(cff)

  # both the export and the instance sections (as written by
  # FinalizeExport) must be present for this to be a usable export
  if (not config.has_section(constants.INISECT_EXP) or
      not config.has_section(constants.INISECT_INS)):
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()
2080

    
2081

    
2082
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2083
  """Import an os image into an instance.
2084

2085
  @type instance: L{objects.Instance}
2086
  @param instance: instance to import the disks into
2087
  @type src_node: string
2088
  @param src_node: source node for the disk images
2089
  @type src_images: list of string
2090
  @param src_images: absolute paths of the disk images
2091
  @rtype: list of boolean
2092
  @return: each boolean represent the success of importing the n-th disk
2093

2094
  """
2095
  inst_os = OSFromDisk(instance.os)
2096
  import_env = OSEnvironment(instance, inst_os)
2097
  import_script = inst_os.import_script
2098

    
2099
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2100
                                        instance.name, int(time.time()))
2101
  if not os.path.exists(constants.LOG_OS_DIR):
2102
    os.mkdir(constants.LOG_OS_DIR, 0750)
2103

    
2104
  comprcmd = "gunzip"
2105
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2106
                               import_script, logfile)
2107

    
2108
  final_result = []
2109
  for idx, image in enumerate(src_images):
2110
    if image:
2111
      destcmd = utils.BuildShellCmd('cat %s', image)
2112
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2113
                                                       constants.GANETI_RUNAS,
2114
                                                       destcmd)
2115
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2116
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2117
      import_env['IMPORT_INDEX'] = str(idx)
2118
      result = utils.RunCmd(command, env=import_env)
2119
      if result.failed:
2120
        logging.error("Disk import command '%s' returned error: %s"
2121
                      " output: %s", command, result.fail_reason,
2122
                      result.output)
2123
        final_result.append("error importing disk %d: %s, %s" %
2124
                            (idx, result.fail_reason, result.output[-100]))
2125

    
2126
  if final_result:
2127
    _Fail("; ".join(final_result), log=False)
2128

    
2129

    
2130
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports
  @raise RPCFail: if the exports directory does not exist

  """
  # fail early if the exports directory is missing
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(constants.EXPORT_DIR)
2141

    
2142

    
2143
def RemoveExport(export):
2144
  """Remove an existing export from the node.
2145

2146
  @type export: str
2147
  @param export: the name of the export to remove
2148
  @rtype: None
2149

2150
  """
2151
  target = os.path.join(constants.EXPORT_DIR, export)
2152

    
2153
  try:
2154
    shutil.rmtree(target)
2155
  except EnvironmentError, err:
2156
    _Fail("Error while removing the export: %s", err, exc=True)
2157

    
2158

    
2159
def BlockdevRename(devlist):
  """Rename a list of block devices.

  @type devlist: list of tuples
  @param devlist: list of tuples of the form (disk, new_unique_id);
      disk is an L{objects.Disk} object describing the current disk,
      and new_unique_id is the name we rename it to
  @rtype: None
  @raise RPCFail: if any of the renames failed; all devices are
      attempted before failing, and the message aggregates the errors

  """
  msgs = []
  result = True
  for disk, unique_id in devlist:
    dev = _RecursiveFindBD(disk)
    if dev is None:
      msgs.append("Can't find device %s in rename" % str(disk))
      result = False
      continue
    try:
      old_rpath = dev.dev_path
      dev.Rename(unique_id)
      new_rpath = dev.dev_path
      if old_rpath != new_rpath:
        # the device path changed, so the old cache entry is stale
        DevCacheManager.RemoveCache(old_rpath)
        # FIXME: we should add the new cache information here, like:
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
        # but we don't have the owner here - maybe parse from existing
        # cache? for now, we only lose lvm data when we rename, which
        # is less critical than DRBD or MD
    except errors.BlockDeviceError, err:
      msgs.append("Can't rename device '%s' to '%s': %s" %
                  (dev, unique_id, err))
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
      result = False
  if not result:
    _Fail("; ".join(msgs))
2198

    
2199

    
2200
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise
  @raise RPCFail: if the path is not under the base directory

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # os.path.commonprefix compares characters, not path components, so a
  # sibling such as "<base>-evil" would wrongly pass a prefix check;
  # require the path to be the base dir itself or a proper subpath of it
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir + os.path.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir
2221

    
2222

    
2223
def CreateFileStorageDir(file_storage_dir):
2224
  """Create file storage directory.
2225

2226
  @type file_storage_dir: str
2227
  @param file_storage_dir: directory to create
2228

2229
  @rtype: tuple
2230
  @return: tuple with first element a boolean indicating wheter dir
2231
      creation was successful or not
2232

2233
  """
2234
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2235
  if os.path.exists(file_storage_dir):
2236
    if not os.path.isdir(file_storage_dir):
2237
      _Fail("Specified storage dir '%s' is not a directory",
2238
            file_storage_dir)
2239
  else:
2240
    try:
2241
      os.makedirs(file_storage_dir, 0750)
2242
    except OSError, err:
2243
      _Fail("Cannot create file storage directory '%s': %s",
2244
            file_storage_dir, err, exc=True)
2245

    
2246

    
2247
def RemoveFileStorageDir(file_storage_dir):
2248
  """Remove file storage directory.
2249

2250
  Remove it only if it's empty. If not log an error and return.
2251

2252
  @type file_storage_dir: str
2253
  @param file_storage_dir: the directory we should cleanup
2254
  @rtype: tuple (success,)
2255
  @return: tuple of one element, C{success}, denoting
2256
      whether the operation was successful
2257

2258
  """
2259
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2260
  if os.path.exists(file_storage_dir):
2261
    if not os.path.isdir(file_storage_dir):
2262
      _Fail("Specified Storage directory '%s' is not a directory",
2263
            file_storage_dir)
2264
    # deletes dir only if empty, otherwise we want to fail the rpc call
2265
    try:
2266
      os.rmdir(file_storage_dir)
2267
    except OSError, err:
2268
      _Fail("Cannot remove file storage directory '%s': %s",
2269
            file_storage_dir, err)
2270

    
2271

    
2272
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2273
  """Rename the file storage directory.
2274

2275
  @type old_file_storage_dir: str
2276
  @param old_file_storage_dir: the current path
2277
  @type new_file_storage_dir: str
2278
  @param new_file_storage_dir: the name we should rename to
2279
  @rtype: tuple (success,)
2280
  @return: tuple of one element, C{success}, denoting
2281
      whether the operation was successful
2282

2283
  """
2284
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2285
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2286
  if not os.path.exists(new_file_storage_dir):
2287
    if os.path.isdir(old_file_storage_dir):
2288
      try:
2289
        os.rename(old_file_storage_dir, new_file_storage_dir)
2290
      except OSError, err:
2291
        _Fail("Cannot rename '%s' to '%s': %s",
2292
              old_file_storage_dir, new_file_storage_dir, err)
2293
    else:
2294
      _Fail("Specified storage dir '%s' is not a directory",
2295
            old_file_storage_dir)
2296
  else:
2297
    if os.path.exists(old_file_storage_dir):
2298
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2299
            old_file_storage_dir, new_file_storage_dir)
2300

    
2301

    
2302
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # os.path.commonprefix compares characters, not path components, so a
  # path in a sibling directory like "<queue_dir>-x" would wrongly pass
  # a prefix check; require the queue dir itself or a proper subpath
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.path.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)
2317

    
2318

    
2319
def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents (possibly compressed)
  @rtype: None
  @raise RPCFail: if the file is not inside the queue directory

  """
  _EnsureJobQueueFile(file_name)
  payload = _Decompress(content)
  # Write and replace the file atomically
  utils.WriteFile(file_name, data=payload)
2337

    
2338

    
2339
def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either name is outside the queue directory

  """
  # both source and target must live inside the queue directory
  for queue_file in (old, new):
    _EnsureJobQueueFile(queue_file)

  utils.RenameFile(old, new, mkdir=True)
2356

    
2357

    
2358
def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None

  """
  # the flag is represented by the mere existence of the drain file
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2374

    
2375

    
2376
def BlockdevClose(instance_name, disks):
  """Closes the given block devices.

  This means they will be switched to secondary mode (in case of
  DRBD).

  @param instance_name: if the argument is not empty, the symlinks
      of this instance will be removed
  @type disks: list of L{objects.Disk}
  @param disks: the list of disks to be closed
  @rtype: None
  @raise RPCFail: if a device cannot be found, or closing any of
      the devices failed

  """
  # resolve all devices first, so we fail before touching any of them
  bdevs = []
  for cf in disks:
    rd = _RecursiveFindBD(cf)
    if rd is None:
      _Fail("Can't find device %s", cf)
    bdevs.append(rd)

  msg = []
  for rd in bdevs:
    try:
      rd.Close()
    except errors.BlockDeviceError, err:
      msg.append(str(err))
  if msg:
    _Fail("Can't make devices secondary: %s", ",".join(msg))
  else:
    if instance_name:
      # only remove the symlinks when every close succeeded
      _RemoveBlockDevLinks(instance_name, disks)
2411

    
2412

    
2413
def ValidateHVParams(hvname, hvparams):
2414
  """Validates the given hypervisor parameters.
2415

2416
  @type hvname: string
2417
  @param hvname: the hypervisor name
2418
  @type hvparams: dict
2419
  @param hvparams: the hypervisor parameters to be validated
2420
  @rtype: None
2421

2422
  """
2423
  try:
2424
    hv_type = hypervisor.GetHypervisor(hvname)
2425
    hv_type.ValidateParameters(hvparams)
2426
  except errors.HypervisorError, err:
2427
    _Fail(str(err), log=False)
2428

    
2429

    
2430
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  @rtype: None
  @raise RPCFail: if this node is the master (per ssconf) or the
      master daemon is still running

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    _Fail("The master daemon is running, will not demote")
  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # the backup is best-effort: a file that vanished between the
    # isfile() check and the backup (ENOENT) is not an error
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2448

    
2449

    
2450
def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @raise RPCFail: if any of the devices cannot be found

  """
  # set the correct physical ID on every disk first
  this_node = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(this_node, nodes_ip)

  devices = []

  for disk in disks:
    bdev = _RecursiveFindBD(disk)
    if bdev is None:
      _Fail("Can't find device %s", disk)
    devices.append(bdev)
  return devices
2467

    
2468

    
2469
def DrbdDisconnectNet(nodes_ip, disks):
2470
  """Disconnects the network on a list of drbd devices.
2471

2472
  """
2473
  bdevs = _FindDisks(nodes_ip, disks)
2474

    
2475
  # disconnect disks
2476
  for rd in bdevs:
2477
    try:
2478
      rd.DisconnectNet()
2479
    except errors.BlockDeviceError, err:
2480
      _Fail("Can't change network configuration to standalone mode: %s",
2481
            err, exc=True)
2482

    
2483

    
2484
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  @type multimaster: boolean
  @param multimaster: if True, the devices are symlinked for the
      instance and finally switched to primary mode
  @raise RPCFail: if symlinking, attaching or reconnecting fails

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    # symlinks are needed so the instance can find its disks on this node
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)

  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone

  def _Attach():
    # retry helper for utils.Retry: raises RetryAgain while any device
    # is neither connected nor resyncing
    all_connected = True

    for rd in bdevs:
      stats = rd.GetProcStatus()

      all_connected = (all_connected and
                       (stats.is_connected or stats.is_in_resync))

      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)

    if not all_connected:
      raise utils.RetryAgain()

  try:
    # Start with a delay of 100 miliseconds and go up to 5 seconds
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
  except utils.RetryTimeout:
    _Fail("Timeout in disk reconnecting")

  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)
2544

    
2545

    
2546
def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @rtype: tuple
  @return: (alldone, min_resync): whether no device is still resyncing,
      and the minimum sync percentage seen across the devices

  """
  def _helper(rd):
    # retry helper: succeeds only once the device is connected or resyncing
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      raise utils.RetryAgain()
    return stats

  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    try:
      # poll each second for 15 seconds
      stats = utils.Retry(_helper, 1, 15, args=[rd])
    except utils.RetryTimeout:
      stats = rd.GetProcStatus()
      # last check
      if not (stats.is_connected or stats.is_in_resync):
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    alldone = alldone and (not stats.is_in_resync)
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)
2574

    
2575

    
2576
def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: the hypervisor used to perform the powercycle
  @rtype: string
  @return: (in the parent process) a status message for the caller

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: answer the RPC; the child performs the actual reboot
    return "Reboot scheduled in 5 seconds"
  # child (or fork failure): give the answer time to reach the caller
  time.sleep(5)
  hyper.PowercycleNode()
2593

    
2594

    
2595
class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # only scripts whose basename matches this pattern are executed;
  # everything else in the hooks directory is reported as skipped
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the succes of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      # stderr is merged into stdout so one read captures all output
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4 kB of combined output are kept
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      # retry wait() if it is interrupted by a signal (EINTR)
      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    return result == 0, utils.SafeEncode(output.strip())

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    rr = []

    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError:
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # non-files, non-executables and badly-named entries are skipped
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr
2723

    
2724

    
2725
class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script

    @raise RPCFail: if the script cannot be found in the search path
        or exits with a failure

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      try:
        os.write(fd, idata)
      finally:
        # close the descriptor even if the write fails (e.g. on a full
        # filesystem), otherwise it would be leaked
        os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      os.unlink(fin_name)

    return result.stdout
2763

    
2764

    
2765
class DevCacheManager(object):
2766
  """Simple class for managing a cache of block device information.
2767

2768
  """
2769
  _DEV_PREFIX = "/dev/"
2770
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2771

    
2772
  @classmethod
2773
  def _ConvertPath(cls, dev_path):
2774
    """Converts a /dev/name path to the cache file name.
2775

2776
    This replaces slashes with underscores and strips the /dev
2777
    prefix. It then returns the full path to the cache file.
2778

2779
    @type dev_path: str
2780
    @param dev_path: the C{/dev/} path name
2781
    @rtype: str
2782
    @return: the converted path name
2783

2784
    """
2785
    if dev_path.startswith(cls._DEV_PREFIX):
2786
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2787
    dev_path = dev_path.replace("/", "_")
2788
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2789
    return fpath
2790

    
2791
  @classmethod
2792
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2793
    """Updates the cache information for a given device.
2794

2795
    @type dev_path: str
2796
    @param dev_path: the pathname of the device
2797
    @type owner: str
2798
    @param owner: the owner (instance name) of the device
2799
    @type on_primary: bool
2800
    @param on_primary: whether this is the primary
2801
        node nor not
2802
    @type iv_name: str
2803
    @param iv_name: the instance-visible name of the
2804
        device, as in objects.Disk.iv_name
2805

2806
    @rtype: None
2807

2808
    """
2809
    if dev_path is None:
2810
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2811
      return
2812
    fpath = cls._ConvertPath(dev_path)
2813
    if on_primary:
2814
      state = "primary"
2815
    else:
2816
      state = "secondary"
2817
    if iv_name is None:
2818
      iv_name = "not_visible"
2819
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2820
    try:
2821
      utils.WriteFile(fpath, data=fdata)
2822
    except EnvironmentError, err:
2823
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2824

    
2825
  @classmethod
2826
  def RemoveCache(cls, dev_path):
2827
    """Remove data for a dev_path.
2828

2829
    This is just a wrapper over L{utils.RemoveFile} with a converted
2830
    path name and logging.
2831

2832
    @type dev_path: str
2833
    @param dev_path: the pathname of the device
2834

2835
    @rtype: None
2836

2837
    """
2838
    if dev_path is None:
2839
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2840
      return
2841
    fpath = cls._ConvertPath(dev_path)
2842
    try:
2843
      utils.RemoveFile(fpath)
2844
    except EnvironmentError, err:
2845
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)