Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 6c881c52

History | View | Annotate | Download (86.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29
# pylint: disable-msg=E1103
30

    
31
# E1103: %s %r has no %r member (but some types could not be
32
# inferred), because the _TryOSFromDisk returns either (True, os_obj)
33
# or (False, "string") which confuses pylint
34

    
35

    
36
import os
37
import os.path
38
import shutil
39
import time
40
import stat
41
import errno
42
import re
43
import subprocess
44
import random
45
import logging
46
import tempfile
47
import zlib
48
import base64
49

    
50
from ganeti import errors
51
from ganeti import utils
52
from ganeti import ssh
53
from ganeti import hypervisor
54
from ganeti import constants
55
from ganeti import bdev
56
from ganeti import objects
57
from ganeti import ssconf
58

    
59

    
60
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
61

    
62

    
63
class RPCFail(Exception):
64
  """Class denoting RPC failure.
65

66
  Its argument is the error message.
67

68
  """
69

    
70

    
71
def _Fail(msg, *args, **kwargs):
72
  """Log an error and the raise an RPCFail exception.
73

74
  This exception is then handled specially in the ganeti daemon and
75
  turned into a 'failed' return type. As such, this function is a
76
  useful shortcut for logging the error and returning it to the master
77
  daemon.
78

79
  @type msg: string
80
  @param msg: the text of the exception
81
  @raise RPCFail
82

83
  """
84
  if args:
85
    msg = msg % args
86
  if "log" not in kwargs or kwargs["log"]: # if we should log this error
87
    if "exc" in kwargs and kwargs["exc"]:
88
      logging.exception(msg)
89
    else:
90
      logging.error(msg)
91
  raise RPCFail(msg)
92

    
93

    
94
def _GetConfig():
95
  """Simple wrapper to return a SimpleStore.
96

97
  @rtype: L{ssconf.SimpleStore}
98
  @return: a SimpleStore instance
99

100
  """
101
  return ssconf.SimpleStore()
102

    
103

    
104
def _GetSshRunner(cluster_name):
105
  """Simple wrapper to return an SshRunner.
106

107
  @type cluster_name: str
108
  @param cluster_name: the cluster name, which is needed
109
      by the SshRunner constructor
110
  @rtype: L{ssh.SshRunner}
111
  @return: an SshRunner instance
112

113
  """
114
  return ssh.SshRunner(cluster_name)
115

    
116

    
117
def _Decompress(data):
118
  """Unpacks data compressed by the RPC client.
119

120
  @type data: list or tuple
121
  @param data: Data sent by RPC client
122
  @rtype: str
123
  @return: Decompressed data
124

125
  """
126
  assert isinstance(data, (list, tuple))
127
  assert len(data) == 2
128
  (encoding, content) = data
129
  if encoding == constants.RPC_ENCODING_NONE:
130
    return content
131
  elif encoding == constants.RPC_ENCODING_ZLIB_BASE64:
132
    return zlib.decompress(base64.b64decode(content))
133
  else:
134
    raise AssertionError("Unknown data encoding")
135

    
136

    
137
def _CleanDirectory(path, exclude=None):
138
  """Removes all regular files in a directory.
139

140
  @type path: str
141
  @param path: the directory to clean
142
  @type exclude: list
143
  @param exclude: list of files to be excluded, defaults
144
      to the empty list
145

146
  """
147
  if not os.path.isdir(path):
148
    return
149
  if exclude is None:
150
    exclude = []
151
  else:
152
    # Normalize excluded paths
153
    exclude = [os.path.normpath(i) for i in exclude]
154

    
155
  for rel_name in utils.ListVisibleFiles(path):
156
    full_name = os.path.normpath(os.path.join(path, rel_name))
157
    if full_name in exclude:
158
      continue
159
    if os.path.isfile(full_name) and not os.path.islink(full_name):
160
      utils.RemoveFile(full_name)
161

    
162

    
163
def _BuildUploadFileList():
164
  """Build the list of allowed upload files.
165

166
  This is abstracted so that it's built only once at module import time.
167

168
  """
169
  allowed_files = set([
170
    constants.CLUSTER_CONF_FILE,
171
    constants.ETC_HOSTS,
172
    constants.SSH_KNOWN_HOSTS_FILE,
173
    constants.VNC_PASSWORD_FILE,
174
    constants.RAPI_CERT_FILE,
175
    constants.RAPI_USERS_FILE,
176
    constants.HMAC_CLUSTER_KEY,
177
    ])
178

    
179
  for hv_name in constants.HYPER_TYPES:
180
    hv_class = hypervisor.GetHypervisorClass(hv_name)
181
    allowed_files.update(hv_class.GetAncillaryFiles())
182

    
183
  return frozenset(allowed_files)
184

    
185

    
186
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
187

    
188

    
189
def JobQueuePurge():
190
  """Removes job queue files and archived jobs.
191

192
  @rtype: tuple
193
  @return: True, None
194

195
  """
196
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
197
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
198

    
199

    
200
def GetMasterInfo():
201
  """Returns master information.
202

203
  This is an utility function to compute master information, either
204
  for consumption here or from the node daemon.
205

206
  @rtype: tuple
207
  @return: master_netdev, master_ip, master_name
208
  @raise RPCFail: in case of errors
209

210
  """
211
  try:
212
    cfg = _GetConfig()
213
    master_netdev = cfg.GetMasterNetdev()
214
    master_ip = cfg.GetMasterIP()
215
    master_node = cfg.GetMasterNode()
216
  except errors.ConfigurationError, err:
217
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
218
  return (master_netdev, master_ip, master_node)
219

    
220

    
221
def StartMaster(start_daemons, no_voting):
222
  """Activate local node as master node.
223

224
  The function will always try activate the IP address of the master
225
  (unless someone else has it). It will also start the master daemons,
226
  based on the start_daemons parameter.
227

228
  @type start_daemons: boolean
229
  @param start_daemons: whether to also start the master
230
      daemons (ganeti-masterd and ganeti-rapi)
231
  @type no_voting: boolean
232
  @param no_voting: whether to start ganeti-masterd without a node vote
233
      (if start_daemons is True), but still non-interactively
234
  @rtype: None
235

236
  """
237
  # GetMasterInfo will raise an exception if not able to return data
238
  master_netdev, master_ip, _ = GetMasterInfo()
239

    
240
  err_msgs = []
241
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
242
    if utils.OwnIpAddress(master_ip):
243
      # we already have the ip:
244
      logging.debug("Master IP already configured, doing nothing")
245
    else:
246
      msg = "Someone else has the master ip, not activating"
247
      logging.error(msg)
248
      err_msgs.append(msg)
249
  else:
250
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
251
                           "dev", master_netdev, "label",
252
                           "%s:0" % master_netdev])
253
    if result.failed:
254
      msg = "Can't activate master IP: %s" % result.output
255
      logging.error(msg)
256
      err_msgs.append(msg)
257

    
258
    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
259
                           "-s", master_ip, master_ip])
260
    # we'll ignore the exit code of arping
261

    
262
  # and now start the master and rapi daemons
263
  if start_daemons:
264
    if no_voting:
265
      masterd_args = "--no-voting --yes-do-it"
266
    else:
267
      masterd_args = ""
268

    
269
    env = {
270
      "EXTRA_MASTERD_ARGS": masterd_args,
271
      }
272

    
273
    result = utils.RunCmd([constants.DAEMON_UTIL, "start-master"], env=env)
274
    if result.failed:
275
      msg = "Can't start Ganeti master: %s" % result.output
276
      logging.error(msg)
277
      err_msgs.append(msg)
278

    
279
  if err_msgs:
280
    _Fail("; ".join(err_msgs))
281

    
282

    
283
def StopMaster(stop_daemons):
284
  """Deactivate this node as master.
285

286
  The function will always try to deactivate the IP address of the
287
  master. It will also stop the master daemons depending on the
288
  stop_daemons parameter.
289

290
  @type stop_daemons: boolean
291
  @param stop_daemons: whether to also stop the master daemons
292
      (ganeti-masterd and ganeti-rapi)
293
  @rtype: None
294

295
  """
296
  # TODO: log and report back to the caller the error failures; we
297
  # need to decide in which case we fail the RPC for this
298

    
299
  # GetMasterInfo will raise an exception if not able to return data
300
  master_netdev, master_ip, _ = GetMasterInfo()
301

    
302
  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
303
                         "dev", master_netdev])
304
  if result.failed:
305
    logging.error("Can't remove the master IP, error: %s", result.output)
306
    # but otherwise ignore the failure
307

    
308
  if stop_daemons:
309
    result = utils.RunCmd([constants.DAEMON_UTIL, "stop-master"])
310
    if result.failed:
311
      logging.error("Could not stop Ganeti master, command %s had exitcode %s"
312
                    " and error %s",
313
                    result.cmd, result.exit_code, result.output)
314

    
315

    
316
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
317
  """Joins this node to the cluster.
318

319
  This does the following:
320
      - updates the hostkeys of the machine (rsa and dsa)
321
      - adds the ssh private key to the user
322
      - adds the ssh public key to the users' authorized_keys file
323

324
  @type dsa: str
325
  @param dsa: the DSA private key to write
326
  @type dsapub: str
327
  @param dsapub: the DSA public key to write
328
  @type rsa: str
329
  @param rsa: the RSA private key to write
330
  @type rsapub: str
331
  @param rsapub: the RSA public key to write
332
  @type sshkey: str
333
  @param sshkey: the SSH private key to write
334
  @type sshpub: str
335
  @param sshpub: the SSH public key to write
336
  @rtype: boolean
337
  @return: the success of the operation
338

339
  """
340
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
341
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
342
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
343
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
344
  for name, content, mode in sshd_keys:
345
    utils.WriteFile(name, data=content, mode=mode)
346

    
347
  try:
348
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
349
                                                    mkdir=True)
350
  except errors.OpExecError, err:
351
    _Fail("Error while processing user ssh files: %s", err, exc=True)
352

    
353
  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
354
    utils.WriteFile(name, data=content, mode=0600)
355

    
356
  utils.AddAuthorizedKey(auth_keys, sshpub)
357

    
358
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
359

    
360

    
361
def LeaveCluster(modify_ssh_setup):
362
  """Cleans up and remove the current node.
363

364
  This function cleans up and prepares the current node to be removed
365
  from the cluster.
366

367
  If processing is successful, then it raises an
368
  L{errors.QuitGanetiException} which is used as a special case to
369
  shutdown the node daemon.
370

371
  @param modify_ssh_setup: boolean
372

373
  """
374
  _CleanDirectory(constants.DATA_DIR)
375
  JobQueuePurge()
376

    
377
  if modify_ssh_setup:
378
    try:
379
      priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)
380

    
381
      utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))
382

    
383
      utils.RemoveFile(priv_key)
384
      utils.RemoveFile(pub_key)
385
    except errors.OpExecError:
386
      logging.exception("Error while processing ssh files")
387

    
388
  try:
389
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
390
    utils.RemoveFile(constants.RAPI_CERT_FILE)
391
    utils.RemoveFile(constants.SSL_CERT_FILE)
392
  except:
393
    logging.exception("Error while removing cluster secrets")
394

    
395
  result = utils.RunCmd([constants.DAEMON_UTIL, "stop", constants.CONFD])
396
  if result.failed:
397
    logging.error("Command %s failed with exitcode %s and error %s",
398
                  result.cmd, result.exit_code, result.output)
399

    
400
  # Raise a custom exception (handled in ganeti-noded)
401
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
402

    
403

    
404
def GetNodeInfo(vgname, hypervisor_type):
405
  """Gives back a hash with different information about the node.
406

407
  @type vgname: C{string}
408
  @param vgname: the name of the volume group to ask for disk space information
409
  @type hypervisor_type: C{str}
410
  @param hypervisor_type: the name of the hypervisor to ask for
411
      memory information
412
  @rtype: C{dict}
413
  @return: dictionary with the following keys:
414
      - vg_size is the size of the configured volume group in MiB
415
      - vg_free is the free size of the volume group in MiB
416
      - memory_dom0 is the memory allocated for domain0 in MiB
417
      - memory_free is the currently available (free) ram in MiB
418
      - memory_total is the total number of ram in MiB
419

420
  """
421
  outputarray = {}
422
  vginfo = _GetVGInfo(vgname)
423
  outputarray['vg_size'] = vginfo['vg_size']
424
  outputarray['vg_free'] = vginfo['vg_free']
425

    
426
  hyper = hypervisor.GetHypervisor(hypervisor_type)
427
  hyp_info = hyper.GetNodeInfo()
428
  if hyp_info is not None:
429
    outputarray.update(hyp_info)
430

    
431
  outputarray["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")
432

    
433
  return outputarray
434

    
435

    
436
def VerifyNode(what, cluster_name):
437
  """Verify the status of the local node.
438

439
  Based on the input L{what} parameter, various checks are done on the
440
  local node.
441

442
  If the I{filelist} key is present, this list of
443
  files is checksummed and the file/checksum pairs are returned.
444

445
  If the I{nodelist} key is present, we check that we have
446
  connectivity via ssh with the target nodes (and check the hostname
447
  report).
448

449
  If the I{node-net-test} key is present, we check that we have
450
  connectivity to the given nodes via both primary IP and, if
451
  applicable, secondary IPs.
452

453
  @type what: C{dict}
454
  @param what: a dictionary of things to check:
455
      - filelist: list of files for which to compute checksums
456
      - nodelist: list of nodes we should check ssh communication with
457
      - node-net-test: list of nodes we should check node daemon port
458
        connectivity with
459
      - hypervisor: list with hypervisors to run the verify for
460
  @rtype: dict
461
  @return: a dictionary with the same keys as the input dict, and
462
      values representing the result of the checks
463

464
  """
465
  result = {}
466

    
467
  if constants.NV_HYPERVISOR in what:
468
    result[constants.NV_HYPERVISOR] = tmp = {}
469
    for hv_name in what[constants.NV_HYPERVISOR]:
470
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()
471

    
472
  if constants.NV_FILELIST in what:
473
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
474
      what[constants.NV_FILELIST])
475

    
476
  if constants.NV_NODELIST in what:
477
    result[constants.NV_NODELIST] = tmp = {}
478
    random.shuffle(what[constants.NV_NODELIST])
479
    for node in what[constants.NV_NODELIST]:
480
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
481
      if not success:
482
        tmp[node] = message
483

    
484
  if constants.NV_NODENETTEST in what:
485
    result[constants.NV_NODENETTEST] = tmp = {}
486
    my_name = utils.HostInfo().name
487
    my_pip = my_sip = None
488
    for name, pip, sip in what[constants.NV_NODENETTEST]:
489
      if name == my_name:
490
        my_pip = pip
491
        my_sip = sip
492
        break
493
    if not my_pip:
494
      tmp[my_name] = ("Can't find my own primary/secondary IP"
495
                      " in the node list")
496
    else:
497
      port = utils.GetDaemonPort(constants.NODED)
498
      for name, pip, sip in what[constants.NV_NODENETTEST]:
499
        fail = []
500
        if not utils.TcpPing(pip, port, source=my_pip):
501
          fail.append("primary")
502
        if sip != pip:
503
          if not utils.TcpPing(sip, port, source=my_sip):
504
            fail.append("secondary")
505
        if fail:
506
          tmp[name] = ("failure using the %s interface(s)" %
507
                       " and ".join(fail))
508

    
509
  if constants.NV_LVLIST in what:
510
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])
511

    
512
  if constants.NV_INSTANCELIST in what:
513
    result[constants.NV_INSTANCELIST] = GetInstanceList(
514
      what[constants.NV_INSTANCELIST])
515

    
516
  if constants.NV_VGLIST in what:
517
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()
518

    
519
  if constants.NV_PVLIST in what:
520
    result[constants.NV_PVLIST] = \
521
      bdev.LogicalVolume.GetPVInfo(what[constants.NV_PVLIST],
522
                                   filter_allocatable=False)
523

    
524
  if constants.NV_VERSION in what:
525
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
526
                                    constants.RELEASE_VERSION)
527

    
528
  if constants.NV_HVINFO in what:
529
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
530
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()
531

    
532
  if constants.NV_DRBDLIST in what:
533
    try:
534
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
535
    except errors.BlockDeviceError, err:
536
      logging.warning("Can't get used minors list", exc_info=True)
537
      used_minors = str(err)
538
    result[constants.NV_DRBDLIST] = used_minors
539

    
540
  if constants.NV_NODESETUP in what:
541
    result[constants.NV_NODESETUP] = tmpr = []
542
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
543
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
544
                  " under /sys, missing required directories /sys/block"
545
                  " and /sys/class/net")
546
    if (not os.path.isdir("/proc/sys") or
547
        not os.path.isfile("/proc/sysrq-trigger")):
548
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
549
                  " under /proc, missing required directory /proc/sys and"
550
                  " the file /proc/sysrq-trigger")
551
  return result
552

    
553

    
554
def GetVolumeList(vg_name):
555
  """Compute list of logical volumes and their size.
556

557
  @type vg_name: str
558
  @param vg_name: the volume group whose LVs we should list
559
  @rtype: dict
560
  @return:
561
      dictionary of all partions (key) with value being a tuple of
562
      their size (in MiB), inactive and online status::
563

564
        {'test1': ('20.06', True, True)}
565

566
      in case of errors, a string is returned with the error
567
      details.
568

569
  """
570
  lvs = {}
571
  sep = '|'
572
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
573
                         "--separator=%s" % sep,
574
                         "-olv_name,lv_size,lv_attr", vg_name])
575
  if result.failed:
576
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)
577

    
578
  valid_line_re = re.compile("^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
579
  for line in result.stdout.splitlines():
580
    line = line.strip()
581
    match = valid_line_re.match(line)
582
    if not match:
583
      logging.error("Invalid line returned from lvs output: '%s'", line)
584
      continue
585
    name, size, attr = match.groups()
586
    inactive = attr[4] == '-'
587
    online = attr[5] == 'o'
588
    virtual = attr[0] == 'v'
589
    if virtual:
590
      # we don't want to report such volumes as existing, since they
591
      # don't really hold data
592
      continue
593
    lvs[name] = (size, inactive, online)
594

    
595
  return lvs
596

    
597

    
598
def ListVolumeGroups():
599
  """List the volume groups and their size.
600

601
  @rtype: dict
602
  @return: dictionary with keys volume name and values the
603
      size of the volume
604

605
  """
606
  return utils.ListVolumeGroups()
607

    
608

    
609
def NodeVolumes():
610
  """List all volumes on this node.
611

612
  @rtype: list
613
  @return:
614
    A list of dictionaries, each having four keys:
615
      - name: the logical volume name,
616
      - size: the size of the logical volume
617
      - dev: the physical device on which the LV lives
618
      - vg: the volume group to which it belongs
619

620
    In case of errors, we return an empty list and log the
621
    error.
622

623
    Note that since a logical volume can live on multiple physical
624
    volumes, the resulting list might include a logical volume
625
    multiple times.
626

627
  """
628
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
629
                         "--separator=|",
630
                         "--options=lv_name,lv_size,devices,vg_name"])
631
  if result.failed:
632
    _Fail("Failed to list logical volumes, lvs output: %s",
633
          result.output)
634

    
635
  def parse_dev(dev):
636
    if '(' in dev:
637
      return dev.split('(')[0]
638
    else:
639
      return dev
640

    
641
  def map_line(line):
642
    return {
643
      'name': line[0].strip(),
644
      'size': line[1].strip(),
645
      'dev': parse_dev(line[2].strip()),
646
      'vg': line[3].strip(),
647
    }
648

    
649
  return [map_line(line.split('|')) for line in result.stdout.splitlines()
650
          if line.count('|') >= 3]
651

    
652

    
653
def BridgesExist(bridges_list):
654
  """Check if a list of bridges exist on the current node.
655

656
  @rtype: boolean
657
  @return: C{True} if all of them exist, C{False} otherwise
658

659
  """
660
  missing = []
661
  for bridge in bridges_list:
662
    if not utils.BridgeExists(bridge):
663
      missing.append(bridge)
664

    
665
  if missing:
666
    _Fail("Missing bridges %s", ", ".join(missing))
667

    
668

    
669
def GetInstanceList(hypervisor_list):
670
  """Provides a list of instances.
671

672
  @type hypervisor_list: list
673
  @param hypervisor_list: the list of hypervisors to query information
674

675
  @rtype: list
676
  @return: a list of all running instances on the current node
677
    - instance1.example.com
678
    - instance2.example.com
679

680
  """
681
  results = []
682
  for hname in hypervisor_list:
683
    try:
684
      names = hypervisor.GetHypervisor(hname).ListInstances()
685
      results.extend(names)
686
    except errors.HypervisorError, err:
687
      _Fail("Error enumerating instances (hypervisor %s): %s",
688
            hname, err, exc=True)
689

    
690
  return results
691

    
692

    
693
def GetInstanceInfo(instance, hname):
694
  """Gives back the information about an instance as a dictionary.
695

696
  @type instance: string
697
  @param instance: the instance name
698
  @type hname: string
699
  @param hname: the hypervisor type of the instance
700

701
  @rtype: dict
702
  @return: dictionary with the following keys:
703
      - memory: memory size of instance (int)
704
      - state: xen state of instance (string)
705
      - time: cpu time of instance (float)
706

707
  """
708
  output = {}
709

    
710
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
711
  if iinfo is not None:
712
    output['memory'] = iinfo[2]
713
    output['state'] = iinfo[4]
714
    output['time'] = iinfo[5]
715

    
716
  return output
717

    
718

    
719
def GetInstanceMigratable(instance):
720
  """Gives whether an instance can be migrated.
721

722
  @type instance: L{objects.Instance}
723
  @param instance: object representing the instance to be checked.
724

725
  @rtype: tuple
726
  @return: tuple of (result, description) where:
727
      - result: whether the instance can be migrated or not
728
      - description: a description of the issue, if relevant
729

730
  """
731
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
732
  iname = instance.name
733
  if iname not in hyper.ListInstances():
734
    _Fail("Instance %s is not running", iname)
735

    
736
  for idx in range(len(instance.disks)):
737
    link_name = _GetBlockDevSymlinkPath(iname, idx)
738
    if not os.path.islink(link_name):
739
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
740

    
741

    
742
def GetAllInstancesInfo(hypervisor_list):
743
  """Gather data about all instances.
744

745
  This is the equivalent of L{GetInstanceInfo}, except that it
746
  computes data for all instances at once, thus being faster if one
747
  needs data about more than one instance.
748

749
  @type hypervisor_list: list
750
  @param hypervisor_list: list of hypervisors to query for instance data
751

752
  @rtype: dict
753
  @return: dictionary of instance: data, with data having the following keys:
754
      - memory: memory size of instance (int)
755
      - state: xen state of instance (string)
756
      - time: cpu time of instance (float)
757
      - vcpus: the number of vcpus
758

759
  """
760
  output = {}
761

    
762
  for hname in hypervisor_list:
763
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
764
    if iinfo:
765
      for name, _, memory, vcpus, state, times in iinfo:
766
        value = {
767
          'memory': memory,
768
          'vcpus': vcpus,
769
          'state': state,
770
          'time': times,
771
          }
772
        if name in output:
773
          # we only check static parameters, like memory and vcpus,
774
          # and not state and time which can change between the
775
          # invocations of the different hypervisors
776
          for key in 'memory', 'vcpus':
777
            if value[key] != output[name][key]:
778
              _Fail("Instance %s is running twice"
779
                    " with different parameters", name)
780
        output[name] = value
781

    
782
  return output
783

    
784

    
785
def InstanceOsAdd(instance, reinstall):
786
  """Add an OS to an instance.
787

788
  @type instance: L{objects.Instance}
789
  @param instance: Instance whose OS is to be installed
790
  @type reinstall: boolean
791
  @param reinstall: whether this is an instance reinstall
792
  @rtype: None
793

794
  """
795
  inst_os = OSFromDisk(instance.os)
796

    
797
  create_env = OSEnvironment(instance, inst_os)
798
  if reinstall:
799
    create_env['INSTANCE_REINSTALL'] = "1"
800

    
801
  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
802
                                     instance.name, int(time.time()))
803

    
804
  result = utils.RunCmd([inst_os.create_script], env=create_env,
805
                        cwd=inst_os.path, output=logfile,)
806
  if result.failed:
807
    logging.error("os create command '%s' returned error: %s, logfile: %s,"
808
                  " output: %s", result.cmd, result.fail_reason, logfile,
809
                  result.output)
810
    lines = [utils.SafeEncode(val)
811
             for val in utils.TailFile(logfile, lines=20)]
812
    _Fail("OS create script failed (%s), last lines in the"
813
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
814

    
815

    
816
def RunRenameInstance(instance, old_name):
817
  """Run the OS rename script for an instance.
818

819
  @type instance: L{objects.Instance}
820
  @param instance: Instance whose OS is to be installed
821
  @type old_name: string
822
  @param old_name: previous instance name
823
  @rtype: boolean
824
  @return: the success of the operation
825

826
  """
827
  inst_os = OSFromDisk(instance.os)
828

    
829
  rename_env = OSEnvironment(instance, inst_os)
830
  rename_env['OLD_INSTANCE_NAME'] = old_name
831

    
832
  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
833
                                           old_name,
834
                                           instance.name, int(time.time()))
835

    
836
  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
837
                        cwd=inst_os.path, output=logfile)
838

    
839
  if result.failed:
840
    logging.error("os create command '%s' returned error: %s output: %s",
841
                  result.cmd, result.fail_reason, result.output)
842
    lines = [utils.SafeEncode(val)
843
             for val in utils.TailFile(logfile, lines=20)]
844
    _Fail("OS rename script failed (%s), last lines in the"
845
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
846

    
847

    
848
def _GetVGInfo(vg_name):
849
  """Get information about the volume group.
850

851
  @type vg_name: str
852
  @param vg_name: the volume group which we query
853
  @rtype: dict
854
  @return:
855
    A dictionary with the following keys:
856
      - C{vg_size} is the total size of the volume group in MiB
857
      - C{vg_free} is the free size of the volume group in MiB
858
      - C{pv_count} are the number of physical disks in that VG
859

860
    If an error occurs during gathering of data, we return the same dict
861
    with keys all set to None.
862

863
  """
864
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])
865

    
866
  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
867
                         "--nosuffix", "--units=m", "--separator=:", vg_name])
868

    
869
  if retval.failed:
870
    logging.error("volume group %s not present", vg_name)
871
    return retdic
872
  valarr = retval.stdout.strip().rstrip(':').split(':')
873
  if len(valarr) == 3:
874
    try:
875
      retdic = {
876
        "vg_size": int(round(float(valarr[0]), 0)),
877
        "vg_free": int(round(float(valarr[1]), 0)),
878
        "pv_count": int(valarr[2]),
879
        }
880
    except ValueError, err:
881
      logging.exception("Fail to parse vgs output: %s", err)
882
  else:
883
    logging.error("vgs output has the wrong number of fields (expected"
884
                  " three): %s", str(valarr))
885
  return retdic
886

    
887

    
888
def _GetBlockDevSymlinkPath(instance_name, idx):
889
  return os.path.join(constants.DISK_LINKS_DIR,
890
                      "%s:%d" % (instance_name, idx))
891

    
892

    
893
def _SymlinkBlockDev(instance_name, device_path, idx):
894
  """Set up symlinks to a instance's block device.
895

896
  This is an auxiliary function run when an instance is start (on the primary
897
  node) or when an instance is migrated (on the target node).
898

899

900
  @param instance_name: the name of the target instance
901
  @param device_path: path of the physical block device, on the node
902
  @param idx: the disk index
903
  @return: absolute path to the disk's symlink
904

905
  """
906
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
907
  try:
908
    os.symlink(device_path, link_name)
909
  except OSError, err:
910
    if err.errno == errno.EEXIST:
911
      if (not os.path.islink(link_name) or
912
          os.readlink(link_name) != device_path):
913
        os.remove(link_name)
914
        os.symlink(device_path, link_name)
915
    else:
916
      raise
917

    
918
  return link_name
919

    
920

    
921
def _RemoveBlockDevLinks(instance_name, disks):
922
  """Remove the block device symlinks belonging to the given instance.
923

924
  """
925
  for idx, _ in enumerate(disks):
926
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
927
    if os.path.islink(link_name):
928
      try:
929
        os.remove(link_name)
930
      except OSError:
931
        logging.exception("Can't remove symlink '%s'", link_name)
932

    
933

    
934
def _GatherAndLinkBlockDevs(instance):
935
  """Set up an instance's block device(s).
936

937
  This is run on the primary node at instance startup. The block
938
  devices must be already assembled.
939

940
  @type instance: L{objects.Instance}
941
  @param instance: the instance whose disks we shoul assemble
942
  @rtype: list
943
  @return: list of (disk_object, device_path)
944

945
  """
946
  block_devices = []
947
  for idx, disk in enumerate(instance.disks):
948
    device = _RecursiveFindBD(disk)
949
    if device is None:
950
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
951
                                    str(disk))
952
    device.Open()
953
    try:
954
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
955
    except OSError, e:
956
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
957
                                    e.strerror)
958

    
959
    block_devices.append((disk, link_name))
960

    
961
  return block_devices
962

    
963

    
964
def StartInstance(instance):
965
  """Start an instance.
966

967
  @type instance: L{objects.Instance}
968
  @param instance: the instance object
969
  @rtype: None
970

971
  """
972
  running_instances = GetInstanceList([instance.hypervisor])
973

    
974
  if instance.name in running_instances:
975
    logging.info("Instance %s already running, not starting", instance.name)
976
    return
977

    
978
  try:
979
    block_devices = _GatherAndLinkBlockDevs(instance)
980
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
981
    hyper.StartInstance(instance, block_devices)
982
  except errors.BlockDeviceError, err:
983
    _Fail("Block device error: %s", err, exc=True)
984
  except errors.HypervisorError, err:
985
    _RemoveBlockDevLinks(instance.name, instance.disks)
986
    _Fail("Hypervisor error: %s", err, exc=True)
987

    
988

    
989
def InstanceShutdown(instance, timeout):
990
  """Shut an instance down.
991

992
  @note: this functions uses polling with a hardcoded timeout.
993

994
  @type instance: L{objects.Instance}
995
  @param instance: the instance object
996
  @type timeout: integer
997
  @param timeout: maximum timeout for soft shutdown
998
  @rtype: None
999

1000
  """
1001
  hv_name = instance.hypervisor
1002
  hyper = hypervisor.GetHypervisor(hv_name)
1003
  iname = instance.name
1004

    
1005
  if instance.name not in hyper.ListInstances():
1006
    logging.info("Instance %s not running, doing nothing", iname)
1007
    return
1008

    
1009
  class _TryShutdown:
1010
    def __init__(self):
1011
      self.tried_once = False
1012

    
1013
    def __call__(self):
1014
      if iname not in hyper.ListInstances():
1015
        return
1016

    
1017
      try:
1018
        hyper.StopInstance(instance, retry=self.tried_once)
1019
      except errors.HypervisorError, err:
1020
        if iname not in hyper.ListInstances():
1021
          # if the instance is no longer existing, consider this a
1022
          # success and go to cleanup
1023
          return
1024

    
1025
        _Fail("Failed to stop instance %s: %s", iname, err)
1026

    
1027
      self.tried_once = True
1028

    
1029
      raise utils.RetryAgain()
1030

    
1031
  try:
1032
    utils.Retry(_TryShutdown(), 5, timeout)
1033
  except utils.RetryTimeout:
1034
    # the shutdown did not succeed
1035
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)
1036

    
1037
    try:
1038
      hyper.StopInstance(instance, force=True)
1039
    except errors.HypervisorError, err:
1040
      if iname in hyper.ListInstances():
1041
        # only raise an error if the instance still exists, otherwise
1042
        # the error could simply be "instance ... unknown"!
1043
        _Fail("Failed to force stop instance %s: %s", iname, err)
1044

    
1045
    time.sleep(1)
1046

    
1047
    if iname in hyper.ListInstances():
1048
      _Fail("Could not shutdown instance %s even by destroy", iname)
1049

    
1050
  _RemoveBlockDevLinks(iname, instance.disks)
1051

    
1052

    
1053
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1054
  """Reboot an instance.
1055

1056
  @type instance: L{objects.Instance}
1057
  @param instance: the instance object to reboot
1058
  @type reboot_type: str
1059
  @param reboot_type: the type of reboot, one the following
1060
    constants:
1061
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1062
        instance OS, do not recreate the VM
1063
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1064
        restart the VM (at the hypervisor level)
1065
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1066
        not accepted here, since that mode is handled differently, in
1067
        cmdlib, and translates into full stop and start of the
1068
        instance (instead of a call_instance_reboot RPC)
1069
  @type shutdown_timeout: integer
1070
  @param shutdown_timeout: maximum timeout for soft shutdown
1071
  @rtype: None
1072

1073
  """
1074
  running_instances = GetInstanceList([instance.hypervisor])
1075

    
1076
  if instance.name not in running_instances:
1077
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1078

    
1079
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1080
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1081
    try:
1082
      hyper.RebootInstance(instance)
1083
    except errors.HypervisorError, err:
1084
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1085
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1086
    try:
1087
      InstanceShutdown(instance, shutdown_timeout)
1088
      return StartInstance(instance)
1089
    except errors.HypervisorError, err:
1090
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1091
  else:
1092
    _Fail("Invalid reboot_type received: %s", reboot_type)
1093

    
1094

    
1095
def MigrationInfo(instance):
1096
  """Gather information about an instance to be migrated.
1097

1098
  @type instance: L{objects.Instance}
1099
  @param instance: the instance definition
1100

1101
  """
1102
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1103
  try:
1104
    info = hyper.MigrationInfo(instance)
1105
  except errors.HypervisorError, err:
1106
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1107
  return info
1108

    
1109

    
1110
def AcceptInstance(instance, info, target):
1111
  """Prepare the node to accept an instance.
1112

1113
  @type instance: L{objects.Instance}
1114
  @param instance: the instance definition
1115
  @type info: string/data (opaque)
1116
  @param info: migration information, from the source node
1117
  @type target: string
1118
  @param target: target host (usually ip), on this node
1119

1120
  """
1121
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1122
  try:
1123
    hyper.AcceptInstance(instance, info, target)
1124
  except errors.HypervisorError, err:
1125
    _Fail("Failed to accept instance: %s", err, exc=True)
1126

    
1127

    
1128
def FinalizeMigration(instance, info, success):
1129
  """Finalize any preparation to accept an instance.
1130

1131
  @type instance: L{objects.Instance}
1132
  @param instance: the instance definition
1133
  @type info: string/data (opaque)
1134
  @param info: migration information, from the source node
1135
  @type success: boolean
1136
  @param success: whether the migration was a success or a failure
1137

1138
  """
1139
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1140
  try:
1141
    hyper.FinalizeMigration(instance, info, success)
1142
  except errors.HypervisorError, err:
1143
    _Fail("Failed to finalize migration: %s", err, exc=True)
1144

    
1145

    
1146
def MigrateInstance(instance, target, live):
1147
  """Migrates an instance to another node.
1148

1149
  @type instance: L{objects.Instance}
1150
  @param instance: the instance definition
1151
  @type target: string
1152
  @param target: the target node name
1153
  @type live: boolean
1154
  @param live: whether the migration should be done live or not (the
1155
      interpretation of this parameter is left to the hypervisor)
1156
  @rtype: tuple
1157
  @return: a tuple of (success, msg) where:
1158
      - succes is a boolean denoting the success/failure of the operation
1159
      - msg is a string with details in case of failure
1160

1161
  """
1162
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1163

    
1164
  try:
1165
    hyper.MigrateInstance(instance, target, live)
1166
  except errors.HypervisorError, err:
1167
    _Fail("Failed to migrate instance: %s", err, exc=True)
1168

    
1169

    
1170
def BlockdevCreate(disk, size, owner, on_primary, info):
1171
  """Creates a block device for an instance.
1172

1173
  @type disk: L{objects.Disk}
1174
  @param disk: the object describing the disk we should create
1175
  @type size: int
1176
  @param size: the size of the physical underlying device, in MiB
1177
  @type owner: str
1178
  @param owner: the name of the instance for which disk is created,
1179
      used for device cache data
1180
  @type on_primary: boolean
1181
  @param on_primary:  indicates if it is the primary node or not
1182
  @type info: string
1183
  @param info: string that will be sent to the physical device
1184
      creation, used for example to set (LVM) tags on LVs
1185

1186
  @return: the new unique_id of the device (this can sometime be
1187
      computed only after creation), or None. On secondary nodes,
1188
      it's not required to return anything.
1189

1190
  """
1191
  clist = []
1192
  if disk.children:
1193
    for child in disk.children:
1194
      try:
1195
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
1196
      except errors.BlockDeviceError, err:
1197
        _Fail("Can't assemble device %s: %s", child, err)
1198
      if on_primary or disk.AssembleOnSecondary():
1199
        # we need the children open in case the device itself has to
1200
        # be assembled
1201
        try:
1202
          crdev.Open()
1203
        except errors.BlockDeviceError, err:
1204
          _Fail("Can't make child '%s' read-write: %s", child, err)
1205
      clist.append(crdev)
1206

    
1207
  try:
1208
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
1209
  except errors.BlockDeviceError, err:
1210
    _Fail("Can't create block device: %s", err)
1211

    
1212
  if on_primary or disk.AssembleOnSecondary():
1213
    try:
1214
      device.Assemble()
1215
    except errors.BlockDeviceError, err:
1216
      _Fail("Can't assemble device after creation, unusual event: %s", err)
1217
    device.SetSyncSpeed(constants.SYNC_SPEED)
1218
    if on_primary or disk.OpenOnSecondary():
1219
      try:
1220
        device.Open(force=True)
1221
      except errors.BlockDeviceError, err:
1222
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
1223
    DevCacheManager.UpdateCache(device.dev_path, owner,
1224
                                on_primary, disk.iv_name)
1225

    
1226
  device.SetInfo(info)
1227

    
1228
  return device.unique_id
1229

    
1230

    
1231
def BlockdevRemove(disk):
1232
  """Remove a block device.
1233

1234
  @note: This is intended to be called recursively.
1235

1236
  @type disk: L{objects.Disk}
1237
  @param disk: the disk object we should remove
1238
  @rtype: boolean
1239
  @return: the success of the operation
1240

1241
  """
1242
  msgs = []
1243
  try:
1244
    rdev = _RecursiveFindBD(disk)
1245
  except errors.BlockDeviceError, err:
1246
    # probably can't attach
1247
    logging.info("Can't attach to device %s in remove", disk)
1248
    rdev = None
1249
  if rdev is not None:
1250
    r_path = rdev.dev_path
1251
    try:
1252
      rdev.Remove()
1253
    except errors.BlockDeviceError, err:
1254
      msgs.append(str(err))
1255
    if not msgs:
1256
      DevCacheManager.RemoveCache(r_path)
1257

    
1258
  if disk.children:
1259
    for child in disk.children:
1260
      try:
1261
        BlockdevRemove(child)
1262
      except RPCFail, err:
1263
        msgs.append(str(err))
1264

    
1265
  if msgs:
1266
    _Fail("; ".join(msgs))
1267

    
1268

    
1269
def _RecursiveAssembleBD(disk, owner, as_primary):
1270
  """Activate a block device for an instance.
1271

1272
  This is run on the primary and secondary nodes for an instance.
1273

1274
  @note: this function is called recursively.
1275

1276
  @type disk: L{objects.Disk}
1277
  @param disk: the disk we try to assemble
1278
  @type owner: str
1279
  @param owner: the name of the instance which owns the disk
1280
  @type as_primary: boolean
1281
  @param as_primary: if we should make the block device
1282
      read/write
1283

1284
  @return: the assembled device or None (in case no device
1285
      was assembled)
1286
  @raise errors.BlockDeviceError: in case there is an error
1287
      during the activation of the children or the device
1288
      itself
1289

1290
  """
1291
  children = []
1292
  if disk.children:
1293
    mcn = disk.ChildrenNeeded()
1294
    if mcn == -1:
1295
      mcn = 0 # max number of Nones allowed
1296
    else:
1297
      mcn = len(disk.children) - mcn # max number of Nones
1298
    for chld_disk in disk.children:
1299
      try:
1300
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
1301
      except errors.BlockDeviceError, err:
1302
        if children.count(None) >= mcn:
1303
          raise
1304
        cdev = None
1305
        logging.error("Error in child activation (but continuing): %s",
1306
                      str(err))
1307
      children.append(cdev)
1308

    
1309
  if as_primary or disk.AssembleOnSecondary():
1310
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
1311
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
1312
    result = r_dev
1313
    if as_primary or disk.OpenOnSecondary():
1314
      r_dev.Open()
1315
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
1316
                                as_primary, disk.iv_name)
1317

    
1318
  else:
1319
    result = True
1320
  return result
1321

    
1322

    
1323
def BlockdevAssemble(disk, owner, as_primary):
1324
  """Activate a block device for an instance.
1325

1326
  This is a wrapper over _RecursiveAssembleBD.
1327

1328
  @rtype: str or boolean
1329
  @return: a C{/dev/...} path for primary nodes, and
1330
      C{True} for secondary nodes
1331

1332
  """
1333
  try:
1334
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1335
    if isinstance(result, bdev.BlockDev):
1336
      result = result.dev_path
1337
  except errors.BlockDeviceError, err:
1338
    _Fail("Error while assembling disk: %s", err, exc=True)
1339

    
1340
  return result
1341

    
1342

    
1343
def BlockdevShutdown(disk):
1344
  """Shut down a block device.
1345

1346
  First, if the device is assembled (Attach() is successful), then
1347
  the device is shutdown. Then the children of the device are
1348
  shutdown.
1349

1350
  This function is called recursively. Note that we don't cache the
1351
  children or such, as oppossed to assemble, shutdown of different
1352
  devices doesn't require that the upper device was active.
1353

1354
  @type disk: L{objects.Disk}
1355
  @param disk: the description of the disk we should
1356
      shutdown
1357
  @rtype: None
1358

1359
  """
1360
  msgs = []
1361
  r_dev = _RecursiveFindBD(disk)
1362
  if r_dev is not None:
1363
    r_path = r_dev.dev_path
1364
    try:
1365
      r_dev.Shutdown()
1366
      DevCacheManager.RemoveCache(r_path)
1367
    except errors.BlockDeviceError, err:
1368
      msgs.append(str(err))
1369

    
1370
  if disk.children:
1371
    for child in disk.children:
1372
      try:
1373
        BlockdevShutdown(child)
1374
      except RPCFail, err:
1375
        msgs.append(str(err))
1376

    
1377
  if msgs:
1378
    _Fail("; ".join(msgs))
1379

    
1380

    
1381
def BlockdevAddchildren(parent_cdev, new_cdevs):
1382
  """Extend a mirrored block device.
1383

1384
  @type parent_cdev: L{objects.Disk}
1385
  @param parent_cdev: the disk to which we should add children
1386
  @type new_cdevs: list of L{objects.Disk}
1387
  @param new_cdevs: the list of children which we should add
1388
  @rtype: None
1389

1390
  """
1391
  parent_bdev = _RecursiveFindBD(parent_cdev)
1392
  if parent_bdev is None:
1393
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
1394
  new_bdevs = [_RecursiveFindBD(disk) for disk in new_cdevs]
1395
  if new_bdevs.count(None) > 0:
1396
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
1397
  parent_bdev.AddChildren(new_bdevs)
1398

    
1399

    
1400
def BlockdevRemovechildren(parent_cdev, new_cdevs):
1401
  """Shrink a mirrored block device.
1402

1403
  @type parent_cdev: L{objects.Disk}
1404
  @param parent_cdev: the disk from which we should remove children
1405
  @type new_cdevs: list of L{objects.Disk}
1406
  @param new_cdevs: the list of children which we should remove
1407
  @rtype: None
1408

1409
  """
1410
  parent_bdev = _RecursiveFindBD(parent_cdev)
1411
  if parent_bdev is None:
1412
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)
1413
  devs = []
1414
  for disk in new_cdevs:
1415
    rpath = disk.StaticDevPath()
1416
    if rpath is None:
1417
      bd = _RecursiveFindBD(disk)
1418
      if bd is None:
1419
        _Fail("Can't find device %s while removing children", disk)
1420
      else:
1421
        devs.append(bd.dev_path)
1422
    else:
1423
      devs.append(rpath)
1424
  parent_bdev.RemoveChildren(devs)
1425

    
1426

    
1427
def BlockdevGetmirrorstatus(disks):
1428
  """Get the mirroring status of a list of devices.
1429

1430
  @type disks: list of L{objects.Disk}
1431
  @param disks: the list of disks which we should query
1432
  @rtype: disk
1433
  @return:
1434
      a list of (mirror_done, estimated_time) tuples, which
1435
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
1436
  @raise errors.BlockDeviceError: if any of the disks cannot be
1437
      found
1438

1439
  """
1440
  stats = []
1441
  for dsk in disks:
1442
    rbd = _RecursiveFindBD(dsk)
1443
    if rbd is None:
1444
      _Fail("Can't find device %s", dsk)
1445

    
1446
    stats.append(rbd.CombinedSyncStatus())
1447

    
1448
  return stats
1449

    
1450

    
1451
def _RecursiveFindBD(disk):
1452
  """Check if a device is activated.
1453

1454
  If so, return information about the real device.
1455

1456
  @type disk: L{objects.Disk}
1457
  @param disk: the disk object we need to find
1458

1459
  @return: None if the device can't be found,
1460
      otherwise the device instance
1461

1462
  """
1463
  children = []
1464
  if disk.children:
1465
    for chdisk in disk.children:
1466
      children.append(_RecursiveFindBD(chdisk))
1467

    
1468
  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
1469

    
1470

    
1471
def BlockdevFind(disk):
1472
  """Check if a device is activated.
1473

1474
  If it is, return information about the real device.
1475

1476
  @type disk: L{objects.Disk}
1477
  @param disk: the disk to find
1478
  @rtype: None or objects.BlockDevStatus
1479
  @return: None if the disk cannot be found, otherwise a the current
1480
           information
1481

1482
  """
1483
  try:
1484
    rbd = _RecursiveFindBD(disk)
1485
  except errors.BlockDeviceError, err:
1486
    _Fail("Failed to find device: %s", err, exc=True)
1487

    
1488
  if rbd is None:
1489
    return None
1490

    
1491
  return rbd.GetSyncStatus()
1492

    
1493

    
1494
def BlockdevGetsize(disks):
1495
  """Computes the size of the given disks.
1496

1497
  If a disk is not found, returns None instead.
1498

1499
  @type disks: list of L{objects.Disk}
1500
  @param disks: the list of disk to compute the size for
1501
  @rtype: list
1502
  @return: list with elements None if the disk cannot be found,
1503
      otherwise the size
1504

1505
  """
1506
  result = []
1507
  for cf in disks:
1508
    try:
1509
      rbd = _RecursiveFindBD(cf)
1510
    except errors.BlockDeviceError, err:
1511
      result.append(None)
1512
      continue
1513
    if rbd is None:
1514
      result.append(None)
1515
    else:
1516
      result.append(rbd.GetActualSize())
1517
  return result
1518

    
1519

    
1520
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
1521
  """Export a block device to a remote node.
1522

1523
  @type disk: L{objects.Disk}
1524
  @param disk: the description of the disk to export
1525
  @type dest_node: str
1526
  @param dest_node: the destination node to export to
1527
  @type dest_path: str
1528
  @param dest_path: the destination path on the target node
1529
  @type cluster_name: str
1530
  @param cluster_name: the cluster name, needed for SSH hostalias
1531
  @rtype: None
1532

1533
  """
1534
  real_disk = _RecursiveFindBD(disk)
1535
  if real_disk is None:
1536
    _Fail("Block device '%s' is not set up", disk)
1537

    
1538
  real_disk.Open()
1539

    
1540
  # the block size on the read dd is 1MiB to match our units
1541
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; "
1542
                               "dd if=%s bs=1048576 count=%s",
1543
                               real_disk.dev_path, str(disk.size))
1544

    
1545
  # we set here a smaller block size as, due to ssh buffering, more
1546
  # than 64-128k will mostly ignored; we use nocreat to fail if the
1547
  # device is not already there or we pass a wrong path; we use
1548
  # notrunc to no attempt truncate on an LV device; we use oflag=dsync
1549
  # to not buffer too much memory; this means that at best, we flush
1550
  # every 64k, which will not be very fast
1551
  destcmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
1552
                                " oflag=dsync", dest_path)
1553

    
1554
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1555
                                                   constants.GANETI_RUNAS,
1556
                                                   destcmd)
1557

    
1558
  # all commands have been checked, so we're safe to combine them
1559
  command = '|'.join([expcmd, utils.ShellQuoteArgs(remotecmd)])
1560

    
1561
  result = utils.RunCmd(["bash", "-c", command])
1562

    
1563
  if result.failed:
1564
    _Fail("Disk copy command '%s' returned error: %s"
1565
          " output: %s", command, result.fail_reason, result.output)
1566

    
1567

    
1568
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
1569
  """Write a file to the filesystem.
1570

1571
  This allows the master to overwrite(!) a file. It will only perform
1572
  the operation if the file belongs to a list of configuration files.
1573

1574
  @type file_name: str
1575
  @param file_name: the target file name
1576
  @type data: str
1577
  @param data: the new contents of the file
1578
  @type mode: int
1579
  @param mode: the mode to give the file (can be None)
1580
  @type uid: int
1581
  @param uid: the owner of the file (can be -1 for default)
1582
  @type gid: int
1583
  @param gid: the group of the file (can be -1 for default)
1584
  @type atime: float
1585
  @param atime: the atime to set on the file (can be None)
1586
  @type mtime: float
1587
  @param mtime: the mtime to set on the file (can be None)
1588
  @rtype: None
1589

1590
  """
1591
  if not os.path.isabs(file_name):
1592
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)
1593

    
1594
  if file_name not in _ALLOWED_UPLOAD_FILES:
1595
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
1596
          file_name)
1597

    
1598
  raw_data = _Decompress(data)
1599

    
1600
  utils.WriteFile(file_name, data=raw_data, mode=mode, uid=uid, gid=gid,
1601
                  atime=atime, mtime=mtime)
1602

    
1603

    
1604
def WriteSsconfFiles(values):
1605
  """Update all ssconf files.
1606

1607
  Wrapper around the SimpleStore.WriteFiles.
1608

1609
  """
1610
  ssconf.SimpleStore().WriteFiles(values)
1611

    
1612

    
1613
def _ErrnoOrStr(err):
1614
  """Format an EnvironmentError exception.
1615

1616
  If the L{err} argument has an errno attribute, it will be looked up
1617
  and converted into a textual C{E...} description. Otherwise the
1618
  string representation of the error will be returned.
1619

1620
  @type err: L{EnvironmentError}
1621
  @param err: the exception to format
1622

1623
  """
1624
  if hasattr(err, 'errno'):
1625
    detail = errno.errorcode[err.errno]
1626
  else:
1627
    detail = str(err)
1628
  return detail
1629

    
1630

    
1631
def _OSOndiskAPIVersion(name, os_dir):
1632
  """Compute and return the API version of a given OS.
1633

1634
  This function will try to read the API version of the OS given by
1635
  the 'name' parameter and residing in the 'os_dir' directory.
1636

1637
  @type name: str
1638
  @param name: the OS name we should look for
1639
  @type os_dir: str
1640
  @param os_dir: the directory inwhich we should look for the OS
1641
  @rtype: tuple
1642
  @return: tuple (status, data) with status denoting the validity and
1643
      data holding either the vaid versions or an error message
1644

1645
  """
1646
  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1647

    
1648
  try:
1649
    st = os.stat(api_file)
1650
  except EnvironmentError, err:
1651
    return False, ("Required file '%s' not found under path %s: %s" %
1652
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1653

    
1654
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1655
    return False, ("File '%s' in %s is not a regular file" %
1656
                   (constants.OS_API_FILE, os_dir))
1657

    
1658
  try:
1659
    api_versions = utils.ReadFile(api_file).splitlines()
1660
  except EnvironmentError, err:
1661
    return False, ("Error while reading the API version file at %s: %s" %
1662
                   (api_file, _ErrnoOrStr(err)))
1663

    
1664
  try:
1665
    api_versions = [int(version.strip()) for version in api_versions]
1666
  except (TypeError, ValueError), err:
1667
    return False, ("API version(s) can't be converted to integer: %s" %
1668
                   str(err))
1669

    
1670
  return True, api_versions
1671

    
1672

    
1673
def DiagnoseOS(top_dirs=None):
1674
  """Compute the validity for all OSes.
1675

1676
  @type top_dirs: list
1677
  @param top_dirs: the list of directories in which to
1678
      search (if not given defaults to
1679
      L{constants.OS_SEARCH_PATH})
1680
  @rtype: list of L{objects.OS}
1681
  @return: a list of tuples (name, path, status, diagnose, variants)
1682
      for all (potential) OSes under all search paths, where:
1683
          - name is the (potential) OS name
1684
          - path is the full path to the OS
1685
          - status True/False is the validity of the OS
1686
          - diagnose is the error message for an invalid OS, otherwise empty
1687
          - variants is a list of supported OS variants, if any
1688

1689
  """
1690
  if top_dirs is None:
1691
    top_dirs = constants.OS_SEARCH_PATH
1692

    
1693
  result = []
1694
  for dir_name in top_dirs:
1695
    if os.path.isdir(dir_name):
1696
      try:
1697
        f_names = utils.ListVisibleFiles(dir_name)
1698
      except EnvironmentError, err:
1699
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1700
        break
1701
      for name in f_names:
1702
        os_path = os.path.sep.join([dir_name, name])
1703
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1704
        if status:
1705
          diagnose = ""
1706
          variants = os_inst.supported_variants
1707
        else:
1708
          diagnose = os_inst
1709
          variants = []
1710
        result.append((name, os_path, status, diagnose, variants))
1711

    
1712
  return result
1713

    
1714

    
1715
def _TryOSFromDisk(name, base_dir=None):
1716
  """Create an OS instance from disk.
1717

1718
  This function will return an OS instance if the given name is a
1719
  valid OS name.
1720

1721
  @type base_dir: string
1722
  @keyword base_dir: Base directory containing OS installations.
1723
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1724
  @rtype: tuple
1725
  @return: success and either the OS instance if we find a valid one,
1726
      or error message
1727

1728
  """
1729
  if base_dir is None:
1730
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1731
    if os_dir is None:
1732
      return False, "Directory for OS %s not found in search path" % name
1733
  else:
1734
    os_dir = os.path.sep.join([base_dir, name])
1735

    
1736
  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1737
  if not status:
1738
    # push the error up
1739
    return status, api_versions
1740

    
1741
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1742
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1743
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1744

    
1745
  # OS Files dictionary, we will populate it with the absolute path names
1746
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1747

    
1748
  if max(api_versions) >= constants.OS_API_V15:
1749
    os_files[constants.OS_VARIANTS_FILE] = ''
1750

    
1751
  for filename in os_files:
1752
    os_files[filename] = os.path.sep.join([os_dir, filename])
1753

    
1754
    try:
1755
      st = os.stat(os_files[filename])
1756
    except EnvironmentError, err:
1757
      return False, ("File '%s' under path '%s' is missing (%s)" %
1758
                     (filename, os_dir, _ErrnoOrStr(err)))
1759

    
1760
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1761
      return False, ("File '%s' under path '%s' is not a regular file" %
1762
                     (filename, os_dir))
1763

    
1764
    if filename in constants.OS_SCRIPTS:
1765
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1766
        return False, ("File '%s' under path '%s' is not executable" %
1767
                       (filename, os_dir))
1768

    
1769
  variants = None
1770
  if constants.OS_VARIANTS_FILE in os_files:
1771
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1772
    try:
1773
      variants = utils.ReadFile(variants_file).splitlines()
1774
    except EnvironmentError, err:
1775
      return False, ("Error while reading the OS variants file at %s: %s" %
1776
                     (variants_file, _ErrnoOrStr(err)))
1777
    if not variants:
1778
      return False, ("No supported os variant found")
1779

    
1780
  os_obj = objects.OS(name=name, path=os_dir,
1781
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1782
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1783
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1784
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1785
                      supported_variants=variants,
1786
                      api_versions=api_versions)
1787
  return True, os_obj
1788

    
1789

    
1790
def OSFromDisk(name, base_dir=None):
1791
  """Create an OS instance from disk.
1792

1793
  This function will return an OS instance if the given name is a
1794
  valid OS name. Otherwise, it will raise an appropriate
1795
  L{RPCFail} exception, detailing why this is not a valid OS.
1796

1797
  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
1798
  an exception but returns true/false status data.
1799

1800
  @type base_dir: string
1801
  @keyword base_dir: Base directory containing OS installations.
1802
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1803
  @rtype: L{objects.OS}
1804
  @return: the OS instance if we find a valid one
1805
  @raise RPCFail: if we don't find a valid OS
1806

1807
  """
1808
  name_only = name.split("+", 1)[0]
1809
  status, payload = _TryOSFromDisk(name_only, base_dir)
1810

    
1811
  if not status:
1812
    _Fail(payload)
1813

    
1814
  return payload
1815

    
1816

    
1817
def OSEnvironment(instance, inst_os, debug=0):
1818
  """Calculate the environment for an os script.
1819

1820
  @type instance: L{objects.Instance}
1821
  @param instance: target instance for the os script run
1822
  @type inst_os: L{objects.OS}
1823
  @param inst_os: operating system for which the environment is being built
1824
  @type debug: integer
1825
  @param debug: debug level (0 or 1, for OS Api 10)
1826
  @rtype: dict
1827
  @return: dict of environment variables
1828
  @raise errors.BlockDeviceError: if the block device
1829
      cannot be found
1830

1831
  """
1832
  result = {}
1833
  api_version = \
1834
    max(constants.OS_API_VERSIONS.intersection(inst_os.api_versions))
1835
  result['OS_API_VERSION'] = '%d' % api_version
1836
  result['INSTANCE_NAME'] = instance.name
1837
  result['INSTANCE_OS'] = instance.os
1838
  result['HYPERVISOR'] = instance.hypervisor
1839
  result['DISK_COUNT'] = '%d' % len(instance.disks)
1840
  result['NIC_COUNT'] = '%d' % len(instance.nics)
1841
  result['DEBUG_LEVEL'] = '%d' % debug
1842
  if api_version >= constants.OS_API_V15:
1843
    try:
1844
      variant = instance.os.split('+', 1)[1]
1845
    except IndexError:
1846
      variant = inst_os.supported_variants[0]
1847
    result['OS_VARIANT'] = variant
1848
  for idx, disk in enumerate(instance.disks):
1849
    real_disk = _RecursiveFindBD(disk)
1850
    if real_disk is None:
1851
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
1852
                                    str(disk))
1853
    real_disk.Open()
1854
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
1855
    result['DISK_%d_ACCESS' % idx] = disk.mode
1856
    if constants.HV_DISK_TYPE in instance.hvparams:
1857
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
1858
        instance.hvparams[constants.HV_DISK_TYPE]
1859
    if disk.dev_type in constants.LDS_BLOCK:
1860
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
1861
    elif disk.dev_type == constants.LD_FILE:
1862
      result['DISK_%d_BACKEND_TYPE' % idx] = \
1863
        'file:%s' % disk.physical_id[0]
1864
  for idx, nic in enumerate(instance.nics):
1865
    result['NIC_%d_MAC' % idx] = nic.mac
1866
    if nic.ip:
1867
      result['NIC_%d_IP' % idx] = nic.ip
1868
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
1869
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
1870
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
1871
    if nic.nicparams[constants.NIC_LINK]:
1872
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
1873
    if constants.HV_NIC_TYPE in instance.hvparams:
1874
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
1875
        instance.hvparams[constants.HV_NIC_TYPE]
1876

    
1877
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
1878
    for key, value in source.items():
1879
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)
1880

    
1881
  return result
1882

    
1883
def BlockdevGrow(disk, amount):
1884
  """Grow a stack of block devices.
1885

1886
  This function is called recursively, with the childrens being the
1887
  first ones to resize.
1888

1889
  @type disk: L{objects.Disk}
1890
  @param disk: the disk to be grown
1891
  @rtype: (status, result)
1892
  @return: a tuple with the status of the operation
1893
      (True/False), and the errors message if status
1894
      is False
1895

1896
  """
1897
  r_dev = _RecursiveFindBD(disk)
1898
  if r_dev is None:
1899
    _Fail("Cannot find block device %s", disk)
1900

    
1901
  try:
1902
    r_dev.Grow(amount)
1903
  except errors.BlockDeviceError, err:
1904
    _Fail("Failed to grow block device: %s", err, exc=True)
1905

    
1906

    
1907
def BlockdevSnapshot(disk):
1908
  """Create a snapshot copy of a block device.
1909

1910
  This function is called recursively, and the snapshot is actually created
1911
  just for the leaf lvm backend device.
1912

1913
  @type disk: L{objects.Disk}
1914
  @param disk: the disk to be snapshotted
1915
  @rtype: string
1916
  @return: snapshot disk path
1917

1918
  """
1919
  if disk.children:
1920
    if len(disk.children) == 1:
1921
      # only one child, let's recurse on it
1922
      return BlockdevSnapshot(disk.children[0])
1923
    else:
1924
      # more than one child, choose one that matches
1925
      for child in disk.children:
1926
        if child.size == disk.size:
1927
          # return implies breaking the loop
1928
          return BlockdevSnapshot(child)
1929
  elif disk.dev_type == constants.LD_LV:
1930
    r_dev = _RecursiveFindBD(disk)
1931
    if r_dev is not None:
1932
      # let's stay on the safe side and ask for the full size, for now
1933
      return r_dev.Snapshot(disk.size)
1934
    else:
1935
      _Fail("Cannot find block device %s", disk)
1936
  else:
1937
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
1938
          disk.unique_id, disk.dev_type)
1939

    
1940

    
1941
def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
1942
  """Export a block device snapshot to a remote node.
1943

1944
  @type disk: L{objects.Disk}
1945
  @param disk: the description of the disk to export
1946
  @type dest_node: str
1947
  @param dest_node: the destination node to export to
1948
  @type instance: L{objects.Instance}
1949
  @param instance: the instance object to whom the disk belongs
1950
  @type cluster_name: str
1951
  @param cluster_name: the cluster name, needed for SSH hostalias
1952
  @type idx: int
1953
  @param idx: the index of the disk in the instance's disk list,
1954
      used to export to the OS scripts environment
1955
  @rtype: None
1956

1957
  """
1958
  inst_os = OSFromDisk(instance.os)
1959
  export_env = OSEnvironment(instance, inst_os)
1960

    
1961
  export_script = inst_os.export_script
1962

    
1963
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
1964
                                     instance.name, int(time.time()))
1965
  if not os.path.exists(constants.LOG_OS_DIR):
1966
    os.mkdir(constants.LOG_OS_DIR, 0750)
1967
  real_disk = _RecursiveFindBD(disk)
1968
  if real_disk is None:
1969
    _Fail("Block device '%s' is not set up", disk)
1970

    
1971
  real_disk.Open()
1972

    
1973
  export_env['EXPORT_DEVICE'] = real_disk.dev_path
1974
  export_env['EXPORT_INDEX'] = str(idx)
1975

    
1976
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
1977
  destfile = disk.physical_id[1]
1978

    
1979
  # the target command is built out of three individual commands,
1980
  # which are joined by pipes; we check each individual command for
1981
  # valid parameters
1982
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
1983
                               inst_os.path, export_script, logfile)
1984

    
1985
  comprcmd = "gzip"
1986

    
1987
  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
1988
                                destdir, destdir, destfile)
1989
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
1990
                                                   constants.GANETI_RUNAS,
1991
                                                   destcmd)
1992

    
1993
  # all commands have been checked, so we're safe to combine them
1994
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])
1995

    
1996
  result = utils.RunCmd(["bash", "-c", command], env=export_env)
1997

    
1998
  if result.failed:
1999
    _Fail("OS snapshot export command '%s' returned error: %s"
2000
          " output: %s", command, result.fail_reason, result.output)
2001

    
2002

    
2003
def FinalizeExport(instance, snap_disks):
2004
  """Write out the export configuration information.
2005

2006
  @type instance: L{objects.Instance}
2007
  @param instance: the instance which we export, used for
2008
      saving configuration
2009
  @type snap_disks: list of L{objects.Disk}
2010
  @param snap_disks: list of snapshot block devices, which
2011
      will be used to get the actual name of the dump file
2012

2013
  @rtype: None
2014

2015
  """
2016
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
2017
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)
2018

    
2019
  config = objects.SerializableConfigParser()
2020

    
2021
  config.add_section(constants.INISECT_EXP)
2022
  config.set(constants.INISECT_EXP, 'version', '0')
2023
  config.set(constants.INISECT_EXP, 'timestamp', '%d' % int(time.time()))
2024
  config.set(constants.INISECT_EXP, 'source', instance.primary_node)
2025
  config.set(constants.INISECT_EXP, 'os', instance.os)
2026
  config.set(constants.INISECT_EXP, 'compression', 'gzip')
2027

    
2028
  config.add_section(constants.INISECT_INS)
2029
  config.set(constants.INISECT_INS, 'name', instance.name)
2030
  config.set(constants.INISECT_INS, 'memory', '%d' %
2031
             instance.beparams[constants.BE_MEMORY])
2032
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
2033
             instance.beparams[constants.BE_VCPUS])
2034
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)
2035

    
2036
  nic_total = 0
2037
  for nic_count, nic in enumerate(instance.nics):
2038
    nic_total += 1
2039
    config.set(constants.INISECT_INS, 'nic%d_mac' %
2040
               nic_count, '%s' % nic.mac)
2041
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
2042
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
2043
               '%s' % nic.bridge)
2044
  # TODO: redundant: on load can read nics until it doesn't exist
2045
  config.set(constants.INISECT_INS, 'nic_count' , '%d' % nic_total)
2046

    
2047
  disk_total = 0
2048
  for disk_count, disk in enumerate(snap_disks):
2049
    if disk:
2050
      disk_total += 1
2051
      config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
2052
                 ('%s' % disk.iv_name))
2053
      config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
2054
                 ('%s' % disk.physical_id[1]))
2055
      config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
2056
                 ('%d' % disk.size))
2057

    
2058
  config.set(constants.INISECT_INS, 'disk_count' , '%d' % disk_total)
2059

    
2060
  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
2061
                  data=config.Dumps())
2062
  shutil.rmtree(finaldestdir, True)
2063
  shutil.move(destdir, finaldestdir)
2064

    
2065

    
2066
def ExportInfo(dest):
2067
  """Get export configuration information.
2068

2069
  @type dest: str
2070
  @param dest: directory containing the export
2071

2072
  @rtype: L{objects.SerializableConfigParser}
2073
  @return: a serializable config file containing the
2074
      export info
2075

2076
  """
2077
  cff = os.path.join(dest, constants.EXPORT_CONF_FILE)
2078

    
2079
  config = objects.SerializableConfigParser()
2080
  config.read(cff)
2081

    
2082
  if (not config.has_section(constants.INISECT_EXP) or
2083
      not config.has_section(constants.INISECT_INS)):
2084
    _Fail("Export info file doesn't have the required fields")
2085

    
2086
  return config.Dumps()
2087

    
2088

    
2089
def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2090
  """Import an os image into an instance.
2091

2092
  @type instance: L{objects.Instance}
2093
  @param instance: instance to import the disks into
2094
  @type src_node: string
2095
  @param src_node: source node for the disk images
2096
  @type src_images: list of string
2097
  @param src_images: absolute paths of the disk images
2098
  @rtype: list of boolean
2099
  @return: each boolean represent the success of importing the n-th disk
2100

2101
  """
2102
  inst_os = OSFromDisk(instance.os)
2103
  import_env = OSEnvironment(instance, inst_os)
2104
  import_script = inst_os.import_script
2105

    
2106
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2107
                                        instance.name, int(time.time()))
2108
  if not os.path.exists(constants.LOG_OS_DIR):
2109
    os.mkdir(constants.LOG_OS_DIR, 0750)
2110

    
2111
  comprcmd = "gunzip"
2112
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2113
                               import_script, logfile)
2114

    
2115
  final_result = []
2116
  for idx, image in enumerate(src_images):
2117
    if image:
2118
      destcmd = utils.BuildShellCmd('cat %s', image)
2119
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2120
                                                       constants.GANETI_RUNAS,
2121
                                                       destcmd)
2122
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2123
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2124
      import_env['IMPORT_INDEX'] = str(idx)
2125
      result = utils.RunCmd(command, env=import_env)
2126
      if result.failed:
2127
        logging.error("Disk import command '%s' returned error: %s"
2128
                      " output: %s", command, result.fail_reason,
2129
                      result.output)
2130
        final_result.append("error importing disk %d: %s, %s" %
2131
                            (idx, result.fail_reason, result.output[-100]))
2132

    
2133
  if final_result:
2134
    _Fail("; ".join(final_result), log=False)
2135

    
2136

    
2137
def ListExports():
2138
  """Return a list of exports currently available on this machine.
2139

2140
  @rtype: list
2141
  @return: list of the exports
2142

2143
  """
2144
  if os.path.isdir(constants.EXPORT_DIR):
2145
    return utils.ListVisibleFiles(constants.EXPORT_DIR)
2146
  else:
2147
    _Fail("No exports directory")
2148

    
2149

    
2150
def RemoveExport(export):
2151
  """Remove an existing export from the node.
2152

2153
  @type export: str
2154
  @param export: the name of the export to remove
2155
  @rtype: None
2156

2157
  """
2158
  target = os.path.join(constants.EXPORT_DIR, export)
2159

    
2160
  try:
2161
    shutil.rmtree(target)
2162
  except EnvironmentError, err:
2163
    _Fail("Error while removing the export: %s", err, exc=True)
2164

    
2165

    
2166
def BlockdevRename(devlist):
2167
  """Rename a list of block devices.
2168

2169
  @type devlist: list of tuples
2170
  @param devlist: list of tuples of the form  (disk,
2171
      new_logical_id, new_physical_id); disk is an
2172
      L{objects.Disk} object describing the current disk,
2173
      and new logical_id/physical_id is the name we
2174
      rename it to
2175
  @rtype: boolean
2176
  @return: True if all renames succeeded, False otherwise
2177

2178
  """
2179
  msgs = []
2180
  result = True
2181
  for disk, unique_id in devlist:
2182
    dev = _RecursiveFindBD(disk)
2183
    if dev is None:
2184
      msgs.append("Can't find device %s in rename" % str(disk))
2185
      result = False
2186
      continue
2187
    try:
2188
      old_rpath = dev.dev_path
2189
      dev.Rename(unique_id)
2190
      new_rpath = dev.dev_path
2191
      if old_rpath != new_rpath:
2192
        DevCacheManager.RemoveCache(old_rpath)
2193
        # FIXME: we should add the new cache information here, like:
2194
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2195
        # but we don't have the owner here - maybe parse from existing
2196
        # cache? for now, we only lose lvm data when we rename, which
2197
        # is less critical than DRBD or MD
2198
    except errors.BlockDeviceError, err:
2199
      msgs.append("Can't rename device '%s' to '%s': %s" %
2200
                  (dev, unique_id, err))
2201
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2202
      result = False
2203
  if not result:
2204
    _Fail("; ".join(msgs))
2205

    
2206

    
2207
def _TransformFileStorageDir(file_storage_dir):
2208
  """Checks whether given file_storage_dir is valid.
2209

2210
  Checks wheter the given file_storage_dir is within the cluster-wide
2211
  default file_storage_dir stored in SimpleStore. Only paths under that
2212
  directory are allowed.
2213

2214
  @type file_storage_dir: str
2215
  @param file_storage_dir: the path to check
2216

2217
  @return: the normalized path if valid, None otherwise
2218

2219
  """
2220
  cfg = _GetConfig()
2221
  file_storage_dir = os.path.normpath(file_storage_dir)
2222
  base_file_storage_dir = cfg.GetFileStorageDir()
2223
  if (not os.path.commonprefix([file_storage_dir, base_file_storage_dir]) ==
2224
      base_file_storage_dir):
2225
    _Fail("File storage directory '%s' is not under base file"
2226
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
2227
  return file_storage_dir
2228

    
2229

    
2230
def CreateFileStorageDir(file_storage_dir):
2231
  """Create file storage directory.
2232

2233
  @type file_storage_dir: str
2234
  @param file_storage_dir: directory to create
2235

2236
  @rtype: tuple
2237
  @return: tuple with first element a boolean indicating wheter dir
2238
      creation was successful or not
2239

2240
  """
2241
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2242
  if os.path.exists(file_storage_dir):
2243
    if not os.path.isdir(file_storage_dir):
2244
      _Fail("Specified storage dir '%s' is not a directory",
2245
            file_storage_dir)
2246
  else:
2247
    try:
2248
      os.makedirs(file_storage_dir, 0750)
2249
    except OSError, err:
2250
      _Fail("Cannot create file storage directory '%s': %s",
2251
            file_storage_dir, err, exc=True)
2252

    
2253

    
2254
def RemoveFileStorageDir(file_storage_dir):
2255
  """Remove file storage directory.
2256

2257
  Remove it only if it's empty. If not log an error and return.
2258

2259
  @type file_storage_dir: str
2260
  @param file_storage_dir: the directory we should cleanup
2261
  @rtype: tuple (success,)
2262
  @return: tuple of one element, C{success}, denoting
2263
      whether the operation was successful
2264

2265
  """
2266
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2267
  if os.path.exists(file_storage_dir):
2268
    if not os.path.isdir(file_storage_dir):
2269
      _Fail("Specified Storage directory '%s' is not a directory",
2270
            file_storage_dir)
2271
    # deletes dir only if empty, otherwise we want to fail the rpc call
2272
    try:
2273
      os.rmdir(file_storage_dir)
2274
    except OSError, err:
2275
      _Fail("Cannot remove file storage directory '%s': %s",
2276
            file_storage_dir, err)
2277

    
2278

    
2279
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2280
  """Rename the file storage directory.
2281

2282
  @type old_file_storage_dir: str
2283
  @param old_file_storage_dir: the current path
2284
  @type new_file_storage_dir: str
2285
  @param new_file_storage_dir: the name we should rename to
2286
  @rtype: tuple (success,)
2287
  @return: tuple of one element, C{success}, denoting
2288
      whether the operation was successful
2289

2290
  """
2291
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2292
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2293
  if not os.path.exists(new_file_storage_dir):
2294
    if os.path.isdir(old_file_storage_dir):
2295
      try:
2296
        os.rename(old_file_storage_dir, new_file_storage_dir)
2297
      except OSError, err:
2298
        _Fail("Cannot rename '%s' to '%s': %s",
2299
              old_file_storage_dir, new_file_storage_dir, err)
2300
    else:
2301
      _Fail("Specified storage dir '%s' is not a directory",
2302
            old_file_storage_dir)
2303
  else:
2304
    if os.path.exists(old_file_storage_dir):
2305
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2306
            old_file_storage_dir, new_file_storage_dir)
2307

    
2308

    
2309
def _EnsureJobQueueFile(file_name):
2310
  """Checks whether the given filename is in the queue directory.
2311

2312
  @type file_name: str
2313
  @param file_name: the file name we should check
2314
  @rtype: None
2315
  @raises RPCFail: if the file is not valid
2316

2317
  """
2318
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
2319
  result = (os.path.commonprefix([queue_dir, file_name]) == queue_dir)
2320

    
2321
  if not result:
2322
    _Fail("Passed job queue file '%s' does not belong to"
2323
          " the queue directory '%s'", file_name, queue_dir)
2324

    
2325

    
2326
def JobQueueUpdate(file_name, content):
2327
  """Updates a file in the queue directory.
2328

2329
  This is just a wrapper over L{utils.WriteFile}, with proper
2330
  checking.
2331

2332
  @type file_name: str
2333
  @param file_name: the job file name
2334
  @type content: str
2335
  @param content: the new job contents
2336
  @rtype: boolean
2337
  @return: the success of the operation
2338

2339
  """
2340
  _EnsureJobQueueFile(file_name)
2341

    
2342
  # Write and replace the file atomically
2343
  utils.WriteFile(file_name, data=_Decompress(content))
2344

    
2345

    
2346
def JobQueueRename(old, new):
2347
  """Renames a job queue file.
2348

2349
  This is just a wrapper over os.rename with proper checking.
2350

2351
  @type old: str
2352
  @param old: the old (actual) file name
2353
  @type new: str
2354
  @param new: the desired file name
2355
  @rtype: tuple
2356
  @return: the success of the operation and payload
2357

2358
  """
2359
  _EnsureJobQueueFile(old)
2360
  _EnsureJobQueueFile(new)
2361

    
2362
  utils.RenameFile(old, new, mkdir=True)
2363

    
2364

    
2365
def JobQueueSetDrainFlag(drain_flag):
2366
  """Set the drain flag for the queue.
2367

2368
  This will set or unset the queue drain flag.
2369

2370
  @type drain_flag: boolean
2371
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
2372
  @rtype: truple
2373
  @return: always True, None
2374
  @warning: the function always returns True
2375

2376
  """
2377
  if drain_flag:
2378
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)
2379
  else:
2380
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
2381

    
2382

    
2383
def BlockdevClose(instance_name, disks):
2384
  """Closes the given block devices.
2385

2386
  This means they will be switched to secondary mode (in case of
2387
  DRBD).
2388

2389
  @param instance_name: if the argument is not empty, the symlinks
2390
      of this instance will be removed
2391
  @type disks: list of L{objects.Disk}
2392
  @param disks: the list of disks to be closed
2393
  @rtype: tuple (success, message)
2394
  @return: a tuple of success and message, where success
2395
      indicates the succes of the operation, and message
2396
      which will contain the error details in case we
2397
      failed
2398

2399
  """
2400
  bdevs = []
2401
  for cf in disks:
2402
    rd = _RecursiveFindBD(cf)
2403
    if rd is None:
2404
      _Fail("Can't find device %s", cf)
2405
    bdevs.append(rd)
2406

    
2407
  msg = []
2408
  for rd in bdevs:
2409
    try:
2410
      rd.Close()
2411
    except errors.BlockDeviceError, err:
2412
      msg.append(str(err))
2413
  if msg:
2414
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2415
  else:
2416
    if instance_name:
2417
      _RemoveBlockDevLinks(instance_name, disks)
2418

    
2419

    
2420
def ValidateHVParams(hvname, hvparams):
2421
  """Validates the given hypervisor parameters.
2422

2423
  @type hvname: string
2424
  @param hvname: the hypervisor name
2425
  @type hvparams: dict
2426
  @param hvparams: the hypervisor parameters to be validated
2427
  @rtype: None
2428

2429
  """
2430
  try:
2431
    hv_type = hypervisor.GetHypervisor(hvname)
2432
    hv_type.ValidateParameters(hvparams)
2433
  except errors.HypervisorError, err:
2434
    _Fail(str(err), log=False)
2435

    
2436

    
2437
def DemoteFromMC():
2438
  """Demotes the current node from master candidate role.
2439

2440
  """
2441
  # try to ensure we're not the master by mistake
2442
  master, myself = ssconf.GetMasterAndMyself()
2443
  if master == myself:
2444
    _Fail("ssconf status shows I'm the master node, will not demote")
2445

    
2446
  result = utils.RunCmd([constants.DAEMON_UTIL, "check", constants.MASTERD])
2447
  if not result.failed:
2448
    _Fail("The master daemon is running, will not demote")
2449

    
2450
  try:
2451
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
2452
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
2453
  except EnvironmentError, err:
2454
    if err.errno != errno.ENOENT:
2455
      _Fail("Error while backing up cluster file: %s", err, exc=True)
2456

    
2457
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)
2458

    
2459

    
2460
def _FindDisks(nodes_ip, disks):
2461
  """Sets the physical ID on disks and returns the block devices.
2462

2463
  """
2464
  # set the correct physical ID
2465
  my_name = utils.HostInfo().name
2466
  for cf in disks:
2467
    cf.SetPhysicalID(my_name, nodes_ip)
2468

    
2469
  bdevs = []
2470

    
2471
  for cf in disks:
2472
    rd = _RecursiveFindBD(cf)
2473
    if rd is None:
2474
      _Fail("Can't find device %s", cf)
2475
    bdevs.append(rd)
2476
  return bdevs
2477

    
2478

    
2479
def DrbdDisconnectNet(nodes_ip, disks):
2480
  """Disconnects the network on a list of drbd devices.
2481

2482
  """
2483
  bdevs = _FindDisks(nodes_ip, disks)
2484

    
2485
  # disconnect disks
2486
  for rd in bdevs:
2487
    try:
2488
      rd.DisconnectNet()
2489
    except errors.BlockDeviceError, err:
2490
      _Fail("Can't change network configuration to standalone mode: %s",
2491
            err, exc=True)
2492

    
2493

    
2494
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
2495
  """Attaches the network on a list of drbd devices.
2496

2497
  """
2498
  bdevs = _FindDisks(nodes_ip, disks)
2499

    
2500
  if multimaster:
2501
    for idx, rd in enumerate(bdevs):
2502
      try:
2503
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
2504
      except EnvironmentError, err:
2505
        _Fail("Can't create symlink: %s", err)
2506
  # reconnect disks, switch to new master configuration and if
2507
  # needed primary mode
2508
  for rd in bdevs:
2509
    try:
2510
      rd.AttachNet(multimaster)
2511
    except errors.BlockDeviceError, err:
2512
      _Fail("Can't change network configuration: %s", err)
2513

    
2514
  # wait until the disks are connected; we need to retry the re-attach
2515
  # if the device becomes standalone, as this might happen if the one
2516
  # node disconnects and reconnects in a different mode before the
2517
  # other node reconnects; in this case, one or both of the nodes will
2518
  # decide it has wrong configuration and switch to standalone
2519

    
2520
  def _Attach():
2521
    all_connected = True
2522

    
2523
    for rd in bdevs:
2524
      stats = rd.GetProcStatus()
2525

    
2526
      all_connected = (all_connected and
2527
                       (stats.is_connected or stats.is_in_resync))
2528

    
2529
      if stats.is_standalone:
2530
        # peer had different config info and this node became
2531
        # standalone, even though this should not happen with the
2532
        # new staged way of changing disk configs
2533
        try:
2534
          rd.AttachNet(multimaster)
2535
        except errors.BlockDeviceError, err:
2536
          _Fail("Can't change network configuration: %s", err)
2537

    
2538
    if not all_connected:
2539
      raise utils.RetryAgain()
2540

    
2541
  try:
2542
    # Start with a delay of 100 miliseconds and go up to 5 seconds
2543
    utils.Retry(_Attach, (0.1, 1.5, 5.0), 2 * 60)
2544
  except utils.RetryTimeout:
2545
    _Fail("Timeout in disk reconnecting")
2546

    
2547
  if multimaster:
2548
    # change to primary mode
2549
    for rd in bdevs:
2550
      try:
2551
        rd.Open()
2552
      except errors.BlockDeviceError, err:
2553
        _Fail("Can't change to primary mode: %s", err)
2554

    
2555

    
2556
def DrbdWaitSync(nodes_ip, disks):
2557
  """Wait until DRBDs have synchronized.
2558

2559
  """
2560
  def _helper(rd):
2561
    stats = rd.GetProcStatus()
2562
    if not (stats.is_connected or stats.is_in_resync):
2563
      raise utils.RetryAgain()
2564
    return stats
2565

    
2566
  bdevs = _FindDisks(nodes_ip, disks)
2567

    
2568
  min_resync = 100
2569
  alldone = True
2570
  for rd in bdevs:
2571
    try:
2572
      # poll each second for 15 seconds
2573
      stats = utils.Retry(_helper, 1, 15, args=[rd])
2574
    except utils.RetryTimeout:
2575
      stats = rd.GetProcStatus()
2576
      # last check
2577
      if not (stats.is_connected or stats.is_in_resync):
2578
        _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
2579
    alldone = alldone and (not stats.is_in_resync)
2580
    if stats.sync_percent is not None:
2581
      min_resync = min(min_resync, stats.sync_percent)
2582

    
2583
  return (alldone, min_resync)
2584

    
2585

    
2586
def PowercycleNode(hypervisor_type):
2587
  """Hard-powercycle the node.
2588

2589
  Because we need to return first, and schedule the powercycle in the
2590
  background, we won't be able to report failures nicely.
2591

2592
  """
2593
  hyper = hypervisor.GetHypervisor(hypervisor_type)
2594
  try:
2595
    pid = os.fork()
2596
  except OSError:
2597
    # if we can't fork, we'll pretend that we're in the child process
2598
    pid = 0
2599
  if pid > 0:
2600
    return "Reboot scheduled in 5 seconds"
2601
  time.sleep(5)
2602
  hyper.PowercycleNode()
2603

    
2604

    
2605
class HooksRunner(object):
2606
  """Hook runner.
2607

2608
  This class is instantiated on the node side (ganeti-noded) and not
2609
  on the master side.
2610

2611
  """
2612
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")
2613

    
2614
  def __init__(self, hooks_base_dir=None):
2615
    """Constructor for hooks runner.
2616

2617
    @type hooks_base_dir: str or None
2618
    @param hooks_base_dir: if not None, this overrides the
2619
        L{constants.HOOKS_BASE_DIR} (useful for unittests)
2620

2621
    """
2622
    if hooks_base_dir is None:
2623
      hooks_base_dir = constants.HOOKS_BASE_DIR
2624
    self._BASE_DIR = hooks_base_dir
2625

    
2626
  @staticmethod
2627
  def ExecHook(script, env):
2628
    """Exec one hook script.
2629

2630
    @type script: str
2631
    @param script: the full path to the script
2632
    @type env: dict
2633
    @param env: the environment with which to exec the script
2634
    @rtype: tuple (success, message)
2635
    @return: a tuple of success and message, where success
2636
        indicates the succes of the operation, and message
2637
        which will contain the error details in case we
2638
        failed
2639

2640
    """
2641
    # exec the process using subprocess and log the output
2642
    fdstdin = None
2643
    try:
2644
      fdstdin = open("/dev/null", "r")
2645
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
2646
                               stderr=subprocess.STDOUT, close_fds=True,
2647
                               shell=False, cwd="/", env=env)
2648
      output = ""
2649
      try:
2650
        output = child.stdout.read(4096)
2651
        child.stdout.close()
2652
      except EnvironmentError, err:
2653
        output += "Hook script error: %s" % str(err)
2654

    
2655
      while True:
2656
        try:
2657
          result = child.wait()
2658
          break
2659
        except EnvironmentError, err:
2660
          if err.errno == errno.EINTR:
2661
            continue
2662
          raise
2663
    finally:
2664
      # try not to leak fds
2665
      for fd in (fdstdin, ):
2666
        if fd is not None:
2667
          try:
2668
            fd.close()
2669
          except EnvironmentError, err:
2670
            # just log the error
2671
            #logging.exception("Error while closing fd %s", fd)
2672
            pass
2673

    
2674
    return result == 0, utils.SafeEncode(output.strip())
2675

    
2676
  def RunHooks(self, hpath, phase, env):
2677
    """Run the scripts in the hooks directory.
2678

2679
    @type hpath: str
2680
    @param hpath: the path to the hooks directory which
2681
        holds the scripts
2682
    @type phase: str
2683
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
2684
        L{constants.HOOKS_PHASE_POST}
2685
    @type env: dict
2686
    @param env: dictionary with the environment for the hook
2687
    @rtype: list
2688
    @return: list of 3-element tuples:
2689
      - script path
2690
      - script result, either L{constants.HKR_SUCCESS} or
2691
        L{constants.HKR_FAIL}
2692
      - output of the script
2693

2694
    @raise errors.ProgrammerError: for invalid input
2695
        parameters
2696

2697
    """
2698
    if phase == constants.HOOKS_PHASE_PRE:
2699
      suffix = "pre"
2700
    elif phase == constants.HOOKS_PHASE_POST:
2701
      suffix = "post"
2702
    else:
2703
      _Fail("Unknown hooks phase '%s'", phase)
2704

    
2705
    rr = []
2706

    
2707
    subdir = "%s-%s.d" % (hpath, suffix)
2708
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
2709
    try:
2710
      dir_contents = utils.ListVisibleFiles(dir_name)
2711
    except OSError:
2712
      # FIXME: must log output in case of failures
2713
      return rr
2714

    
2715
    # we use the standard python sort order,
2716
    # so 00name is the recommended naming scheme
2717
    dir_contents.sort()
2718
    for relname in dir_contents:
2719
      fname = os.path.join(dir_name, relname)
2720
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
2721
          self.RE_MASK.match(relname) is not None):
2722
        rrval = constants.HKR_SKIP
2723
        output = ""
2724
      else:
2725
        result, output = self.ExecHook(fname, env)
2726
        if not result:
2727
          rrval = constants.HKR_FAIL
2728
        else:
2729
          rrval = constants.HKR_SUCCESS
2730
      rr.append(("%s/%s" % (subdir, relname), rrval, output))
2731

    
2732
    return rr
2733

    
2734

    
2735
class IAllocatorRunner(object):
2736
  """IAllocator runner.
2737

2738
  This class is instantiated on the node side (ganeti-noded) and not on
2739
  the master side.
2740

2741
  """
2742
  def Run(self, name, idata):
2743
    """Run an iallocator script.
2744

2745
    @type name: str
2746
    @param name: the iallocator script name
2747
    @type idata: str
2748
    @param idata: the allocator input data
2749

2750
    @rtype: tuple
2751
    @return: two element tuple of:
2752
       - status
2753
       - either error message or stdout of allocator (for success)
2754

2755
    """
2756
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
2757
                                  os.path.isfile)
2758
    if alloc_script is None:
2759
      _Fail("iallocator module '%s' not found in the search path", name)
2760

    
2761
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
2762
    try:
2763
      os.write(fd, idata)
2764
      os.close(fd)
2765
      result = utils.RunCmd([alloc_script, fin_name])
2766
      if result.failed:
2767
        _Fail("iallocator module '%s' failed: %s, output '%s'",
2768
              name, result.fail_reason, result.output)
2769
    finally:
2770
      os.unlink(fin_name)
2771

    
2772
    return result.stdout
2773

    
2774

    
2775
class DevCacheManager(object):
2776
  """Simple class for managing a cache of block device information.
2777

2778
  """
2779
  _DEV_PREFIX = "/dev/"
2780
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2781

    
2782
  @classmethod
2783
  def _ConvertPath(cls, dev_path):
2784
    """Converts a /dev/name path to the cache file name.
2785

2786
    This replaces slashes with underscores and strips the /dev
2787
    prefix. It then returns the full path to the cache file.
2788

2789
    @type dev_path: str
2790
    @param dev_path: the C{/dev/} path name
2791
    @rtype: str
2792
    @return: the converted path name
2793

2794
    """
2795
    if dev_path.startswith(cls._DEV_PREFIX):
2796
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2797
    dev_path = dev_path.replace("/", "_")
2798
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2799
    return fpath
2800

    
2801
  @classmethod
2802
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2803
    """Updates the cache information for a given device.
2804

2805
    @type dev_path: str
2806
    @param dev_path: the pathname of the device
2807
    @type owner: str
2808
    @param owner: the owner (instance name) of the device
2809
    @type on_primary: bool
2810
    @param on_primary: whether this is the primary
2811
        node nor not
2812
    @type iv_name: str
2813
    @param iv_name: the instance-visible name of the
2814
        device, as in objects.Disk.iv_name
2815

2816
    @rtype: None
2817

2818
    """
2819
    if dev_path is None:
2820
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2821
      return
2822
    fpath = cls._ConvertPath(dev_path)
2823
    if on_primary:
2824
      state = "primary"
2825
    else:
2826
      state = "secondary"
2827
    if iv_name is None:
2828
      iv_name = "not_visible"
2829
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2830
    try:
2831
      utils.WriteFile(fpath, data=fdata)
2832
    except EnvironmentError, err:
2833
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2834

    
2835
  @classmethod
2836
  def RemoveCache(cls, dev_path):
2837
    """Remove data for a dev_path.
2838

2839
    This is just a wrapper over L{utils.RemoveFile} with a converted
2840
    path name and logging.
2841

2842
    @type dev_path: str
2843
    @param dev_path: the pathname of the device
2844

2845
    @rtype: None
2846

2847
    """
2848
    if dev_path is None:
2849
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2850
      return
2851
    fpath = cls._ConvertPath(dev_path)
2852
    try:
2853
      utils.RemoveFile(fpath)
2854
    except EnvironmentError, err:
2855
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)