Statistics
| Branch: | Tag: | Revision:

root / lib / backend.py @ 3782acd7

History | View | Annotate | Download (85.6 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Functions used by the node daemon
23

24
@var _ALLOWED_UPLOAD_FILES: denotes which files are accepted in
25
     the L{UploadFile} function
26

27
"""
28

    
29

    
30
import os
31
import os.path
32
import shutil
33
import time
34
import stat
35
import errno
36
import re
37
import subprocess
38
import random
39
import logging
40
import tempfile
41
import zlib
42
import base64
43

    
44
from ganeti import errors
45
from ganeti import utils
46
from ganeti import ssh
47
from ganeti import hypervisor
48
from ganeti import constants
49
from ganeti import bdev
50
from ganeti import objects
51
from ganeti import ssconf
52

    
53

    
54
# Kernel-provided per-boot random UUID; read by GetNodeInfo to report
# the node's "bootid" value
_BOOT_ID_PATH = "/proc/sys/kernel/random/boot_id"
55

    
56

    
57
class RPCFail(Exception):
  """Class denoting RPC failure.

  Its argument is the error message; it is raised via the L{_Fail}
  helper and handled specially in the ganeti daemon, where it is
  turned into a 'failed' return type.

  """
63

    
64

    
65
def _Fail(msg, *args, **kwargs):
  """Log an error and then raise an RPCFail exception.

  This exception is then handled specially in the ganeti daemon and
  turned into a 'failed' return type. As such, this function is a
  useful shortcut for logging the error and returning it to the master
  daemon.

  @type msg: string
  @param msg: the text of the exception; %-interpolated with C{args}
      when positional arguments are given
  @keyword log: if false, skip logging entirely (defaults to true)
  @keyword exc: if true, log with the current exception's traceback
  @raise RPCFail: always

  """
  if args:
    msg = msg % args
  # logging is on by default and can be disabled via log=False
  if kwargs.get("log", True):
    if kwargs.get("exc"):
      logging.exception(msg)
    else:
      logging.error(msg)
  raise RPCFail(msg)
86

    
87

    
88
def _GetConfig():
  """Create and return a simple-store configuration accessor.

  @rtype: L{ssconf.SimpleStore}
  @return: a freshly-constructed SimpleStore instance

  """
  store = ssconf.SimpleStore()
  return store
96

    
97

    
98
def _GetSshRunner(cluster_name):
  """Create and return an SshRunner for the given cluster.

  @type cluster_name: str
  @param cluster_name: the cluster name, required by the SshRunner
      constructor
  @rtype: L{ssh.SshRunner}
  @return: a freshly-constructed SshRunner instance

  """
  runner = ssh.SshRunner(cluster_name)
  return runner
109

    
110

    
111
def _Decompress(data):
  """Unpacks data compressed by the RPC client.

  @type data: list or tuple
  @param data: a two-element (encoding, content) pair sent by the
      RPC client
  @rtype: str
  @return: Decompressed data
  @raise AssertionError: on an unknown encoding marker

  """
  assert isinstance(data, (list, tuple))
  assert len(data) == 2
  encoding, content = data
  if encoding == constants.RPC_ENCODING_NONE:
    # plain payload, pass through untouched
    return content
  if encoding == constants.RPC_ENCODING_ZLIB_BASE64:
    # payload was zlib-compressed, then base64-armored
    return zlib.decompress(base64.b64decode(content))
  raise AssertionError("Unknown data encoding")
129

    
130

    
131
def _CleanDirectory(path, exclude=None):
  """Removes all regular files in a directory.

  Symlinks and subdirectories are left alone; only plain files not on
  the exclusion list are deleted.

  @type path: str
  @param path: the directory to clean
  @type exclude: list
  @param exclude: list of files to be excluded, defaults
      to the empty list

  """
  if not os.path.isdir(path):
    return

  if exclude is None:
    excluded = []
  else:
    # normalize so comparison against the joined names below works
    excluded = [os.path.normpath(entry) for entry in exclude]

  for rel_name in utils.ListVisibleFiles(path):
    full_name = os.path.normpath(os.path.join(path, rel_name))
    if full_name in excluded:
      continue
    if os.path.isfile(full_name) and not os.path.islink(full_name):
      utils.RemoveFile(full_name)
155

    
156

    
157
def _BuildUploadFileList():
  """Build the list of allowed upload files.

  This is abstracted so that it's built only once at module import time.

  @rtype: frozenset
  @return: the set of file names UploadFile may write

  """
  files = set([
    constants.CLUSTER_CONF_FILE,
    constants.ETC_HOSTS,
    constants.SSH_KNOWN_HOSTS_FILE,
    constants.VNC_PASSWORD_FILE,
    constants.RAPI_CERT_FILE,
    constants.RAPI_USERS_FILE,
    constants.HMAC_CLUSTER_KEY,
    ])

  # each hypervisor can declare additional ancillary files
  for hv_name in constants.HYPER_TYPES:
    files.update(hypervisor.GetHypervisorClass(hv_name).GetAncillaryFiles())

  return frozenset(files)
178

    
179

    
180
# Computed once at import time; see _BuildUploadFileList and the
# module docstring
_ALLOWED_UPLOAD_FILES = _BuildUploadFileList()
181

    
182

    
183
def JobQueuePurge():
  """Removes job queue files and archived jobs.

  Cleans the queue directory (keeping the queue lock file) and the
  job archive directory.

  @rtype: None

  """
  _CleanDirectory(constants.QUEUE_DIR, exclude=[constants.JOB_QUEUE_LOCK_FILE])
  _CleanDirectory(constants.JOB_QUEUE_ARCHIVE_DIR)
192

    
193

    
194
def GetMasterInfo():
195
  """Returns master information.
196

197
  This is an utility function to compute master information, either
198
  for consumption here or from the node daemon.
199

200
  @rtype: tuple
201
  @return: master_netdev, master_ip, master_name
202
  @raise RPCFail: in case of errors
203

204
  """
205
  try:
206
    cfg = _GetConfig()
207
    master_netdev = cfg.GetMasterNetdev()
208
    master_ip = cfg.GetMasterIP()
209
    master_node = cfg.GetMasterNode()
210
  except errors.ConfigurationError, err:
211
    _Fail("Cluster configuration incomplete: %s", err, exc=True)
212
  return (master_netdev, master_ip, master_node)
213

    
214

    
215
def StartMaster(start_daemons, no_voting):
  """Activate local node as master node.

  The function will always try activate the IP address of the master
  (unless someone else has it). It will also start the master daemons,
  based on the start_daemons parameter.

  @type start_daemons: boolean
  @param start_daemons: whether to also start the master
      daemons (ganeti-masterd and ganeti-rapi)
  @type no_voting: boolean
  @param no_voting: whether to start ganeti-masterd without a node vote
      (if start_daemons is True), but still non-interactively
  @rtype: None
  @raise RPCFail: if any of the steps failed (errors are accumulated
      and reported together at the end)

  """
  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  err_msgs = []
  if utils.TcpPing(master_ip, constants.DEFAULT_NODED_PORT):
    if utils.OwnIpAddress(master_ip):
      # we already have the ip:
      logging.debug("Master IP already configured, doing nothing")
    else:
      msg = "Someone else has the master ip, not activating"
      logging.error(msg)
      err_msgs.append(msg)
  else:
    # nobody answered on the master IP, so claim it on our master netdev
    result = utils.RunCmd(["ip", "address", "add", "%s/32" % master_ip,
                           "dev", master_netdev, "label",
                           "%s:0" % master_netdev])
    if result.failed:
      msg = "Can't activate master IP: %s" % result.output
      logging.error(msg)
      err_msgs.append(msg)

    result = utils.RunCmd(["arping", "-q", "-U", "-c 3", "-I", master_netdev,
                           "-s", master_ip, master_ip])
    # we'll ignore the exit code of arping

  # and now start the master and rapi daemons
  if start_daemons:
    daemons_params = {
        'ganeti-masterd': [],
        'ganeti-rapi': [],
        }
    if no_voting:
      # both flags are needed to skip the vote non-interactively
      daemons_params['ganeti-masterd'].append('--no-voting')
      daemons_params['ganeti-masterd'].append('--yes-do-it')
    for daemon in daemons_params:
      cmd = [daemon]
      cmd.extend(daemons_params[daemon])
      result = utils.RunCmd(cmd)
      if result.failed:
        msg = "Can't start daemon %s: %s" % (daemon, result.output)
        logging.error(msg)
        err_msgs.append(msg)

  if err_msgs:
    _Fail("; ".join(err_msgs))
276

    
277

    
278
def StopMaster(stop_daemons):
  """Deactivate this node as master.

  The function will always try to deactivate the IP address of the
  master; depending on the stop_daemons parameter it will also stop
  the master daemons (ganeti-masterd and ganeti-rapi).

  @type stop_daemons: boolean
  @param stop_daemons: whether to also stop the master daemons
      (ganeti-masterd and ganeti-rapi)
  @rtype: None

  """
  # TODO: log and report back to the caller the error failures; we
  # need to decide in which case we fail the RPC for this

  # GetMasterInfo will raise an exception if not able to return data
  master_netdev, master_ip, _ = GetMasterInfo()

  result = utils.RunCmd(["ip", "address", "del", "%s/32" % master_ip,
                         "dev", master_netdev])
  if result.failed:
    logging.error("Can't remove the master IP, error: %s", result.output)
    # but otherwise ignore the failure

  if not stop_daemons:
    return

  # stop/kill the rapi and the master daemon
  for daemon in (constants.RAPI, constants.MASTERD):
    utils.KillProcess(utils.ReadPidFile(utils.DaemonPidFileName(daemon)))
307

    
308

    
309
def AddNode(dsa, dsapub, rsa, rsapub, sshkey, sshpub):
  """Joins this node to the cluster.

  This does the following:
      - updates the hostkeys of the machine (rsa and dsa)
      - adds the ssh private key to the user
      - adds the ssh public key to the users' authorized_keys file

  @type dsa: str
  @param dsa: the DSA private key to write
  @type dsapub: str
  @param dsapub: the DSA public key to write
  @type rsa: str
  @param rsa: the RSA private key to write
  @type rsapub: str
  @param rsapub: the RSA public key to write
  @type sshkey: str
  @param sshkey: the SSH private key to write
  @type sshpub: str
  @param sshpub: the SSH public key to write
  @rtype: None
  @raise RPCFail: if the user's ssh files cannot be processed

  """
  # host keys: private keys get mode 0600, public keys 0644
  sshd_keys =  [(constants.SSH_HOST_RSA_PRIV, rsa, 0600),
                (constants.SSH_HOST_RSA_PUB, rsapub, 0644),
                (constants.SSH_HOST_DSA_PRIV, dsa, 0600),
                (constants.SSH_HOST_DSA_PUB, dsapub, 0644)]
  for name, content, mode in sshd_keys:
    utils.WriteFile(name, data=content, mode=mode)

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS,
                                                    mkdir=True)
  except errors.OpExecError, err:
    _Fail("Error while processing user ssh files: %s", err, exc=True)

  for name, content in [(priv_key, sshkey), (pub_key, sshpub)]:
    utils.WriteFile(name, data=content, mode=0600)

  utils.AddAuthorizedKey(auth_keys, sshpub)

  # restart sshd so the new host keys take effect
  utils.RunCmd([constants.SSH_INITD_SCRIPT, "restart"])
352

    
353

    
354
def LeaveCluster():
  """Cleans up and remove the current node.

  This function cleans up and prepares the current node to be removed
  from the cluster.

  If processing is successful, then it raises an
  L{errors.QuitGanetiException} which is used as a special case to
  shutdown the node daemon.

  @raise errors.QuitGanetiException: always (on success), to signal
      the node daemon to shut down

  """
  _CleanDirectory(constants.DATA_DIR)
  JobQueuePurge()

  try:
    priv_key, pub_key, auth_keys = ssh.GetUserFiles(constants.GANETI_RUNAS)

    utils.RemoveAuthorizedKey(auth_keys, utils.ReadFile(pub_key))

    utils.RemoveFile(priv_key)
    utils.RemoveFile(pub_key)
  except errors.OpExecError:
    logging.exception("Error while processing ssh files")

  try:
    utils.RemoveFile(constants.HMAC_CLUSTER_KEY)
    utils.RemoveFile(constants.RAPI_CERT_FILE)
    utils.RemoveFile(constants.SSL_CERT_FILE)
  except Exception:
    # best-effort removal: log and continue so the node still leaves the
    # cluster; previously this was a bare "except:", which would also have
    # swallowed SystemExit and KeyboardInterrupt
    logging.exception("Error while removing cluster secrets")

  confd_pid = utils.ReadPidFile(utils.DaemonPidFileName(constants.CONFD))

  if confd_pid:
    utils.KillProcess(confd_pid, timeout=2)

  # Raise a custom exception (handled in ganeti-noded)
  raise errors.QuitGanetiException(True, 'Shutdown scheduled')
392

    
393

    
394
def GetNodeInfo(vgname, hypervisor_type):
  """Gives back a hash with different information about the node.

  @type vgname: C{string}
  @param vgname: the name of the volume group to ask for disk space information
  @type hypervisor_type: C{str}
  @param hypervisor_type: the name of the hypervisor to ask for
      memory information
  @rtype: C{dict}
  @return: dictionary with the following keys:
      - vg_size is the size of the configured volume group in MiB
      - vg_free is the free size of the volume group in MiB
      - memory_dom0 is the memory allocated for domain0 in MiB
      - memory_free is the currently available (free) ram in MiB
      - memory_total is the total number of ram in MiB

  """
  vginfo = _GetVGInfo(vgname)
  node_info = {
    'vg_size': vginfo['vg_size'],
    'vg_free': vginfo['vg_free'],
    }

  # merge in whatever the hypervisor reports about this node
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  hyp_info = hyper.GetNodeInfo()
  if hyp_info is not None:
    node_info.update(hyp_info)

  node_info["bootid"] = utils.ReadFile(_BOOT_ID_PATH, size=128).rstrip("\n")

  return node_info
424

    
425

    
426
def VerifyNode(what, cluster_name):
  """Verify the status of the local node.

  Based on the input L{what} parameter, various checks are done on the
  local node.

  If the I{filelist} key is present, this list of
  files is checksummed and the file/checksum pairs are returned.

  If the I{nodelist} key is present, we check that we have
  connectivity via ssh with the target nodes (and check the hostname
  report).

  If the I{node-net-test} key is present, we check that we have
  connectivity to the given nodes via both primary IP and, if
  applicable, secondary IPs.

  @type what: C{dict}
  @param what: a dictionary of things to check:
      - filelist: list of files for which to compute checksums
      - nodelist: list of nodes we should check ssh communication with
      - node-net-test: list of nodes we should check node daemon port
        connectivity with
      - hypervisor: list with hypervisors to run the verify for
  @type cluster_name: string
  @param cluster_name: the cluster's name, passed to the SshRunner
  @rtype: dict
  @return: a dictionary with the same keys as the input dict, and
      values representing the result of the checks

  """
  result = {}

  if constants.NV_HYPERVISOR in what:
    # "tmp" aliases the dict stored in the result, so filling tmp
    # fills the result in place
    result[constants.NV_HYPERVISOR] = tmp = {}
    for hv_name in what[constants.NV_HYPERVISOR]:
      tmp[hv_name] = hypervisor.GetHypervisor(hv_name).Verify()

  if constants.NV_FILELIST in what:
    result[constants.NV_FILELIST] = utils.FingerprintFiles(
      what[constants.NV_FILELIST])

  if constants.NV_NODELIST in what:
    result[constants.NV_NODELIST] = tmp = {}
    # NOTE(review): this shuffles the caller-supplied list in place
    random.shuffle(what[constants.NV_NODELIST])
    for node in what[constants.NV_NODELIST]:
      success, message = _GetSshRunner(cluster_name).VerifyNodeHostname(node)
      if not success:
        # only failures are recorded; absence of a node means success
        tmp[node] = message

  if constants.NV_NODENETTEST in what:
    result[constants.NV_NODENETTEST] = tmp = {}
    my_name = utils.HostInfo().name
    my_pip = my_sip = None
    # find our own primary/secondary IPs in the list first
    for name, pip, sip in what[constants.NV_NODENETTEST]:
      if name == my_name:
        my_pip = pip
        my_sip = sip
        break
    if not my_pip:
      tmp[my_name] = ("Can't find my own primary/secondary IP"
                      " in the node list")
    else:
      port = utils.GetDaemonPort(constants.NODED)
      for name, pip, sip in what[constants.NV_NODENETTEST]:
        fail = []
        if not utils.TcpPing(pip, port, source=my_pip):
          fail.append("primary")
        if sip != pip:
          # only test the secondary IP when it differs from the primary
          if not utils.TcpPing(sip, port, source=my_sip):
            fail.append("secondary")
        if fail:
          tmp[name] = ("failure using the %s interface(s)" %
                       " and ".join(fail))

  if constants.NV_LVLIST in what:
    result[constants.NV_LVLIST] = GetVolumeList(what[constants.NV_LVLIST])

  if constants.NV_INSTANCELIST in what:
    result[constants.NV_INSTANCELIST] = GetInstanceList(
      what[constants.NV_INSTANCELIST])

  if constants.NV_VGLIST in what:
    result[constants.NV_VGLIST] = utils.ListVolumeGroups()

  if constants.NV_VERSION in what:
    result[constants.NV_VERSION] = (constants.PROTOCOL_VERSION,
                                    constants.RELEASE_VERSION)

  if constants.NV_HVINFO in what:
    hyper = hypervisor.GetHypervisor(what[constants.NV_HVINFO])
    result[constants.NV_HVINFO] = hyper.GetNodeInfo()

  if constants.NV_DRBDLIST in what:
    try:
      used_minors = bdev.DRBD8.GetUsedDevs().keys()
    except errors.BlockDeviceError, err:
      # report the error text instead of a minor list on failure
      logging.warning("Can't get used minors list", exc_info=True)
      used_minors = str(err)
    result[constants.NV_DRBDLIST] = used_minors

  if constants.NV_NODESETUP in what:
    result[constants.NV_NODESETUP] = tmpr = []
    if not os.path.isdir("/sys/block") or not os.path.isdir("/sys/class/net"):
      tmpr.append("The sysfs filesytem doesn't seem to be mounted"
                  " under /sys, missing required directories /sys/block"
                  " and /sys/class/net")
    if (not os.path.isdir("/proc/sys") or
        not os.path.isfile("/proc/sysrq-trigger")):
      tmpr.append("The procfs filesystem doesn't seem to be mounted"
                  " under /proc, missing required directory /proc/sys and"
                  " the file /proc/sysrq-trigger")
  return result
537

    
538

    
539
def GetVolumeList(vg_name):
  """Compute list of logical volumes and their size.

  @type vg_name: str
  @param vg_name: the volume group whose LVs we should list
  @rtype: dict
  @return:
      dictionary of all partitions (key) with value being a tuple of
      their size (in MiB), inactive and online status::

        {'test1': ('20.06', True, True)}

      in case of errors, a string is returned with the error
      details.

  """
  lvs = {}
  sep = '|'
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=%s" % sep,
                         "-olv_name,lv_size,lv_attr", vg_name])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s", result.output)

  # use a raw string so the escaped separators are passed to the regex
  # engine verbatim instead of relying on Python's unrecognized-escape
  # fallback
  valid_line_re = re.compile(r"^ *([^|]+)\|([0-9.]+)\|([^|]{6})\|?$")
  for line in result.stdout.splitlines():
    line = line.strip()
    match = valid_line_re.match(line)
    if not match:
      logging.error("Invalid line returned from lvs output: '%s'", line)
      continue
    name, size, attr = match.groups()
    # lv_attr is a six-character flag string; see lvs(8) for the fields
    inactive = attr[4] == '-'
    online = attr[5] == 'o'
    virtual = attr[0] == 'v'
    if virtual:
      # we don't want to report such volumes as existing, since they
      # don't really hold data
      continue
    lvs[name] = (size, inactive, online)

  return lvs
581

    
582

    
583
def ListVolumeGroups():
  """List the volume groups and their size.

  Thin wrapper over L{utils.ListVolumeGroups}.

  @rtype: dict
  @return: dictionary with keys volume name and values the
      size of the volume

  """
  return utils.ListVolumeGroups()
592

    
593

    
594
def NodeVolumes():
  """List all volumes on this node.

  @rtype: list
  @return:
    A list of dictionaries, each having four keys:
      - name: the logical volume name,
      - size: the size of the logical volume
      - dev: the physical device on which the LV lives
      - vg: the volume group to which it belongs

    In case of errors, we return an empty list and log the
    error.

    Note that since a logical volume can live on multiple physical
    volumes, the resulting list might include a logical volume
    multiple times.

  """
  result = utils.RunCmd(["lvs", "--noheadings", "--units=m", "--nosuffix",
                         "--separator=|",
                         "--options=lv_name,lv_size,devices,vg_name"])
  if result.failed:
    _Fail("Failed to list logical volumes, lvs output: %s",
          result.output)

  def _StripDev(dev):
    # the devices field carries a trailing "(extent)" part; drop it
    if '(' in dev:
      dev = dev.split('(')[0]
    return dev

  def _MakeEntry(fields):
    return {
      'name': fields[0].strip(),
      'size': fields[1].strip(),
      'dev': _StripDev(fields[2].strip()),
      'vg': fields[3].strip(),
    }

  # only lines with all four separator-delimited fields are valid
  return [_MakeEntry(line.split('|')) for line in result.stdout.splitlines()
          if line.count('|') >= 3]
636

    
637

    
638
def BridgesExist(bridges_list):
  """Check if a list of bridges exist on the current node.

  @type bridges_list: list
  @param bridges_list: names of the bridges to check
  @rtype: boolean
  @return: C{True} if all of them exist, C{False} otherwise
  @raise RPCFail: if any bridge is missing

  """
  missing = [bridge for bridge in bridges_list
             if not utils.BridgeExists(bridge)]

  if missing:
    _Fail("Missing bridges %s", ", ".join(missing))
652

    
653

    
654
def GetInstanceList(hypervisor_list):
655
  """Provides a list of instances.
656

657
  @type hypervisor_list: list
658
  @param hypervisor_list: the list of hypervisors to query information
659

660
  @rtype: list
661
  @return: a list of all running instances on the current node
662
    - instance1.example.com
663
    - instance2.example.com
664

665
  """
666
  results = []
667
  for hname in hypervisor_list:
668
    try:
669
      names = hypervisor.GetHypervisor(hname).ListInstances()
670
      results.extend(names)
671
    except errors.HypervisorError, err:
672
      _Fail("Error enumerating instances (hypervisor %s): %s",
673
            hname, err, exc=True)
674

    
675
  return results
676

    
677

    
678
def GetInstanceInfo(instance, hname):
  """Gives back the information about an instance as a dictionary.

  @type instance: string
  @param instance: the instance name
  @type hname: string
  @param hname: the hypervisor type of the instance

  @rtype: dict
  @return: empty dict if the hypervisor knows nothing about the
      instance, otherwise a dict with the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)

  """
  iinfo = hypervisor.GetHypervisor(hname).GetInstanceInfo(instance)
  if iinfo is None:
    return {}

  return {
    'memory': iinfo[2],
    'state': iinfo[4],
    'time': iinfo[5],
    }
702

    
703

    
704
def GetInstanceMigratable(instance):
  """Checks whether an instance can be migrated.

  @type instance: L{objects.Instance}
  @param instance: object representing the instance to be checked.

  @rtype: None
  @raise RPCFail: if the instance is not running, or if any of its
      disk symlinks is missing (instance not restarted since
      ganeti 1.2.5)

  """
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
  iname = instance.name
  if iname not in hyper.ListInstances():
    _Fail("Instance %s is not running", iname)

  # each disk must have its symlink in place for migration to work
  for idx in range(len(instance.disks)):
    link_name = _GetBlockDevSymlinkPath(iname, idx)
    if not os.path.islink(link_name):
      _Fail("Instance %s was not restarted since ganeti 1.2.5", iname)
725

    
726

    
727
def GetAllInstancesInfo(hypervisor_list):
  """Gather data about all instances.

  This is the equivalent of L{GetInstanceInfo}, except that it
  computes data for all instances at once, thus being faster if one
  needs data about more than one instance.

  @type hypervisor_list: list
  @param hypervisor_list: list of hypervisors to query for instance data

  @rtype: dict
  @return: dictionary of instance: data, with data having the following keys:
      - memory: memory size of instance (int)
      - state: xen state of instance (string)
      - time: cpu time of instance (float)
      - vcpus: the number of vcpus
  @raise RPCFail: if the same instance is reported by two hypervisors
      with different static parameters

  """
  output = {}

  for hname in hypervisor_list:
    iinfo = hypervisor.GetHypervisor(hname).GetAllInstancesInfo()
    if not iinfo:
      continue
    for (name, _, memory, vcpus, state, times) in iinfo:
      value = {
        'memory': memory,
        'vcpus': vcpus,
        'state': state,
        'time': times,
        }
      if name in output:
        # we only check static parameters, like memory and vcpus,
        # and not state and time which can change between the
        # invocations of the different hypervisors
        for key in ('memory', 'vcpus'):
          if value[key] != output[name][key]:
            _Fail("Instance %s is running twice"
                  " with different parameters", name)
      output[name] = value

  return output
768

    
769

    
770
def InstanceOsAdd(instance, reinstall):
  """Add an OS to an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be installed
  @type reinstall: boolean
  @param reinstall: whether this is an instance reinstall
  @rtype: None
  @raise RPCFail: if the create script fails

  """
  inst_os = OSFromDisk(instance.os)

  create_env = OSEnvironment(instance, inst_os)
  if reinstall:
    create_env['INSTANCE_REINSTALL'] = "1"

  logfile = "%s/add-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                     instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.create_script], env=create_env,
                        cwd=inst_os.path, output=logfile,)
  if not result.failed:
    return

  logging.error("os create command '%s' returned error: %s, logfile: %s,"
                " output: %s", result.cmd, result.fail_reason, logfile,
                result.output)
  # surface the tail of the script log to the master for diagnosis
  lines = [utils.SafeEncode(val)
           for val in utils.TailFile(logfile, lines=20)]
  _Fail("OS create script failed (%s), last lines in the"
        " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
799

    
800

    
801
def RunRenameInstance(instance, old_name):
  """Run the OS rename script for an instance.

  @type instance: L{objects.Instance}
  @param instance: Instance whose OS is to be renamed
  @type old_name: string
  @param old_name: previous instance name
  @rtype: None
  @raise RPCFail: if the rename script fails

  """
  inst_os = OSFromDisk(instance.os)

  rename_env = OSEnvironment(instance, inst_os)
  rename_env['OLD_INSTANCE_NAME'] = old_name

  logfile = "%s/rename-%s-%s-%s-%d.log" % (constants.LOG_OS_DIR, instance.os,
                                           old_name,
                                           instance.name, int(time.time()))

  result = utils.RunCmd([inst_os.rename_script], env=rename_env,
                        cwd=inst_os.path, output=logfile)

  if result.failed:
    # fixed: this log line used to say "os create command", which was
    # misleading when diagnosing rename failures
    logging.error("os rename command '%s' returned error: %s output: %s",
                  result.cmd, result.fail_reason, result.output)
    lines = [utils.SafeEncode(val)
             for val in utils.TailFile(logfile, lines=20)]
    _Fail("OS rename script failed (%s), last lines in the"
          " log file:\n%s", result.fail_reason, "\n".join(lines), log=False)
831

    
832

    
833
def _GetVGInfo(vg_name):
  """Get information about the volume group.

  @type vg_name: str
  @param vg_name: the volume group which we query
  @rtype: dict
  @return:
    A dictionary with the following keys:
      - C{vg_size} is the total size of the volume group in MiB
      - C{vg_free} is the free size of the volume group in MiB
      - C{pv_count} are the number of physical disks in that VG

    If an error occurs during gathering of data, we return the same dict
    with keys all set to None.

  """
  # start with all values None; returned as-is on any failure below
  retdic = dict.fromkeys(["vg_size", "vg_free", "pv_count"])

  retval = utils.RunCmd(["vgs", "-ovg_size,vg_free,pv_count", "--noheadings",
                         "--nosuffix", "--units=m", "--separator=:", vg_name])

  if retval.failed:
    logging.error("volume group %s not present", vg_name)
    return retdic
  # drop any trailing separator, then split into the three fields
  valarr = retval.stdout.strip().rstrip(':').split(':')
  if len(valarr) == 3:
    try:
      # sizes are reported as floats in MiB; round to whole MiB
      retdic = {
        "vg_size": int(round(float(valarr[0]), 0)),
        "vg_free": int(round(float(valarr[1]), 0)),
        "pv_count": int(valarr[2]),
        }
    except ValueError, err:
      logging.exception("Fail to parse vgs output: %s", err)
  else:
    logging.error("vgs output has the wrong number of fields (expected"
                  " three): %s", str(valarr))
  return retdic
871

    
872

    
873
def _GetBlockDevSymlinkPath(instance_name, idx):
  """Return the symlink path for an instance's disk.

  @type instance_name: str
  @param instance_name: the instance owning the disk
  @type idx: int
  @param idx: the disk index
  @rtype: str
  @return: path under DISK_LINKS_DIR, named "<instance>:<idx>"

  """
  return os.path.join(constants.DISK_LINKS_DIR,
                      "%s:%d" % (instance_name, idx))
876

    
877

    
878
def _SymlinkBlockDev(instance_name, device_path, idx):
  """Set up symlinks to a instance's block device.

  This is an auxiliary function run when an instance is start (on the primary
  node) or when an instance is migrated (on the target node).


  @param instance_name: the name of the target instance
  @param device_path: path of the physical block device, on the node
  @param idx: the disk index
  @return: absolute path to the disk's symlink
  @raise OSError: on symlink creation errors other than EEXIST

  """
  link_name = _GetBlockDevSymlinkPath(instance_name, idx)
  try:
    os.symlink(device_path, link_name)
  except OSError, err:
    if err.errno == errno.EEXIST:
      # something already exists at the link path; replace it only if
      # it is not already a symlink pointing at the right device
      if (not os.path.islink(link_name) or
          os.readlink(link_name) != device_path):
        os.remove(link_name)
        os.symlink(device_path, link_name)
    else:
      raise

  return link_name
904

    
905

    
906
def _RemoveBlockDevLinks(instance_name, disks):
  """Remove the block device symlinks belonging to the given instance.

  Unlink errors are logged and otherwise ignored, so cleanup continues
  for the remaining disks.

  """
  for idx, _dsk in enumerate(disks):
    link_name = _GetBlockDevSymlinkPath(instance_name, idx)
    if not os.path.islink(link_name):
      continue
    try:
      os.remove(link_name)
    except OSError:
      logging.exception("Can't remove symlink '%s'", link_name)
def _GatherAndLinkBlockDevs(instance):
920
  """Set up an instance's block device(s).
921

922
  This is run on the primary node at instance startup. The block
923
  devices must be already assembled.
924

925
  @type instance: L{objects.Instance}
926
  @param instance: the instance whose disks we shoul assemble
927
  @rtype: list
928
  @return: list of (disk_object, device_path)
929

930
  """
931
  block_devices = []
932
  for idx, disk in enumerate(instance.disks):
933
    device = _RecursiveFindBD(disk)
934
    if device is None:
935
      raise errors.BlockDeviceError("Block device '%s' is not set up." %
936
                                    str(disk))
937
    device.Open()
938
    try:
939
      link_name = _SymlinkBlockDev(instance.name, device.dev_path, idx)
940
    except OSError, e:
941
      raise errors.BlockDeviceError("Cannot create block device symlink: %s" %
942
                                    e.strerror)
943

    
944
    block_devices.append((disk, link_name))
945

    
946
  return block_devices
947

    
948

    
949
def StartInstance(instance):
950
  """Start an instance.
951

952
  @type instance: L{objects.Instance}
953
  @param instance: the instance object
954
  @rtype: None
955

956
  """
957
  running_instances = GetInstanceList([instance.hypervisor])
958

    
959
  if instance.name in running_instances:
960
    logging.info("Instance %s already running, not starting", instance.name)
961
    return
962

    
963
  try:
964
    block_devices = _GatherAndLinkBlockDevs(instance)
965
    hyper = hypervisor.GetHypervisor(instance.hypervisor)
966
    hyper.StartInstance(instance, block_devices)
967
  except errors.BlockDeviceError, err:
968
    _Fail("Block device error: %s", err, exc=True)
969
  except errors.HypervisorError, err:
970
    _RemoveBlockDevLinks(instance.name, instance.disks)
971
    _Fail("Hypervisor error: %s", err, exc=True)
972

    
973

    
974
def InstanceShutdown(instance, timeout):
  """Shut an instance down.

  @note: this function uses polling against the hypervisor, sleeping
      between attempts until the given timeout expires

  @type instance: L{objects.Instance}
  @param instance: the instance object
  @type timeout: integer
  @param timeout: maximum timeout for soft shutdown
  @rtype: None

  """
  hv_name = instance.hypervisor
  hyper = hypervisor.GetHypervisor(hv_name)
  running_instances = hyper.ListInstances()
  iname = instance.name

  if iname not in running_instances:
    logging.info("Instance %s not running, doing nothing", iname)
    return

  start = time.time()
  end = start + timeout
  sleep_time = 5

  # repeatedly request a soft shutdown until the deadline; the first
  # attempt is a plain stop, subsequent ones pass retry=True so the
  # hypervisor knows a stop was already requested
  tried_once = False
  while time.time() < end:
    try:
      hyper.StopInstance(instance, retry=tried_once)
    except errors.HypervisorError, err:
      if instance.name not in hyper.ListInstances():
        # if the instance is no longer existing, consider this a
        # success and go to cleanup
        break
      _Fail("Failed to stop instance %s: %s", iname, err)
    tried_once = True
    time.sleep(sleep_time)
    if instance.name not in hyper.ListInstances():
      break
  else:
    # while/else: this branch runs only when the loop exhausted the
    # timeout without breaking, i.e. the soft shutdown did not succeed
    logging.error("Shutdown of '%s' unsuccessful, forcing", iname)

    try:
      hyper.StopInstance(instance, force=True)
    except errors.HypervisorError, err:
      if instance.name in hyper.ListInstances():
        # only raise an error if the instance still exists, otherwise
        # the error could simply be "instance ... unknown"!
        _Fail("Failed to force stop instance %s: %s", iname, err)

    # give the forced stop a moment to take effect before re-checking
    time.sleep(1)
    if instance.name in GetInstanceList([hv_name]):
      _Fail("Could not shutdown instance %s even by destroy", iname)

  _RemoveBlockDevLinks(iname, instance.disks)
def InstanceReboot(instance, reboot_type, shutdown_timeout):
1033
  """Reboot an instance.
1034

1035
  @type instance: L{objects.Instance}
1036
  @param instance: the instance object to reboot
1037
  @type reboot_type: str
1038
  @param reboot_type: the type of reboot, one the following
1039
    constants:
1040
      - L{constants.INSTANCE_REBOOT_SOFT}: only reboot the
1041
        instance OS, do not recreate the VM
1042
      - L{constants.INSTANCE_REBOOT_HARD}: tear down and
1043
        restart the VM (at the hypervisor level)
1044
      - the other reboot type (L{constants.INSTANCE_REBOOT_FULL}) is
1045
        not accepted here, since that mode is handled differently, in
1046
        cmdlib, and translates into full stop and start of the
1047
        instance (instead of a call_instance_reboot RPC)
1048
  @type timeout: integer
1049
  @param timeout: maximum timeout for soft shutdown
1050
  @rtype: None
1051

1052
  """
1053
  running_instances = GetInstanceList([instance.hypervisor])
1054

    
1055
  if instance.name not in running_instances:
1056
    _Fail("Cannot reboot instance %s that is not running", instance.name)
1057

    
1058
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1059
  if reboot_type == constants.INSTANCE_REBOOT_SOFT:
1060
    try:
1061
      hyper.RebootInstance(instance)
1062
    except errors.HypervisorError, err:
1063
      _Fail("Failed to soft reboot instance %s: %s", instance.name, err)
1064
  elif reboot_type == constants.INSTANCE_REBOOT_HARD:
1065
    try:
1066
      InstanceShutdown(instance, shutdown_timeout)
1067
      return StartInstance(instance)
1068
    except errors.HypervisorError, err:
1069
      _Fail("Failed to hard reboot instance %s: %s", instance.name, err)
1070
  else:
1071
    _Fail("Invalid reboot_type received: %s", reboot_type)
1072

    
1073

    
1074
def MigrationInfo(instance):
1075
  """Gather information about an instance to be migrated.
1076

1077
  @type instance: L{objects.Instance}
1078
  @param instance: the instance definition
1079

1080
  """
1081
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1082
  try:
1083
    info = hyper.MigrationInfo(instance)
1084
  except errors.HypervisorError, err:
1085
    _Fail("Failed to fetch migration information: %s", err, exc=True)
1086
  return info
1087

    
1088

    
1089
def AcceptInstance(instance, info, target):
1090
  """Prepare the node to accept an instance.
1091

1092
  @type instance: L{objects.Instance}
1093
  @param instance: the instance definition
1094
  @type info: string/data (opaque)
1095
  @param info: migration information, from the source node
1096
  @type target: string
1097
  @param target: target host (usually ip), on this node
1098

1099
  """
1100
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1101
  try:
1102
    hyper.AcceptInstance(instance, info, target)
1103
  except errors.HypervisorError, err:
1104
    _Fail("Failed to accept instance: %s", err, exc=True)
1105

    
1106

    
1107
def FinalizeMigration(instance, info, success):
1108
  """Finalize any preparation to accept an instance.
1109

1110
  @type instance: L{objects.Instance}
1111
  @param instance: the instance definition
1112
  @type info: string/data (opaque)
1113
  @param info: migration information, from the source node
1114
  @type success: boolean
1115
  @param success: whether the migration was a success or a failure
1116

1117
  """
1118
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1119
  try:
1120
    hyper.FinalizeMigration(instance, info, success)
1121
  except errors.HypervisorError, err:
1122
    _Fail("Failed to finalize migration: %s", err, exc=True)
1123

    
1124

    
1125
def MigrateInstance(instance, target, live):
1126
  """Migrates an instance to another node.
1127

1128
  @type instance: L{objects.Instance}
1129
  @param instance: the instance definition
1130
  @type target: string
1131
  @param target: the target node name
1132
  @type live: boolean
1133
  @param live: whether the migration should be done live or not (the
1134
      interpretation of this parameter is left to the hypervisor)
1135
  @rtype: tuple
1136
  @return: a tuple of (success, msg) where:
1137
      - succes is a boolean denoting the success/failure of the operation
1138
      - msg is a string with details in case of failure
1139

1140
  """
1141
  hyper = hypervisor.GetHypervisor(instance.hypervisor)
1142

    
1143
  try:
1144
    hyper.MigrateInstance(instance.name, target, live)
1145
  except errors.HypervisorError, err:
1146
    _Fail("Failed to migrate instance: %s", err, exc=True)
1147

    
1148

    
1149
def BlockdevCreate(disk, size, owner, on_primary, info):
  """Creates a block device for an instance.

  @type disk: L{objects.Disk}
  @param disk: the object describing the disk we should create
  @type size: int
  @param size: the size of the physical underlying device, in MiB
  @type owner: str
  @param owner: the name of the instance for which disk is created,
      used for device cache data
  @type on_primary: boolean
  @param on_primary:  indicates if it is the primary node or not
  @type info: string
  @param info: string that will be sent to the physical device
      creation, used for example to set (LVM) tags on LVs

  @return: the new unique_id of the device (this can sometime be
      computed only after creation), or None. On secondary nodes,
      it's not required to return anything.

  """
  # NOTE(review): the 'size' parameter is never used in this body;
  # disk.size is used for the actual creation — confirm with callers
  # assemble (and possibly open) all children first, so the device
  # itself can be created on top of them
  clist = []
  if disk.children:
    for child in disk.children:
      try:
        crdev = _RecursiveAssembleBD(child, owner, on_primary)
      except errors.BlockDeviceError, err:
        _Fail("Can't assemble device %s: %s", child, err)
      if on_primary or disk.AssembleOnSecondary():
        # we need the children open in case the device itself has to
        # be assembled
        try:
          crdev.Open()
        except errors.BlockDeviceError, err:
          _Fail("Can't make child '%s' read-write: %s", child, err)
      clist.append(crdev)

  try:
    device = bdev.Create(disk.dev_type, disk.physical_id, clist, disk.size)
  except errors.BlockDeviceError, err:
    _Fail("Can't create block device: %s", err)

  if on_primary or disk.AssembleOnSecondary():
    try:
      device.Assemble()
    except errors.BlockDeviceError, err:
      _Fail("Can't assemble device after creation, unusual event: %s", err)
    device.SetSyncSpeed(constants.SYNC_SPEED)
    if on_primary or disk.OpenOnSecondary():
      try:
        device.Open(force=True)
      except errors.BlockDeviceError, err:
        _Fail("Can't make device r/w after creation, unusual event: %s", err)
    # record the new device in the node-local device cache
    DevCacheManager.UpdateCache(device.dev_path, owner,
                                on_primary, disk.iv_name)

  # set creation metadata (e.g. LVM tags) on the new device
  device.SetInfo(info)

  return device.unique_id
def BlockdevRemove(disk):
1211
  """Remove a block device.
1212

1213
  @note: This is intended to be called recursively.
1214

1215
  @type disk: L{objects.Disk}
1216
  @param disk: the disk object we should remove
1217
  @rtype: boolean
1218
  @return: the success of the operation
1219

1220
  """
1221
  msgs = []
1222
  try:
1223
    rdev = _RecursiveFindBD(disk)
1224
  except errors.BlockDeviceError, err:
1225
    # probably can't attach
1226
    logging.info("Can't attach to device %s in remove", disk)
1227
    rdev = None
1228
  if rdev is not None:
1229
    r_path = rdev.dev_path
1230
    try:
1231
      rdev.Remove()
1232
    except errors.BlockDeviceError, err:
1233
      msgs.append(str(err))
1234
    if not msgs:
1235
      DevCacheManager.RemoveCache(r_path)
1236

    
1237
  if disk.children:
1238
    for child in disk.children:
1239
      try:
1240
        BlockdevRemove(child)
1241
      except RPCFail, err:
1242
        msgs.append(str(err))
1243

    
1244
  if msgs:
1245
    _Fail("; ".join(msgs))
1246

    
1247

    
1248
def _RecursiveAssembleBD(disk, owner, as_primary):
  """Activate a block device for an instance.

  This is run on the primary and secondary nodes for an instance.

  @note: this function is called recursively.

  @type disk: L{objects.Disk}
  @param disk: the disk we try to assemble
  @type owner: str
  @param owner: the name of the instance which owns the disk
  @type as_primary: boolean
  @param as_primary: if we should make the block device
      read/write

  @return: the assembled device or None (in case no device
      was assembled)
  @raise errors.BlockDeviceError: in case there is an error
      during the activation of the children or the device
      itself

  """
  children = []
  if disk.children:
    # 'mcn' is the maximum number of children that may fail to
    # assemble (become None) while the device can still be built in
    # degraded mode; -1 from ChildrenNeeded() means all are required
    mcn = disk.ChildrenNeeded()
    if mcn == -1:
      mcn = 0 # max number of Nones allowed
    else:
      mcn = len(disk.children) - mcn # max number of Nones
    for chld_disk in disk.children:
      try:
        cdev = _RecursiveAssembleBD(chld_disk, owner, as_primary)
      except errors.BlockDeviceError, err:
        # too many children already missing: propagate the failure
        if children.count(None) >= mcn:
          raise
        cdev = None
        logging.error("Error in child activation (but continuing): %s",
                      str(err))
      children.append(cdev)

  if as_primary or disk.AssembleOnSecondary():
    r_dev = bdev.Assemble(disk.dev_type, disk.physical_id, children, disk.size)
    r_dev.SetSyncSpeed(constants.SYNC_SPEED)
    result = r_dev
    if as_primary or disk.OpenOnSecondary():
      r_dev.Open()
    # record the assembled device so later lookups can find it
    DevCacheManager.UpdateCache(r_dev.dev_path, owner,
                                as_primary, disk.iv_name)

  else:
    result = True
  return result
def BlockdevAssemble(disk, owner, as_primary):
1303
  """Activate a block device for an instance.
1304

1305
  This is a wrapper over _RecursiveAssembleBD.
1306

1307
  @rtype: str or boolean
1308
  @return: a C{/dev/...} path for primary nodes, and
1309
      C{True} for secondary nodes
1310

1311
  """
1312
  try:
1313
    result = _RecursiveAssembleBD(disk, owner, as_primary)
1314
    if isinstance(result, bdev.BlockDev):
1315
      result = result.dev_path
1316
  except errors.BlockDeviceError, err:
1317
    _Fail("Error while assembling disk: %s", err, exc=True)
1318

    
1319
  return result
1320

    
1321

    
1322
def BlockdevShutdown(disk):
1323
  """Shut down a block device.
1324

1325
  First, if the device is assembled (Attach() is successful), then
1326
  the device is shutdown. Then the children of the device are
1327
  shutdown.
1328

1329
  This function is called recursively. Note that we don't cache the
1330
  children or such, as oppossed to assemble, shutdown of different
1331
  devices doesn't require that the upper device was active.
1332

1333
  @type disk: L{objects.Disk}
1334
  @param disk: the description of the disk we should
1335
      shutdown
1336
  @rtype: None
1337

1338
  """
1339
  msgs = []
1340
  r_dev = _RecursiveFindBD(disk)
1341
  if r_dev is not None:
1342
    r_path = r_dev.dev_path
1343
    try:
1344
      r_dev.Shutdown()
1345
      DevCacheManager.RemoveCache(r_path)
1346
    except errors.BlockDeviceError, err:
1347
      msgs.append(str(err))
1348

    
1349
  if disk.children:
1350
    for child in disk.children:
1351
      try:
1352
        BlockdevShutdown(child)
1353
      except RPCFail, err:
1354
        msgs.append(str(err))
1355

    
1356
  if msgs:
1357
    _Fail("; ".join(msgs))
1358

    
1359

    
1360
def BlockdevAddchildren(parent_cdev, new_cdevs):
  """Extend a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk to which we should add children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should add
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in add children", parent_cdev)
  new_bdevs = [_RecursiveFindBD(child) for child in new_cdevs]
  if None in new_bdevs:
    _Fail("Can't find new device(s) to add: %s:%s", new_bdevs, new_cdevs)
  parent_bdev.AddChildren(new_bdevs)
def BlockdevRemovechildren(parent_cdev, new_cdevs):
  """Shrink a mirrored block device.

  @type parent_cdev: L{objects.Disk}
  @param parent_cdev: the disk from which we should remove children
  @type new_cdevs: list of L{objects.Disk}
  @param new_cdevs: the list of children which we should remove
  @rtype: None

  """
  parent_bdev = _RecursiveFindBD(parent_cdev)
  if parent_bdev is None:
    _Fail("Can't find parent device '%s' in remove children", parent_cdev)

  devs = []
  for disk in new_cdevs:
    # prefer the static path if the disk type provides one; otherwise
    # look up the live device
    rpath = disk.StaticDevPath()
    if rpath is not None:
      devs.append(rpath)
      continue
    bd = _RecursiveFindBD(disk)
    if bd is None:
      _Fail("Can't find device %s while removing children", disk)
    devs.append(bd.dev_path)

  parent_bdev.RemoveChildren(devs)
def BlockdevGetmirrorstatus(disks):
  """Get the mirroring status of a list of devices.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disks which we should query
  @rtype: disk
  @return:
      a list of (mirror_done, estimated_time) tuples, which
      are the result of L{bdev.BlockDev.CombinedSyncStatus}
  @raise errors.BlockDeviceError: if any of the disks cannot be
      found

  """
  status_list = []
  for disk in disks:
    device = _RecursiveFindBD(disk)
    if device is None:
      _Fail("Can't find device %s", disk)
    status_list.append(device.CombinedSyncStatus())

  return status_list
def _RecursiveFindBD(disk):
  """Check if a device is activated.

  If so, return information about the real device.

  @type disk: L{objects.Disk}
  @param disk: the disk object we need to find

  @return: None if the device can't be found,
      otherwise the device instance

  """
  if disk.children:
    children = [_RecursiveFindBD(chdisk) for chdisk in disk.children]
  else:
    children = []

  return bdev.FindDevice(disk.dev_type, disk.physical_id, children, disk.size)
def BlockdevFind(disk):
1451
  """Check if a device is activated.
1452

1453
  If it is, return information about the real device.
1454

1455
  @type disk: L{objects.Disk}
1456
  @param disk: the disk to find
1457
  @rtype: None or objects.BlockDevStatus
1458
  @return: None if the disk cannot be found, otherwise a the current
1459
           information
1460

1461
  """
1462
  try:
1463
    rbd = _RecursiveFindBD(disk)
1464
  except errors.BlockDeviceError, err:
1465
    _Fail("Failed to find device: %s", err, exc=True)
1466

    
1467
  if rbd is None:
1468
    return None
1469

    
1470
  return rbd.GetSyncStatus()
1471

    
1472

    
1473
def BlockdevGetsize(disks):
  """Computes the size of the given disks.

  If a disk is not found, returns None instead.

  @type disks: list of L{objects.Disk}
  @param disks: the list of disk to compute the size for
  @rtype: list
  @return: list with elements None if the disk cannot be found,
      otherwise the size

  """
  sizes = []
  for disk in disks:
    try:
      device = _RecursiveFindBD(disk)
    except errors.BlockDeviceError:
      # device cannot be attached; report it as missing
      device = None
    if device is None:
      sizes.append(None)
    else:
      sizes.append(device.GetActualSize())

  return sizes
def BlockdevExport(disk, dest_node, dest_path, cluster_name):
  """Export a block device to a remote node.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type dest_path: str
  @param dest_path: the destination path on the target node
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @rtype: None

  """
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  # reading side: 1MiB blocks, so the count matches our MiB-based size
  src_cmd = utils.BuildShellCmd("set -e; set -o pipefail; "
                                "dd if=%s bs=1048576 count=%s",
                                real_disk.dev_path, str(disk.size))

  # writing side: a smaller block size, since ssh buffering mostly
  # ignores anything beyond 64-128k; nocreat fails on a missing/wrong
  # target, notrunc avoids truncating an LV device, and oflag=dsync
  # limits memory buffering (flushing at best every 64k, so not fast)
  dst_cmd = utils.BuildShellCmd("dd of=%s conv=nocreat,notrunc bs=65536"
                                " oflag=dsync", dest_path)

  remote_cmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                    constants.GANETI_RUNAS,
                                                    dst_cmd)

  # all commands have been checked, so we're safe to combine them
  full_cmd = '|'.join([src_cmd, utils.ShellQuoteArgs(remote_cmd)])

  cmd_result = utils.RunCmd(["bash", "-c", full_cmd])
  if cmd_result.failed:
    _Fail("Disk copy command '%s' returned error: %s"
          " output: %s", full_cmd, cmd_result.fail_reason, cmd_result.output)
def UploadFile(file_name, data, mode, uid, gid, atime, mtime):
  """Write a file to the filesystem.

  This allows the master to overwrite(!) a file. It will only perform
  the operation if the file belongs to a list of configuration files.

  @type file_name: str
  @param file_name: the target file name
  @type data: str
  @param data: the new contents of the file
  @type mode: int
  @param mode: the mode to give the file (can be None)
  @type uid: int
  @param uid: the owner of the file (can be -1 for default)
  @type gid: int
  @param gid: the group of the file (can be -1 for default)
  @type atime: float
  @param atime: the atime to set on the file (can be None)
  @type mtime: float
  @param mtime: the mtime to set on the file (can be None)
  @rtype: None

  """
  if not os.path.isabs(file_name):
    _Fail("Filename passed to UploadFile is not absolute: '%s'", file_name)

  if file_name not in _ALLOWED_UPLOAD_FILES:
    _Fail("Filename passed to UploadFile not in allowed upload targets: '%s'",
          file_name)

  # the payload arrives (possibly) compressed; decompress before writing
  utils.WriteFile(file_name, data=_Decompress(data), mode=mode, uid=uid,
                  gid=gid, atime=atime, mtime=mtime)
def WriteSsconfFiles(values):
  """Update all ssconf files.

  Wrapper around the SimpleStore.WriteFiles.

  """
  store = ssconf.SimpleStore()
  store.WriteFiles(values)
def _ErrnoOrStr(err):
1593
  """Format an EnvironmentError exception.
1594

1595
  If the L{err} argument has an errno attribute, it will be looked up
1596
  and converted into a textual C{E...} description. Otherwise the
1597
  string representation of the error will be returned.
1598

1599
  @type err: L{EnvironmentError}
1600
  @param err: the exception to format
1601

1602
  """
1603
  if hasattr(err, 'errno'):
1604
    detail = errno.errorcode[err.errno]
1605
  else:
1606
    detail = str(err)
1607
  return detail
1608

    
1609

    
1610
def _OSOndiskAPIVersion(name, os_dir):
1611
  """Compute and return the API version of a given OS.
1612

1613
  This function will try to read the API version of the OS given by
1614
  the 'name' parameter and residing in the 'os_dir' directory.
1615

1616
  @type name: str
1617
  @param name: the OS name we should look for
1618
  @type os_dir: str
1619
  @param os_dir: the directory inwhich we should look for the OS
1620
  @rtype: tuple
1621
  @return: tuple (status, data) with status denoting the validity and
1622
      data holding either the vaid versions or an error message
1623

1624
  """
1625
  api_file = os.path.sep.join([os_dir, constants.OS_API_FILE])
1626

    
1627
  try:
1628
    st = os.stat(api_file)
1629
  except EnvironmentError, err:
1630
    return False, ("Required file '%s' not found under path %s: %s" %
1631
                   (constants.OS_API_FILE, os_dir, _ErrnoOrStr(err)))
1632

    
1633
  if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1634
    return False, ("File '%s' in %s is not a regular file" %
1635
                   (constants.OS_API_FILE, os_dir))
1636

    
1637
  try:
1638
    api_versions = utils.ReadFile(api_file).splitlines()
1639
  except EnvironmentError, err:
1640
    return False, ("Error while reading the API version file at %s: %s" %
1641
                   (api_file, _ErrnoOrStr(err)))
1642

    
1643
  try:
1644
    api_versions = [int(version.strip()) for version in api_versions]
1645
  except (TypeError, ValueError), err:
1646
    return False, ("API version(s) can't be converted to integer: %s" %
1647
                   str(err))
1648

    
1649
  return True, api_versions
1650

    
1651

    
1652
def DiagnoseOS(top_dirs=None):
1653
  """Compute the validity for all OSes.
1654

1655
  @type top_dirs: list
1656
  @param top_dirs: the list of directories in which to
1657
      search (if not given defaults to
1658
      L{constants.OS_SEARCH_PATH})
1659
  @rtype: list of L{objects.OS}
1660
  @return: a list of tuples (name, path, status, diagnose, variants)
1661
      for all (potential) OSes under all search paths, where:
1662
          - name is the (potential) OS name
1663
          - path is the full path to the OS
1664
          - status True/False is the validity of the OS
1665
          - diagnose is the error message for an invalid OS, otherwise empty
1666
          - variants is a list of supported OS variants, if any
1667

1668
  """
1669
  if top_dirs is None:
1670
    top_dirs = constants.OS_SEARCH_PATH
1671

    
1672
  result = []
1673
  for dir_name in top_dirs:
1674
    if os.path.isdir(dir_name):
1675
      try:
1676
        f_names = utils.ListVisibleFiles(dir_name)
1677
      except EnvironmentError, err:
1678
        logging.exception("Can't list the OS directory %s: %s", dir_name, err)
1679
        break
1680
      for name in f_names:
1681
        os_path = os.path.sep.join([dir_name, name])
1682
        status, os_inst = _TryOSFromDisk(name, base_dir=dir_name)
1683
        if status:
1684
          diagnose = ""
1685
          variants = os_inst.supported_variants
1686
        else:
1687
          diagnose = os_inst
1688
          variants = []
1689
        result.append((name, os_path, status, diagnose, variants))
1690

    
1691
  return result
1692

    
1693

    
1694
def _TryOSFromDisk(name, base_dir=None):
1695
  """Create an OS instance from disk.
1696

1697
  This function will return an OS instance if the given name is a
1698
  valid OS name.
1699

1700
  @type base_dir: string
1701
  @keyword base_dir: Base directory containing OS installations.
1702
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
1703
  @rtype: tuple
1704
  @return: success and either the OS instance if we find a valid one,
1705
      or error message
1706

1707
  """
1708
  if base_dir is None:
1709
    os_dir = utils.FindFile(name, constants.OS_SEARCH_PATH, os.path.isdir)
1710
    if os_dir is None:
1711
      return False, "Directory for OS %s not found in search path" % name
1712
  else:
1713
    os_dir = os.path.sep.join([base_dir, name])
1714

    
1715
  status, api_versions = _OSOndiskAPIVersion(name, os_dir)
1716
  if not status:
1717
    # push the error up
1718
    return status, api_versions
1719

    
1720
  if not constants.OS_API_VERSIONS.intersection(api_versions):
1721
    return False, ("API version mismatch for path '%s': found %s, want %s." %
1722
                   (os_dir, api_versions, constants.OS_API_VERSIONS))
1723

    
1724
  # OS Files dictionary, we will populate it with the absolute path names
1725
  os_files = dict.fromkeys(constants.OS_SCRIPTS)
1726

    
1727
  if max(api_versions) >= constants.OS_API_V15:
1728
    os_files[constants.OS_VARIANTS_FILE] = ''
1729

    
1730
  for name in os_files:
1731
    os_files[name] = os.path.sep.join([os_dir, name])
1732

    
1733
    try:
1734
      st = os.stat(os_files[name])
1735
    except EnvironmentError, err:
1736
      return False, ("File '%s' under path '%s' is missing (%s)" %
1737
                     (name, os_dir, _ErrnoOrStr(err)))
1738

    
1739
    if not stat.S_ISREG(stat.S_IFMT(st.st_mode)):
1740
      return False, ("File '%s' under path '%s' is not a regular file" %
1741
                     (name, os_dir))
1742

    
1743
    if name in constants.OS_SCRIPTS:
1744
      if stat.S_IMODE(st.st_mode) & stat.S_IXUSR != stat.S_IXUSR:
1745
        return False, ("File '%s' under path '%s' is not executable" %
1746
                       (name, os_dir))
1747

    
1748
  variants = None
1749
  if constants.OS_VARIANTS_FILE in os_files:
1750
    variants_file = os_files[constants.OS_VARIANTS_FILE]
1751
    try:
1752
      variants = utils.ReadFile(variants_file).splitlines()
1753
    except EnvironmentError, err:
1754
      return False, ("Error while reading the OS variants file at %s: %s" %
1755
                     (variants_file, _ErrnoOrStr(err)))
1756
    if not variants:
1757
      return False, ("No supported os variant found")
1758

    
1759
  os_obj = objects.OS(name=name, path=os_dir,
1760
                      create_script=os_files[constants.OS_SCRIPT_CREATE],
1761
                      export_script=os_files[constants.OS_SCRIPT_EXPORT],
1762
                      import_script=os_files[constants.OS_SCRIPT_IMPORT],
1763
                      rename_script=os_files[constants.OS_SCRIPT_RENAME],
1764
                      supported_variants=variants,
1765
                      api_versions=api_versions)
1766
  return True, os_obj
1767

    
1768

    
1769
def OSFromDisk(name, base_dir=None):
  """Create an OS instance from disk.

  This function will return an OS instance if the given name is a
  valid OS name. Otherwise, it will raise an appropriate
  L{RPCFail} exception, detailing why this is not a valid OS.

  This is just a wrapper over L{_TryOSFromDisk}, which doesn't raise
  an exception but returns true/false status data.

  @type base_dir: string
  @keyword base_dir: Base directory containing OS installations.
                     Defaults to a search in all the OS_SEARCH_PATH dirs.
  @rtype: L{objects.OS}
  @return: the OS instance if we find a valid one
  @raise RPCFail: if we don't find a valid OS

  """
  # strip an eventual "+variant" suffix; only the base name lives on disk
  base_name = name.split("+", 1)[0]
  status, payload = _TryOSFromDisk(base_name, base_dir)
  if status:
    return payload
  # the payload is the error message in the failure case
  _Fail(payload)


def OSEnvironment(instance, os, debug=0):
  """Calculate the environment for an os script.

  Builds the flat dict of environment variables (OS api version,
  instance identity, per-disk and per-NIC settings, and all be/hv
  parameters) that is passed to the OS create/import/export scripts.

  @type instance: L{objects.Instance}
  @param instance: target instance for the os script run
  @type os: L{objects.OS}
  @param os: operating system for which the environment is being built
  @type debug: integer
  @param debug: debug level (0 or 1, for OS Api 10)
  @rtype: dict
  @return: dict of environment variables
  @raise errors.BlockDeviceError: if the block device
      cannot be found

  """
  result = {}
  # highest API version supported by both this node and the OS
  api_version = max(constants.OS_API_VERSIONS.intersection(os.api_versions))
  result['OS_API_VERSION'] = '%d' % api_version
  result['INSTANCE_NAME'] = instance.name
  result['INSTANCE_OS'] = instance.os
  result['HYPERVISOR'] = instance.hypervisor
  result['DISK_COUNT'] = '%d' % len(instance.disks)
  result['NIC_COUNT'] = '%d' % len(instance.nics)
  result['DEBUG_LEVEL'] = '%d' % debug
  if api_version >= constants.OS_API_V15:
    # the variant is the part after '+' in the instance OS name; if
    # there is none, fall back to the OS's first supported variant
    try:
      variant = instance.os.split('+', 1)[1]
    except IndexError:
      variant = os.supported_variants[0]
    result['OS_VARIANT'] = variant
  for idx, disk in enumerate(instance.disks):
    real_disk = _RecursiveFindBD(disk)
    if real_disk is None:
      raise errors.BlockDeviceError("Block device '%s' is not set up" %
                                    str(disk))
    # make sure the device node exists before exporting its path
    real_disk.Open()
    result['DISK_%d_PATH' % idx] = real_disk.dev_path
    result['DISK_%d_ACCESS' % idx] = disk.mode
    if constants.HV_DISK_TYPE in instance.hvparams:
      result['DISK_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_DISK_TYPE]
    if disk.dev_type in constants.LDS_BLOCK:
      result['DISK_%d_BACKEND_TYPE' % idx] = 'block'
    elif disk.dev_type == constants.LD_FILE:
      result['DISK_%d_BACKEND_TYPE' % idx] = \
        'file:%s' % disk.physical_id[0]
  for idx, nic in enumerate(instance.nics):
    result['NIC_%d_MAC' % idx] = nic.mac
    if nic.ip:
      result['NIC_%d_IP' % idx] = nic.ip
    result['NIC_%d_MODE' % idx] = nic.nicparams[constants.NIC_MODE]
    if nic.nicparams[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      result['NIC_%d_BRIDGE' % idx] = nic.nicparams[constants.NIC_LINK]
    if nic.nicparams[constants.NIC_LINK]:
      result['NIC_%d_LINK' % idx] = nic.nicparams[constants.NIC_LINK]
    if constants.HV_NIC_TYPE in instance.hvparams:
      result['NIC_%d_FRONTEND_TYPE' % idx] = \
        instance.hvparams[constants.HV_NIC_TYPE]

  # export all backend and hypervisor parameters as INSTANCE_BE_*/HV_*
  for source, kind in [(instance.beparams, "BE"), (instance.hvparams, "HV")]:
    for key, value in source.items():
      result["INSTANCE_%s_%s" % (kind, key)] = str(value)

  return result


def BlockdevGrow(disk, amount):
1862
  """Grow a stack of block devices.
1863

1864
  This function is called recursively, with the childrens being the
1865
  first ones to resize.
1866

1867
  @type disk: L{objects.Disk}
1868
  @param disk: the disk to be grown
1869
  @rtype: (status, result)
1870
  @return: a tuple with the status of the operation
1871
      (True/False), and the errors message if status
1872
      is False
1873

1874
  """
1875
  r_dev = _RecursiveFindBD(disk)
1876
  if r_dev is None:
1877
    _Fail("Cannot find block device %s", disk)
1878

    
1879
  try:
1880
    r_dev.Grow(amount)
1881
  except errors.BlockDeviceError, err:
1882
    _Fail("Failed to grow block device: %s", err, exc=True)
1883

    
1884

    
1885
def BlockdevSnapshot(disk):
  """Create a snapshot copy of a block device.

  The snapshot is taken only at the leaf LVM device: the function
  recurses through single children (or the size-matching child when
  there are several) until it reaches an LV.

  @type disk: L{objects.Disk}
  @param disk: the disk to be snapshotted
  @rtype: string
  @return: snapshot disk path

  """
  if disk.children:
    if len(disk.children) == 1:
      # a single child: descend into it
      return BlockdevSnapshot(disk.children[0])
    # several children: descend into the one whose size matches
    for child in disk.children:
      if child.size == disk.size:
        # return implies breaking the loop
        return BlockdevSnapshot(child)
  elif disk.dev_type == constants.LD_LV:
    r_dev = _RecursiveFindBD(disk)
    if r_dev is None:
      _Fail("Cannot find block device %s", disk)
    # let's stay on the safe side and ask for the full size, for now
    return r_dev.Snapshot(disk.size)
  else:
    _Fail("Cannot snapshot non-lvm block device '%s' of type '%s'",
          disk.unique_id, disk.dev_type)


def ExportSnapshot(disk, dest_node, instance, cluster_name, idx):
  """Export a block device snapshot to a remote node.

  Runs the OS export script on the snapshot device and streams its
  (gzip-compressed) output over SSH into the destination node's
  export directory.

  @type disk: L{objects.Disk}
  @param disk: the description of the disk to export
  @type dest_node: str
  @param dest_node: the destination node to export to
  @type instance: L{objects.Instance}
  @param instance: the instance object to whom the disk belongs
  @type cluster_name: str
  @param cluster_name: the cluster name, needed for SSH hostalias
  @type idx: int
  @param idx: the index of the disk in the instance's disk list,
      used to export to the OS scripts environment
  @rtype: None
  @raise RPCFail: if the device is not set up or the export fails

  """
  inst_os = OSFromDisk(instance.os)
  export_env = OSEnvironment(instance, inst_os)

  export_script = inst_os.export_script

  # per-run log file for the OS export script's stderr
  logfile = "%s/exp-%s-%s-%s.log" % (constants.LOG_OS_DIR, inst_os.name,
                                     instance.name, int(time.time()))
  if not os.path.exists(constants.LOG_OS_DIR):
    os.mkdir(constants.LOG_OS_DIR, 0750)
  real_disk = _RecursiveFindBD(disk)
  if real_disk is None:
    _Fail("Block device '%s' is not set up", disk)

  real_disk.Open()

  export_env['EXPORT_DEVICE'] = real_disk.dev_path
  export_env['EXPORT_INDEX'] = str(idx)

  # exports land in a ".new" staging directory on the destination
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  destfile = disk.physical_id[1]

  # the target command is built out of three individual commands,
  # which are joined by pipes; we check each individual command for
  # valid parameters
  expcmd = utils.BuildShellCmd("set -e; set -o pipefail; cd %s; %s 2>%s",
                               inst_os.path, export_script, logfile)

  comprcmd = "gzip"

  destcmd = utils.BuildShellCmd("mkdir -p %s && cat > %s/%s",
                                destdir, destdir, destfile)
  remotecmd = _GetSshRunner(cluster_name).BuildCmd(dest_node,
                                                   constants.GANETI_RUNAS,
                                                   destcmd)

  # all commands have been checked, so we're safe to combine them
  command = '|'.join([expcmd, comprcmd, utils.ShellQuoteArgs(remotecmd)])

  # run under bash explicitly, as "set -o pipefail" is a bash feature
  result = utils.RunCmd(["bash", "-c", command], env=export_env)

  if result.failed:
    _Fail("OS snapshot export command '%s' returned error: %s"
          " output: %s", command, result.fail_reason, result.output)


def FinalizeExport(instance, snap_disks):
  """Write out the export configuration information.

  Serializes the export metadata (export section plus instance
  section with NIC and disk details) into the staging directory,
  then atomically moves the staging directory over the final one.

  @type instance: L{objects.Instance}
  @param instance: the instance which we export, used for
      saving configuration
  @type snap_disks: list of L{objects.Disk}
  @param snap_disks: list of snapshot block devices, which
      will be used to get the actual name of the dump file

  @rtype: None

  """
  destdir = os.path.join(constants.EXPORT_DIR, instance.name + ".new")
  finaldestdir = os.path.join(constants.EXPORT_DIR, instance.name)

  config = objects.SerializableConfigParser()

  config.add_section(constants.INISECT_EXP)
  for key, value in [('version', '0'),
                     ('timestamp', '%d' % int(time.time())),
                     ('source', instance.primary_node),
                     ('os', instance.os),
                     ('compression', 'gzip')]:
    config.set(constants.INISECT_EXP, key, value)

  config.add_section(constants.INISECT_INS)
  config.set(constants.INISECT_INS, 'name', instance.name)
  config.set(constants.INISECT_INS, 'memory', '%d' %
             instance.beparams[constants.BE_MEMORY])
  config.set(constants.INISECT_INS, 'vcpus', '%d' %
             instance.beparams[constants.BE_VCPUS])
  config.set(constants.INISECT_INS, 'disk_template', instance.disk_template)

  for nic_count, nic in enumerate(instance.nics):
    config.set(constants.INISECT_INS, 'nic%d_mac' %
               nic_count, '%s' % nic.mac)
    config.set(constants.INISECT_INS, 'nic%d_ip' % nic_count, '%s' % nic.ip)
    config.set(constants.INISECT_INS, 'nic%d_bridge' % nic_count,
               '%s' % nic.bridge)
  # TODO: redundant: on load can read nics until it doesn't exist
  config.set(constants.INISECT_INS, 'nic_count', '%d' % len(instance.nics))

  # disks may contain None placeholders, hence the explicit counter
  disk_total = 0
  for disk_count, disk in enumerate(snap_disks):
    if not disk:
      continue
    disk_total += 1
    config.set(constants.INISECT_INS, 'disk%d_ivname' % disk_count,
               ('%s' % disk.iv_name))
    config.set(constants.INISECT_INS, 'disk%d_dump' % disk_count,
               ('%s' % disk.physical_id[1]))
    config.set(constants.INISECT_INS, 'disk%d_size' % disk_count,
               ('%d' % disk.size))

  config.set(constants.INISECT_INS, 'disk_count', '%d' % disk_total)

  utils.WriteFile(os.path.join(destdir, constants.EXPORT_CONF_FILE),
                  data=config.Dumps())
  # replace any previous export with the freshly staged one
  shutil.rmtree(finaldestdir, True)
  shutil.move(destdir, finaldestdir)


def ExportInfo(dest):
  """Get export configuration information.

  @type dest: str
  @param dest: directory containing the export

  @rtype: L{objects.SerializableConfigParser}
  @return: a serializable config file containing the
      export info
  @raise RPCFail: if the export info file misses required sections

  """
  config = objects.SerializableConfigParser()
  config.read(os.path.join(dest, constants.EXPORT_CONF_FILE))

  # both the export and the instance sections must be present
  complete = (config.has_section(constants.INISECT_EXP) and
              config.has_section(constants.INISECT_INS))
  if not complete:
    _Fail("Export info file doesn't have the required fields")

  return config.Dumps()


def ImportOSIntoInstance(instance, src_node, src_images, cluster_name):
2068
  """Import an os image into an instance.
2069

2070
  @type instance: L{objects.Instance}
2071
  @param instance: instance to import the disks into
2072
  @type src_node: string
2073
  @param src_node: source node for the disk images
2074
  @type src_images: list of string
2075
  @param src_images: absolute paths of the disk images
2076
  @rtype: list of boolean
2077
  @return: each boolean represent the success of importing the n-th disk
2078

2079
  """
2080
  inst_os = OSFromDisk(instance.os)
2081
  import_env = OSEnvironment(instance, inst_os)
2082
  import_script = inst_os.import_script
2083

    
2084
  logfile = "%s/import-%s-%s-%s.log" % (constants.LOG_OS_DIR, instance.os,
2085
                                        instance.name, int(time.time()))
2086
  if not os.path.exists(constants.LOG_OS_DIR):
2087
    os.mkdir(constants.LOG_OS_DIR, 0750)
2088

    
2089
  comprcmd = "gunzip"
2090
  impcmd = utils.BuildShellCmd("(cd %s; %s >%s 2>&1)", inst_os.path,
2091
                               import_script, logfile)
2092

    
2093
  final_result = []
2094
  for idx, image in enumerate(src_images):
2095
    if image:
2096
      destcmd = utils.BuildShellCmd('cat %s', image)
2097
      remotecmd = _GetSshRunner(cluster_name).BuildCmd(src_node,
2098
                                                       constants.GANETI_RUNAS,
2099
                                                       destcmd)
2100
      command = '|'.join([utils.ShellQuoteArgs(remotecmd), comprcmd, impcmd])
2101
      import_env['IMPORT_DEVICE'] = import_env['DISK_%d_PATH' % idx]
2102
      import_env['IMPORT_INDEX'] = str(idx)
2103
      result = utils.RunCmd(command, env=import_env)
2104
      if result.failed:
2105
        logging.error("Disk import command '%s' returned error: %s"
2106
                      " output: %s", command, result.fail_reason,
2107
                      result.output)
2108
        final_result.append("error importing disk %d: %s, %s" %
2109
                            (idx, result.fail_reason, result.output[-100]))
2110

    
2111
  if final_result:
2112
    _Fail("; ".join(final_result), log=False)
2113

    
2114

    
2115
def ListExports():
  """Return a list of exports currently available on this machine.

  @rtype: list
  @return: list of the exports
  @raise RPCFail: if the exports directory does not exist

  """
  # without the directory there is nothing to list
  if not os.path.isdir(constants.EXPORT_DIR):
    _Fail("No exports directory")
  return utils.ListVisibleFiles(constants.EXPORT_DIR)


def RemoveExport(export):
2129
  """Remove an existing export from the node.
2130

2131
  @type export: str
2132
  @param export: the name of the export to remove
2133
  @rtype: None
2134

2135
  """
2136
  target = os.path.join(constants.EXPORT_DIR, export)
2137

    
2138
  try:
2139
    shutil.rmtree(target)
2140
  except EnvironmentError, err:
2141
    _Fail("Error while removing the export: %s", err, exc=True)
2142

    
2143

    
2144
def BlockdevRename(devlist):
2145
  """Rename a list of block devices.
2146

2147
  @type devlist: list of tuples
2148
  @param devlist: list of tuples of the form  (disk,
2149
      new_logical_id, new_physical_id); disk is an
2150
      L{objects.Disk} object describing the current disk,
2151
      and new logical_id/physical_id is the name we
2152
      rename it to
2153
  @rtype: boolean
2154
  @return: True if all renames succeeded, False otherwise
2155

2156
  """
2157
  msgs = []
2158
  result = True
2159
  for disk, unique_id in devlist:
2160
    dev = _RecursiveFindBD(disk)
2161
    if dev is None:
2162
      msgs.append("Can't find device %s in rename" % str(disk))
2163
      result = False
2164
      continue
2165
    try:
2166
      old_rpath = dev.dev_path
2167
      dev.Rename(unique_id)
2168
      new_rpath = dev.dev_path
2169
      if old_rpath != new_rpath:
2170
        DevCacheManager.RemoveCache(old_rpath)
2171
        # FIXME: we should add the new cache information here, like:
2172
        # DevCacheManager.UpdateCache(new_rpath, owner, ...)
2173
        # but we don't have the owner here - maybe parse from existing
2174
        # cache? for now, we only lose lvm data when we rename, which
2175
        # is less critical than DRBD or MD
2176
    except errors.BlockDeviceError, err:
2177
      msgs.append("Can't rename device '%s' to '%s': %s" %
2178
                  (dev, unique_id, err))
2179
      logging.exception("Can't rename device '%s' to '%s'", dev, unique_id)
2180
      result = False
2181
  if not result:
2182
    _Fail("; ".join(msgs))
2183

    
2184

    
2185
def _TransformFileStorageDir(file_storage_dir):
  """Checks whether given file_storage_dir is valid.

  Checks whether the given file_storage_dir is within the cluster-wide
  default file_storage_dir stored in SimpleStore. Only paths under that
  directory are allowed.

  @type file_storage_dir: str
  @param file_storage_dir: the path to check

  @return: the normalized path if valid, None otherwise
  @raise RPCFail: if the path is not under the base directory

  """
  cfg = _GetConfig()
  file_storage_dir = os.path.normpath(file_storage_dir)
  base_file_storage_dir = cfg.GetFileStorageDir()
  # compare whole path components: os.path.commonprefix works
  # character-wise and would wrongly accept e.g. "/srv/dir-evil" as
  # being under "/srv/dir"
  if (file_storage_dir != base_file_storage_dir and
      not file_storage_dir.startswith(base_file_storage_dir +
                                      os.path.sep)):
    _Fail("File storage directory '%s' is not under base file"
          " storage directory '%s'", file_storage_dir, base_file_storage_dir)
  return file_storage_dir


def CreateFileStorageDir(file_storage_dir):
2209
  """Create file storage directory.
2210

2211
  @type file_storage_dir: str
2212
  @param file_storage_dir: directory to create
2213

2214
  @rtype: tuple
2215
  @return: tuple with first element a boolean indicating wheter dir
2216
      creation was successful or not
2217

2218
  """
2219
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2220
  if os.path.exists(file_storage_dir):
2221
    if not os.path.isdir(file_storage_dir):
2222
      _Fail("Specified storage dir '%s' is not a directory",
2223
            file_storage_dir)
2224
  else:
2225
    try:
2226
      os.makedirs(file_storage_dir, 0750)
2227
    except OSError, err:
2228
      _Fail("Cannot create file storage directory '%s': %s",
2229
            file_storage_dir, err, exc=True)
2230

    
2231

    
2232
def RemoveFileStorageDir(file_storage_dir):
2233
  """Remove file storage directory.
2234

2235
  Remove it only if it's empty. If not log an error and return.
2236

2237
  @type file_storage_dir: str
2238
  @param file_storage_dir: the directory we should cleanup
2239
  @rtype: tuple (success,)
2240
  @return: tuple of one element, C{success}, denoting
2241
      whether the operation was successful
2242

2243
  """
2244
  file_storage_dir = _TransformFileStorageDir(file_storage_dir)
2245
  if os.path.exists(file_storage_dir):
2246
    if not os.path.isdir(file_storage_dir):
2247
      _Fail("Specified Storage directory '%s' is not a directory",
2248
            file_storage_dir)
2249
    # deletes dir only if empty, otherwise we want to fail the rpc call
2250
    try:
2251
      os.rmdir(file_storage_dir)
2252
    except OSError, err:
2253
      _Fail("Cannot remove file storage directory '%s': %s",
2254
            file_storage_dir, err)
2255

    
2256

    
2257
def RenameFileStorageDir(old_file_storage_dir, new_file_storage_dir):
2258
  """Rename the file storage directory.
2259

2260
  @type old_file_storage_dir: str
2261
  @param old_file_storage_dir: the current path
2262
  @type new_file_storage_dir: str
2263
  @param new_file_storage_dir: the name we should rename to
2264
  @rtype: tuple (success,)
2265
  @return: tuple of one element, C{success}, denoting
2266
      whether the operation was successful
2267

2268
  """
2269
  old_file_storage_dir = _TransformFileStorageDir(old_file_storage_dir)
2270
  new_file_storage_dir = _TransformFileStorageDir(new_file_storage_dir)
2271
  if not os.path.exists(new_file_storage_dir):
2272
    if os.path.isdir(old_file_storage_dir):
2273
      try:
2274
        os.rename(old_file_storage_dir, new_file_storage_dir)
2275
      except OSError, err:
2276
        _Fail("Cannot rename '%s' to '%s': %s",
2277
              old_file_storage_dir, new_file_storage_dir, err)
2278
    else:
2279
      _Fail("Specified storage dir '%s' is not a directory",
2280
            old_file_storage_dir)
2281
  else:
2282
    if os.path.exists(old_file_storage_dir):
2283
      _Fail("Cannot rename '%s' to '%s': both locations exist",
2284
            old_file_storage_dir, new_file_storage_dir)
2285

    
2286

    
2287
def _EnsureJobQueueFile(file_name):
  """Checks whether the given filename is in the queue directory.

  @type file_name: str
  @param file_name: the file name we should check
  @rtype: None
  @raises RPCFail: if the file is not valid

  """
  queue_dir = os.path.normpath(constants.QUEUE_DIR)
  # compare whole path components: os.path.commonprefix works
  # character-wise and would wrongly accept e.g. "/var/queue-evil" as
  # belonging to "/var/queue"
  result = (file_name == queue_dir or
            file_name.startswith(queue_dir + os.path.sep))

  if not result:
    _Fail("Passed job queue file '%s' does not belong to"
          " the queue directory '%s'", file_name, queue_dir)


def JobQueueUpdate(file_name, content):
  """Updates a file in the queue directory.

  This is just a wrapper over L{utils.WriteFile}, with proper
  checking.

  @type file_name: str
  @param file_name: the job file name
  @type content: str
  @param content: the new job contents
  @rtype: None
  @raise RPCFail: if the file is outside the queue directory

  """
  _EnsureJobQueueFile(file_name)
  raw_data = _Decompress(content)
  # Write and replace the file atomically
  utils.WriteFile(file_name, data=raw_data)


def JobQueueRename(old, new):
  """Renames a job queue file.

  This is just a wrapper over os.rename with proper checking.

  @type old: str
  @param old: the old (actual) file name
  @type new: str
  @param new: the desired file name
  @rtype: None
  @raise RPCFail: if either path is outside the queue directory

  """
  # both source and destination must live inside the queue directory
  for path in (old, new):
    _EnsureJobQueueFile(path)

  utils.RenameFile(old, new, mkdir=True)


def JobQueueSetDrainFlag(drain_flag):
  """Set the drain flag for the queue.

  This will set or unset the queue drain flag.

  @type drain_flag: boolean
  @param drain_flag: if True, will set the drain flag, otherwise reset it.
  @rtype: None
  @warning: the function always succeeds

  """
  # the flag is represented by the presence of the drain file
  if not drain_flag:
    utils.RemoveFile(constants.JOB_QUEUE_DRAIN_FILE)
  else:
    utils.WriteFile(constants.JOB_QUEUE_DRAIN_FILE, data="", close=True)


def BlockdevClose(instance_name, disks):
2362
  """Closes the given block devices.
2363

2364
  This means they will be switched to secondary mode (in case of
2365
  DRBD).
2366

2367
  @param instance_name: if the argument is not empty, the symlinks
2368
      of this instance will be removed
2369
  @type disks: list of L{objects.Disk}
2370
  @param disks: the list of disks to be closed
2371
  @rtype: tuple (success, message)
2372
  @return: a tuple of success and message, where success
2373
      indicates the succes of the operation, and message
2374
      which will contain the error details in case we
2375
      failed
2376

2377
  """
2378
  bdevs = []
2379
  for cf in disks:
2380
    rd = _RecursiveFindBD(cf)
2381
    if rd is None:
2382
      _Fail("Can't find device %s", cf)
2383
    bdevs.append(rd)
2384

    
2385
  msg = []
2386
  for rd in bdevs:
2387
    try:
2388
      rd.Close()
2389
    except errors.BlockDeviceError, err:
2390
      msg.append(str(err))
2391
  if msg:
2392
    _Fail("Can't make devices secondary: %s", ",".join(msg))
2393
  else:
2394
    if instance_name:
2395
      _RemoveBlockDevLinks(instance_name, disks)
2396

    
2397

    
2398
def ValidateHVParams(hvname, hvparams):
2399
  """Validates the given hypervisor parameters.
2400

2401
  @type hvname: string
2402
  @param hvname: the hypervisor name
2403
  @type hvparams: dict
2404
  @param hvparams: the hypervisor parameters to be validated
2405
  @rtype: None
2406

2407
  """
2408
  try:
2409
    hv_type = hypervisor.GetHypervisor(hvname)
2410
    hv_type.ValidateParameters(hvparams)
2411
  except errors.HypervisorError, err:
2412
    _Fail(str(err), log=False)
2413

    
2414

    
2415
def DemoteFromMC():
  """Demotes the current node from master candidate role.

  Refuses to act if this node appears to be the master or the master
  daemon is running locally; otherwise backs up and removes the local
  cluster configuration file.

  @rtype: None
  @raise RPCFail: if the node is the master, the master daemon is
      running, or backing up the configuration fails

  """
  # try to ensure we're not the master by mistake
  master, myself = ssconf.GetMasterAndMyself()
  if master == myself:
    _Fail("ssconf status shows I'm the master node, will not demote")
  # also refuse if a master daemon is (still) running on this node
  pid_file = utils.DaemonPidFileName(constants.MASTERD)
  if utils.IsProcessAlive(utils.ReadPidFile(pid_file)):
    _Fail("The master daemon is running, will not demote")
  try:
    if os.path.isfile(constants.CLUSTER_CONF_FILE):
      utils.CreateBackup(constants.CLUSTER_CONF_FILE)
  except EnvironmentError, err:
    # a file vanishing between the check and the backup is fine;
    # any other error aborts the demotion
    if err.errno != errno.ENOENT:
      _Fail("Error while backing up cluster file: %s", err, exc=True)
  utils.RemoveFile(constants.CLUSTER_CONF_FILE)


def _FindDisks(nodes_ip, disks):
  """Sets the physical ID on disks and returns the block devices.

  @param nodes_ip: mapping used to resolve the disks' physical IDs
  @type disks: list of L{objects.Disk}
  @param disks: the disks to look up
  @return: list of block devices, in the same order as C{disks}
  @raise RPCFail: if any disk cannot be found

  """
  # set the correct physical ID
  my_name = utils.HostInfo().name
  for disk in disks:
    disk.SetPhysicalID(my_name, nodes_ip)

  bdevs = []
  for disk in disks:
    rd = _RecursiveFindBD(disk)
    if rd is None:
      _Fail("Can't find device %s", disk)
    bdevs.append(rd)
  return bdevs


def DrbdDisconnectNet(nodes_ip, disks):
2455
  """Disconnects the network on a list of drbd devices.
2456

2457
  """
2458
  bdevs = _FindDisks(nodes_ip, disks)
2459

    
2460
  # disconnect disks
2461
  for rd in bdevs:
2462
    try:
2463
      rd.DisconnectNet()
2464
    except errors.BlockDeviceError, err:
2465
      _Fail("Can't change network configuration to standalone mode: %s",
2466
            err, exc=True)
2467

    
2468

    
2469
def DrbdAttachNet(nodes_ip, disks, instance_name, multimaster):
  """Attaches the network on a list of drbd devices.

  Re-attaches the network configuration on the given DRBD devices,
  retries devices that fall back to standalone mode, waits (with
  backoff) until all are connected or resyncing, and optionally
  switches them to primary mode.

  @param nodes_ip: mapping used by L{_FindDisks} to resolve the
      disks' physical IDs
  @type disks: list of L{objects.Disk}
  @param disks: the DRBD disks to reconnect
  @type instance_name: string
  @param instance_name: instance owning the disks, used for the
      block device symlinks in multimaster mode
  @type multimaster: boolean
  @param multimaster: whether to also switch the devices to primary
  @raise RPCFail: on symlink, reconfiguration or timeout errors

  """
  bdevs = _FindDisks(nodes_ip, disks)

  if multimaster:
    for idx, rd in enumerate(bdevs):
      try:
        _SymlinkBlockDev(instance_name, rd.dev_path, idx)
      except EnvironmentError, err:
        _Fail("Can't create symlink: %s", err)
  # reconnect disks, switch to new master configuration and if
  # needed primary mode
  for rd in bdevs:
    try:
      rd.AttachNet(multimaster)
    except errors.BlockDeviceError, err:
      _Fail("Can't change network configuration: %s", err)
  # wait until the disks are connected; we need to retry the re-attach
  # if the device becomes standalone, as this might happen if the one
  # node disconnects and reconnects in a different mode before the
  # other node reconnects; in this case, one or both of the nodes will
  # decide it has wrong configuration and switch to standalone
  RECONNECT_TIMEOUT = 2 * 60
  sleep_time = 0.100 # start with 100 miliseconds
  timeout_limit = time.time() + RECONNECT_TIMEOUT
  while time.time() < timeout_limit:
    all_connected = True
    for rd in bdevs:
      stats = rd.GetProcStatus()
      if not (stats.is_connected or stats.is_in_resync):
        all_connected = False
      if stats.is_standalone:
        # peer had different config info and this node became
        # standalone, even though this should not happen with the
        # new staged way of changing disk configs
        try:
          rd.AttachNet(multimaster)
        except errors.BlockDeviceError, err:
          _Fail("Can't change network configuration: %s", err)
    if all_connected:
      break
    time.sleep(sleep_time)
    # exponential backoff, capped at five seconds between polls
    sleep_time = min(5, sleep_time * 1.5)
  if not all_connected:
    _Fail("Timeout in disk reconnecting")
  if multimaster:
    # change to primary mode
    for rd in bdevs:
      try:
        rd.Open()
      except errors.BlockDeviceError, err:
        _Fail("Can't change to primary mode: %s", err)


def DrbdWaitSync(nodes_ip, disks):
  """Wait until DRBDs have synchronized.

  @return: a tuple (alldone, min_resync): whether every device has
      finished resyncing, and the minimum sync percentage seen
  @raise RPCFail: if any device is neither connected nor resyncing

  """
  bdevs = _FindDisks(nodes_ip, disks)

  min_resync = 100
  alldone = True
  for rd in bdevs:
    stats = rd.GetProcStatus()
    if not (stats.is_connected or stats.is_in_resync):
      _Fail("DRBD device %s is not in sync: stats=%s", rd, stats)
    if stats.is_in_resync:
      alldone = False
    if stats.sync_percent is not None:
      min_resync = min(min_resync, stats.sync_percent)

  return (alldone, min_resync)


def PowercycleNode(hypervisor_type):
  """Hard-powercycle the node.

  Because we need to return first, and schedule the powercycle in the
  background, we won't be able to report failures nicely.

  @type hypervisor_type: string
  @param hypervisor_type: name of the hypervisor whose powercycle
      method will be used
  @rtype: string
  @return: a status message (returned only by the parent process)

  """
  hyper = hypervisor.GetHypervisor(hypervisor_type)
  try:
    pid = os.fork()
  except OSError:
    # if we can't fork, we'll pretend that we're in the child process
    pid = 0
  if pid > 0:
    # parent: reply immediately; the child performs the powercycle
    return "Reboot scheduled in 5 seconds"
  # child (or fork failure): give the RPC reply time to reach the caller
  time.sleep(5)
  hyper.PowercycleNode()


class HooksRunner(object):
  """Hook runner.

  This class is instantiated on the node side (ganeti-noded) and not
  on the master side.

  """
  # hook file names must consist only of alphanumerics, dashes and
  # underscores; anything else (editor backups, package-manager
  # leftovers, ...) is skipped by RunHooks
  RE_MASK = re.compile("^[a-zA-Z0-9_-]+$")

  def __init__(self, hooks_base_dir=None):
    """Constructor for hooks runner.

    @type hooks_base_dir: str or None
    @param hooks_base_dir: if not None, this overrides the
        L{constants.HOOKS_BASE_DIR} (useful for unittests)

    """
    if hooks_base_dir is None:
      hooks_base_dir = constants.HOOKS_BASE_DIR
    self._BASE_DIR = hooks_base_dir

  @staticmethod
  def ExecHook(script, env):
    """Exec one hook script.

    @type script: str
    @param script: the full path to the script
    @type env: dict
    @param env: the environment with which to exec the script
    @rtype: tuple (success, message)
    @return: a tuple of success and message, where success
        indicates the succes of the operation, and message
        which will contain the error details in case we
        failed

    """
    # exec the process using subprocess and log the output
    fdstdin = None
    try:
      fdstdin = open("/dev/null", "r")
      # stderr is folded into stdout; the script runs with cwd=/ and
      # only the environment passed in by the caller
      child = subprocess.Popen([script], stdin=fdstdin, stdout=subprocess.PIPE,
                               stderr=subprocess.STDOUT, close_fds=True,
                               shell=False, cwd="/", env=env)
      output = ""
      try:
        # only the first 4096 bytes of combined output are captured
        output = child.stdout.read(4096)
        child.stdout.close()
      except EnvironmentError, err:
        output += "Hook script error: %s" % str(err)

      # retry wait() when interrupted by a signal (EINTR), but
      # propagate any other error
      while True:
        try:
          result = child.wait()
          break
        except EnvironmentError, err:
          if err.errno == errno.EINTR:
            continue
          raise
    finally:
      # try not to leak fds
      for fd in (fdstdin, ):
        if fd is not None:
          try:
            fd.close()
          except EnvironmentError, err:
            # just log the error
            #logging.exception("Error while closing fd %s", fd)
            pass

    # exit code zero means success; output is sanitized for transport
    return result == 0, utils.SafeEncode(output.strip())

  def RunHooks(self, hpath, phase, env):
    """Run the scripts in the hooks directory.

    @type hpath: str
    @param hpath: the path to the hooks directory which
        holds the scripts
    @type phase: str
    @param phase: either L{constants.HOOKS_PHASE_PRE} or
        L{constants.HOOKS_PHASE_POST}
    @type env: dict
    @param env: dictionary with the environment for the hook
    @rtype: list
    @return: list of 3-element tuples:
      - script path
      - script result, either L{constants.HKR_SUCCESS} or
        L{constants.HKR_FAIL}
      - output of the script

    @raise errors.ProgrammerError: for invalid input
        parameters

    """
    if phase == constants.HOOKS_PHASE_PRE:
      suffix = "pre"
    elif phase == constants.HOOKS_PHASE_POST:
      suffix = "post"
    else:
      _Fail("Unknown hooks phase '%s'", phase)

    rr = []

    # hooks for a given path/phase live in <base>/<hpath>-<suffix>.d
    subdir = "%s-%s.d" % (hpath, suffix)
    dir_name = "%s/%s" % (self._BASE_DIR, subdir)
    try:
      dir_contents = utils.ListVisibleFiles(dir_name)
    except OSError:
      # FIXME: must log output in case of failures
      return rr

    # we use the standard python sort order,
    # so 00name is the recommended naming scheme
    dir_contents.sort()
    for relname in dir_contents:
      fname = os.path.join(dir_name, relname)
      # skip anything that is not an executable regular file with a
      # name matching RE_MASK
      if not (os.path.isfile(fname) and os.access(fname, os.X_OK) and
          self.RE_MASK.match(relname) is not None):
        rrval = constants.HKR_SKIP
        output = ""
      else:
        result, output = self.ExecHook(fname, env)
        if not result:
          rrval = constants.HKR_FAIL
        else:
          rrval = constants.HKR_SUCCESS
      rr.append(("%s/%s" % (subdir, relname), rrval, output))

    return rr


class IAllocatorRunner(object):
  """IAllocator runner.

  This class is instantiated on the node side (ganeti-noded) and not on
  the master side.

  """
  def Run(self, name, idata):
    """Run an iallocator script.

    @type name: str
    @param name: the iallocator script name
    @type idata: str
    @param idata: the allocator input data

    @rtype: str
    @return: the stdout of the allocator script

    @raise RPCFail: if the script cannot be found in the search
        path or its execution fails

    """
    alloc_script = utils.FindFile(name, constants.IALLOCATOR_SEARCH_PATH,
                                  os.path.isfile)
    if alloc_script is None:
      _Fail("iallocator module '%s' not found in the search path", name)

    # the input data is handed to the script via a temporary file,
    # passed as its single command-line argument
    fd, fin_name = tempfile.mkstemp(prefix="ganeti-iallocator.")
    try:
      os.write(fd, idata)
      os.close(fd)
      result = utils.RunCmd([alloc_script, fin_name])
      if result.failed:
        _Fail("iallocator module '%s' failed: %s, output '%s'",
              name, result.fail_reason, result.output)
    finally:
      # always clean up the temporary input file
      os.unlink(fin_name)

    return result.stdout


class DevCacheManager(object):
2734
  """Simple class for managing a cache of block device information.
2735

2736
  """
2737
  _DEV_PREFIX = "/dev/"
2738
  _ROOT_DIR = constants.BDEV_CACHE_DIR
2739

    
2740
  @classmethod
2741
  def _ConvertPath(cls, dev_path):
2742
    """Converts a /dev/name path to the cache file name.
2743

2744
    This replaces slashes with underscores and strips the /dev
2745
    prefix. It then returns the full path to the cache file.
2746

2747
    @type dev_path: str
2748
    @param dev_path: the C{/dev/} path name
2749
    @rtype: str
2750
    @return: the converted path name
2751

2752
    """
2753
    if dev_path.startswith(cls._DEV_PREFIX):
2754
      dev_path = dev_path[len(cls._DEV_PREFIX):]
2755
    dev_path = dev_path.replace("/", "_")
2756
    fpath = "%s/bdev_%s" % (cls._ROOT_DIR, dev_path)
2757
    return fpath
2758

    
2759
  @classmethod
2760
  def UpdateCache(cls, dev_path, owner, on_primary, iv_name):
2761
    """Updates the cache information for a given device.
2762

2763
    @type dev_path: str
2764
    @param dev_path: the pathname of the device
2765
    @type owner: str
2766
    @param owner: the owner (instance name) of the device
2767
    @type on_primary: bool
2768
    @param on_primary: whether this is the primary
2769
        node nor not
2770
    @type iv_name: str
2771
    @param iv_name: the instance-visible name of the
2772
        device, as in objects.Disk.iv_name
2773

2774
    @rtype: None
2775

2776
    """
2777
    if dev_path is None:
2778
      logging.error("DevCacheManager.UpdateCache got a None dev_path")
2779
      return
2780
    fpath = cls._ConvertPath(dev_path)
2781
    if on_primary:
2782
      state = "primary"
2783
    else:
2784
      state = "secondary"
2785
    if iv_name is None:
2786
      iv_name = "not_visible"
2787
    fdata = "%s %s %s\n" % (str(owner), state, iv_name)
2788
    try:
2789
      utils.WriteFile(fpath, data=fdata)
2790
    except EnvironmentError, err:
2791
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)
2792

    
2793
  @classmethod
2794
  def RemoveCache(cls, dev_path):
2795
    """Remove data for a dev_path.
2796

2797
    This is just a wrapper over L{utils.RemoveFile} with a converted
2798
    path name and logging.
2799

2800
    @type dev_path: str
2801
    @param dev_path: the pathname of the device
2802

2803
    @rtype: None
2804

2805
    """
2806
    if dev_path is None:
2807
      logging.error("DevCacheManager.RemoveCache got a None dev_path")
2808
      return
2809
    fpath = cls._ConvertPath(dev_path)
2810
    try:
2811
      utils.RemoveFile(fpath)
2812
    except EnvironmentError, err:
2813
      logging.exception("Can't update bdev cache for %s: %s", dev_path, err)