Statistics
| Branch: | Tag: | Revision:

root / lib / rapi / client.py @ d9b67f70

History | View | Annotate | Download (28.1 kB)

1
#
2
#
3

    
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Ganeti RAPI client."""
23

    
24
import httplib
25
import urllib2
26
import logging
27
import simplejson
28
import socket
29
import urllib
30
import OpenSSL
31
import distutils.version
32

    
33

    
34
GANETI_RAPI_PORT = 5080
35

    
36
HTTP_DELETE = "DELETE"
37
HTTP_GET = "GET"
38
HTTP_PUT = "PUT"
39
HTTP_POST = "POST"
40
HTTP_OK = 200
41
HTTP_APP_JSON = "application/json"
42

    
43
REPLACE_DISK_PRI = "replace_on_primary"
44
REPLACE_DISK_SECONDARY = "replace_on_secondary"
45
REPLACE_DISK_CHG = "replace_new_secondary"
46
REPLACE_DISK_AUTO = "replace_auto"
47
VALID_REPLACEMENT_MODES = frozenset([
48
  REPLACE_DISK_PRI,
49
  REPLACE_DISK_SECONDARY,
50
  REPLACE_DISK_CHG,
51
  REPLACE_DISK_AUTO,
52
  ])
53
VALID_NODE_ROLES = frozenset([
54
  "drained", "master", "master-candidate", "offline", "regular",
55
  ])
56
VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"])
57

    
58

    
59
class Error(Exception):
60
  """Base error class for this module.
61

62
  """
63
  pass
64

    
65

    
66
class CertificateError(Error):
67
  """Raised when a problem is found with the SSL certificate.
68

69
  """
70
  pass
71

    
72

    
73
class GanetiApiError(Error):
74
  """Generic error raised from Ganeti API.
75

76
  """
77
  pass
78

    
79

    
80
class InvalidReplacementMode(Error):
81
  """Raised when an invalid disk replacement mode is attempted.
82

83
  """
84
  pass
85

    
86

    
87
class InvalidStorageType(Error):
88
  """Raised when an invalid storage type is used.
89

90
  """
91
  pass
92

    
93

    
94
class InvalidNodeRole(Error):
95
  """Raised when an invalid node role is used.
96

97
  """
98
  pass
99

    
100

    
101
def FormatX509Name(x509_name):
102
  """Formats an X509 name.
103

104
  @type x509_name: OpenSSL.crypto.X509Name
105

106
  """
107
  try:
108
    # Only supported in pyOpenSSL 0.7 and above
109
    get_components_fn = x509_name.get_components
110
  except AttributeError:
111
    return repr(x509_name)
112
  else:
113
    return "".join("/%s=%s" % (name, value)
114
                   for name, value in get_components_fn())
115

    
116

    
117
class CertAuthorityVerify:
118
  """Certificate verificator for SSL context.
119

120
  Configures SSL context to verify server's certificate.
121

122
  """
123
  _CAPATH_MINVERSION = "0.9"
124
  _DEFVFYPATHS_MINVERSION = "0.9"
125

    
126
  _PYOPENSSL_VERSION = OpenSSL.__version__
127
  _PARSED_PYOPENSSL_VERSION = distutils.version.LooseVersion(_PYOPENSSL_VERSION)
128

    
129
  _SUPPORT_CAPATH = (_PARSED_PYOPENSSL_VERSION >= _CAPATH_MINVERSION)
130
  _SUPPORT_DEFVFYPATHS = (_PARSED_PYOPENSSL_VERSION >= _DEFVFYPATHS_MINVERSION)
131

    
132
  def __init__(self, cafile=None, capath=None, use_default_verify_paths=False):
133
    """Initializes this class.
134

135
    @type cafile: string
136
    @param cafile: In which file we can find the certificates
137
    @type capath: string
138
    @param capath: In which directory we can find the certificates
139
    @type use_default_verify_paths: bool
140
    @param use_default_verify_paths: Whether the platform provided CA
141
                                     certificates are to be used for
142
                                     verification purposes
143

144
    """
145
    self._cafile = cafile
146
    self._capath = capath
147
    self._use_default_verify_paths = use_default_verify_paths
148

    
149
    if self._capath is not None and not self._SUPPORT_CAPATH:
150
      raise Error(("PyOpenSSL %s has no support for a CA directory,"
151
                   " version %s or above is required") %
152
                  (self._PYOPENSSL_VERSION, self._CAPATH_MINVERSION))
153

    
154
    if self._use_default_verify_paths and not self._SUPPORT_DEFVFYPATHS:
155
      raise Error(("PyOpenSSL %s has no support for using default verification"
156
                   " paths, version %s or above is required") %
157
                  (self._PYOPENSSL_VERSION, self._DEFVFYPATHS_MINVERSION))
158

    
159
  @staticmethod
160
  def _VerifySslCertCb(logger, _, cert, errnum, errdepth, ok):
161
    """Callback for SSL certificate verification.
162

163
    @param logger: Logging object
164

165
    """
166
    if ok:
167
      log_fn = logger.debug
168
    else:
169
      log_fn = logger.error
170

    
171
    log_fn("Verifying SSL certificate at depth %s, subject '%s', issuer '%s'",
172
           errdepth, FormatX509Name(cert.get_subject()),
173
           FormatX509Name(cert.get_issuer()))
174

    
175
    if not ok:
176
      try:
177
        # Only supported in pyOpenSSL 0.7 and above
178
        # pylint: disable-msg=E1101
179
        fn = OpenSSL.crypto.X509_verify_cert_error_string
180
      except AttributeError:
181
        errmsg = ""
182
      else:
183
        errmsg = ":%s" % fn(errnum)
184

    
185
      logger.error("verify error:num=%s%s", errnum, errmsg)
186

    
187
    return ok
188

    
189
  def __call__(self, ctx, logger):
190
    """Configures an SSL context to verify certificates.
191

192
    @type ctx: OpenSSL.SSL.Context
193
    @param ctx: SSL context
194

195
    """
196
    if self._use_default_verify_paths:
197
      ctx.set_default_verify_paths()
198

    
199
    if self._cafile or self._capath:
200
      if self._SUPPORT_CAPATH:
201
        ctx.load_verify_locations(self._cafile, self._capath)
202
      else:
203
        ctx.load_verify_locations(self._cafile)
204

    
205
    ctx.set_verify(OpenSSL.SSL.VERIFY_PEER,
206
                   lambda conn, cert, errnum, errdepth, ok: \
207
                     self._VerifySslCertCb(logger, conn, cert,
208
                                           errnum, errdepth, ok))
209

    
210

    
211
class _HTTPSConnectionOpenSSL(httplib.HTTPSConnection):
212
  """HTTPS Connection handler that verifies the SSL certificate.
213

214
  """
215
  def __init__(self, *args, **kwargs):
216
    """Initializes this class.
217

218
    """
219
    httplib.HTTPSConnection.__init__(self, *args, **kwargs)
220
    self._logger = None
221
    self._config_ssl_verification = None
222

    
223
  def Setup(self, logger, config_ssl_verification):
224
    """Sets the SSL verification config function.
225

226
    @param logger: Logging object
227
    @type config_ssl_verification: callable
228

229
    """
230
    assert self._logger is None
231
    assert self._config_ssl_verification is None
232

    
233
    self._logger = logger
234
    self._config_ssl_verification = config_ssl_verification
235

    
236
  def connect(self):
237
    """Connect to the server specified when the object was created.
238

239
    This ensures that SSL certificates are verified.
240

241
    """
242
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
243

    
244
    ctx = OpenSSL.SSL.Context(OpenSSL.SSL.TLSv1_METHOD)
245
    ctx.set_options(OpenSSL.SSL.OP_NO_SSLv2)
246

    
247
    if self._config_ssl_verification:
248
      self._config_ssl_verification(ctx, self._logger)
249

    
250
    ssl = OpenSSL.SSL.Connection(ctx, sock)
251
    ssl.connect((self.host, self.port))
252

    
253
    self.sock = httplib.FakeSocket(sock, ssl)
254

    
255

    
256
class _HTTPSHandler(urllib2.HTTPSHandler):
257
  def __init__(self, logger, config_ssl_verification):
258
    """Initializes this class.
259

260
    @param logger: Logging object
261
    @type config_ssl_verification: callable
262
    @param config_ssl_verification: Function to configure SSL context for
263
                                    certificate verification
264

265
    """
266
    urllib2.HTTPSHandler.__init__(self)
267
    self._logger = logger
268
    self._config_ssl_verification = config_ssl_verification
269

    
270
  def _CreateHttpsConnection(self, *args, **kwargs):
271
    """Wrapper around L{_HTTPSConnectionOpenSSL} to add SSL verification.
272

273
    This wrapper is necessary provide a compatible API to urllib2.
274

275
    """
276
    conn = _HTTPSConnectionOpenSSL(*args, **kwargs)
277
    conn.Setup(self._logger, self._config_ssl_verification)
278
    return conn
279

    
280
  def https_open(self, req):
281
    """Creates HTTPS connection.
282

283
    Called by urllib2.
284

285
    """
286
    return self.do_open(self._CreateHttpsConnection, req)
287

    
288

    
289
class _RapiRequest(urllib2.Request):
290
  def __init__(self, method, url, headers, data):
291
    """Initializes this class.
292

293
    """
294
    urllib2.Request.__init__(self, url, data=data, headers=headers)
295
    self._method = method
296

    
297
  def get_method(self):
298
    """Returns the HTTP request method.
299

300
    """
301
    return self._method
302

    
303

    
304
class GanetiRapiClient(object):
305
  """Ganeti RAPI client.
306

307
  """
308
  USER_AGENT = "Ganeti RAPI Client"
309
  _json_encoder = simplejson.JSONEncoder(sort_keys=True)
310

    
311
  def __init__(self, host, port=GANETI_RAPI_PORT,
312
               username=None, password=None,
313
               config_ssl_verification=None, ignore_proxy=False,
314
               logger=logging):
315
    """Constructor.
316

317
    @type host: string
318
    @param host: the ganeti cluster master to interact with
319
    @type port: int
320
    @param port: the port on which the RAPI is running (default is 5080)
321
    @type username: string
322
    @param username: the username to connect with
323
    @type password: string
324
    @param password: the password to connect with
325
    @type config_ssl_verification: callable
326
    @param config_ssl_verification: Function to configure SSL context for
327
                                    certificate verification
328
    @type ignore_proxy: bool
329
    @param ignore_proxy: Whether to ignore proxy settings
330
    @param logger: Logging object
331

332
    """
333
    self._host = host
334
    self._port = port
335
    self._logger = logger
336

    
337
    self._base_url = "https://%s:%s" % (host, port)
338

    
339
    handlers = [_HTTPSHandler(self._logger, config_ssl_verification)]
340

    
341
    if username is not None:
342
      pwmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
343
      pwmgr.add_password(None, self._base_url, username, password)
344
      handlers.append(urllib2.HTTPBasicAuthHandler(pwmgr))
345
    elif password:
346
      raise Error("Specified password without username")
347

    
348
    if ignore_proxy:
349
      handlers.append(urllib2.ProxyHandler({}))
350

    
351
    self._http = urllib2.build_opener(*handlers) # pylint: disable-msg=W0142
352

    
353
    self._headers = {
354
      "Accept": HTTP_APP_JSON,
355
      "Content-type": HTTP_APP_JSON,
356
      "User-Agent": self.USER_AGENT,
357
      }
358

    
359
  def _SendRequest(self, method, path, query, content):
360
    """Sends an HTTP request.
361

362
    This constructs a full URL, encodes and decodes HTTP bodies, and
363
    handles invalid responses in a pythonic way.
364

365
    @type method: string
366
    @param method: HTTP method to use
367
    @type path: string
368
    @param path: HTTP URL path
369
    @type query: list of two-tuples
370
    @param query: query arguments to pass to urllib.urlencode
371
    @type content: str or None
372
    @param content: HTTP body content
373

374
    @rtype: str
375
    @return: JSON-Decoded response
376

377
    @raises CertificateError: If an invalid SSL certificate is found
378
    @raises GanetiApiError: If an invalid response is returned
379

380
    """
381
    assert path.startswith("/")
382

    
383
    if content:
384
      encoded_content = self._json_encoder.encode(content)
385
    else:
386
      encoded_content = None
387

    
388
    # Build URL
389
    url = [self._base_url, path]
390
    if query:
391
      url.append("?")
392
      url.append(urllib.urlencode(query))
393

    
394
    req = _RapiRequest(method, "".join(url), self._headers, encoded_content)
395

    
396
    try:
397
      resp = self._http.open(req)
398
      encoded_response_content = resp.read()
399
    except (OpenSSL.SSL.Error, OpenSSL.crypto.Error), err:
400
      raise CertificateError("SSL issue: %s" % err)
401

    
402
    if encoded_response_content:
403
      response_content = simplejson.loads(encoded_response_content)
404
    else:
405
      response_content = None
406

    
407
    # TODO: Are there other status codes that are valid? (redirect?)
408
    if resp.code != HTTP_OK:
409
      if isinstance(response_content, dict):
410
        msg = ("%s %s: %s" %
411
               (response_content["code"],
412
                response_content["message"],
413
                response_content["explain"]))
414
      else:
415
        msg = str(response_content)
416

    
417
      raise GanetiApiError(msg)
418

    
419
    return response_content
420

    
421
  @staticmethod
422
  def _CheckStorageType(storage_type):
423
    """Checks a storage type for validity.
424

425
    """
426
    if storage_type not in VALID_STORAGE_TYPES:
427
      raise InvalidStorageType("%s is an invalid storage type" % storage_type)
428

    
429
  def GetVersion(self):
430
    """Gets the Remote API version running on the cluster.
431

432
    @rtype: int
433
    @return: Ganeti Remote API version
434

435
    """
436
    return self._SendRequest(HTTP_GET, "/version", None, None)
437

    
438
  def GetOperatingSystems(self):
439
    """Gets the Operating Systems running in the Ganeti cluster.
440

441
    @rtype: list of str
442
    @return: operating systems
443

444
    """
445
    return self._SendRequest(HTTP_GET, "/2/os", None, None)
446

    
447
  def GetInfo(self):
448
    """Gets info about the cluster.
449

450
    @rtype: dict
451
    @return: information about the cluster
452

453
    """
454
    return self._SendRequest(HTTP_GET, "/2/info", None, None)
455

    
456
  def GetClusterTags(self):
457
    """Gets the cluster tags.
458

459
    @rtype: list of str
460
    @return: cluster tags
461

462
    """
463
    return self._SendRequest(HTTP_GET, "/2/tags", None, None)
464

    
465
  def AddClusterTags(self, tags, dry_run=False):
466
    """Adds tags to the cluster.
467

468
    @type tags: list of str
469
    @param tags: tags to add to the cluster
470
    @type dry_run: bool
471
    @param dry_run: whether to perform a dry run
472

473
    @rtype: int
474
    @return: job id
475

476
    """
477
    query = [("tag", t) for t in tags]
478
    if dry_run:
479
      query.append(("dry-run", 1))
480

    
481
    return self._SendRequest(HTTP_PUT, "/2/tags", query, None)
482

    
483
  def DeleteClusterTags(self, tags, dry_run=False):
484
    """Deletes tags from the cluster.
485

486
    @type tags: list of str
487
    @param tags: tags to delete
488
    @type dry_run: bool
489
    @param dry_run: whether to perform a dry run
490

491
    """
492
    query = [("tag", t) for t in tags]
493
    if dry_run:
494
      query.append(("dry-run", 1))
495

    
496
    return self._SendRequest(HTTP_DELETE, "/2/tags", query, None)
497

    
498
  def GetInstances(self, bulk=False):
499
    """Gets information about instances on the cluster.
500

501
    @type bulk: bool
502
    @param bulk: whether to return all information about all instances
503

504
    @rtype: list of dict or list of str
505
    @return: if bulk is True, info about the instances, else a list of instances
506

507
    """
508
    query = []
509
    if bulk:
510
      query.append(("bulk", 1))
511

    
512
    instances = self._SendRequest(HTTP_GET, "/2/instances", query, None)
513
    if bulk:
514
      return instances
515
    else:
516
      return [i["id"] for i in instances]
517

    
518
  def GetInstanceInfo(self, instance):
519
    """Gets information about an instance.
520

521
    @type instance: str
522
    @param instance: instance whose info to return
523

524
    @rtype: dict
525
    @return: info about the instance
526

527
    """
528
    return self._SendRequest(HTTP_GET, "/2/instances/%s" % instance, None, None)
529

    
530
  def CreateInstance(self, dry_run=False):
531
    """Creates a new instance.
532

533
    @type dry_run: bool
534
    @param dry_run: whether to perform a dry run
535

536
    @rtype: int
537
    @return: job id
538

539
    """
540
    # TODO: Pass arguments needed to actually create an instance.
541
    query = []
542
    if dry_run:
543
      query.append(("dry-run", 1))
544

    
545
    return self._SendRequest(HTTP_POST, "/2/instances", query, None)
546

    
547
  def DeleteInstance(self, instance, dry_run=False):
548
    """Deletes an instance.
549

550
    @type instance: str
551
    @param instance: the instance to delete
552

553
    @rtype: int
554
    @return: job id
555

556
    """
557
    query = []
558
    if dry_run:
559
      query.append(("dry-run", 1))
560

    
561
    return self._SendRequest(HTTP_DELETE, "/2/instances/%s" % instance,
562
                             query, None)
563

    
564
  def GetInstanceTags(self, instance):
565
    """Gets tags for an instance.
566

567
    @type instance: str
568
    @param instance: instance whose tags to return
569

570
    @rtype: list of str
571
    @return: tags for the instance
572

573
    """
574
    return self._SendRequest(HTTP_GET, "/2/instances/%s/tags" % instance,
575
                             None, None)
576

    
577
  def AddInstanceTags(self, instance, tags, dry_run=False):
578
    """Adds tags to an instance.
579

580
    @type instance: str
581
    @param instance: instance to add tags to
582
    @type tags: list of str
583
    @param tags: tags to add to the instance
584
    @type dry_run: bool
585
    @param dry_run: whether to perform a dry run
586

587
    @rtype: int
588
    @return: job id
589

590
    """
591
    query = [("tag", t) for t in tags]
592
    if dry_run:
593
      query.append(("dry-run", 1))
594

    
595
    return self._SendRequest(HTTP_PUT, "/2/instances/%s/tags" % instance,
596
                             query, None)
597

    
598
  def DeleteInstanceTags(self, instance, tags, dry_run=False):
599
    """Deletes tags from an instance.
600

601
    @type instance: str
602
    @param instance: instance to delete tags from
603
    @type tags: list of str
604
    @param tags: tags to delete
605
    @type dry_run: bool
606
    @param dry_run: whether to perform a dry run
607

608
    """
609
    query = [("tag", t) for t in tags]
610
    if dry_run:
611
      query.append(("dry-run", 1))
612

    
613
    return self._SendRequest(HTTP_DELETE, "/2/instances/%s/tags" % instance,
614
                             query, None)
615

    
616
  def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None,
617
                     dry_run=False):
618
    """Reboots an instance.
619

620
    @type instance: str
621
    @param instance: instance to rebot
622
    @type reboot_type: str
623
    @param reboot_type: one of: hard, soft, full
624
    @type ignore_secondaries: bool
625
    @param ignore_secondaries: if True, ignores errors for the secondary node
626
        while re-assembling disks (in hard-reboot mode only)
627
    @type dry_run: bool
628
    @param dry_run: whether to perform a dry run
629

630
    """
631
    query = []
632
    if reboot_type:
633
      query.append(("type", reboot_type))
634
    if ignore_secondaries is not None:
635
      query.append(("ignore_secondaries", ignore_secondaries))
636
    if dry_run:
637
      query.append(("dry-run", 1))
638

    
639
    return self._SendRequest(HTTP_POST, "/2/instances/%s/reboot" % instance,
640
                             query, None)
641

    
642
  def ShutdownInstance(self, instance, dry_run=False):
643
    """Shuts down an instance.
644

645
    @type instance: str
646
    @param instance: the instance to shut down
647
    @type dry_run: bool
648
    @param dry_run: whether to perform a dry run
649

650
    """
651
    query = []
652
    if dry_run:
653
      query.append(("dry-run", 1))
654

    
655
    return self._SendRequest(HTTP_PUT, "/2/instances/%s/shutdown" % instance,
656
                             query, None)
657

    
658
  def StartupInstance(self, instance, dry_run=False):
659
    """Starts up an instance.
660

661
    @type instance: str
662
    @param instance: the instance to start up
663
    @type dry_run: bool
664
    @param dry_run: whether to perform a dry run
665

666
    """
667
    query = []
668
    if dry_run:
669
      query.append(("dry-run", 1))
670

    
671
    return self._SendRequest(HTTP_PUT, "/2/instances/%s/startup" % instance,
672
                             query, None)
673

    
674
  def ReinstallInstance(self, instance, os, no_startup=False):
675
    """Reinstalls an instance.
676

677
    @type instance: str
678
    @param instance: the instance to reinstall
679
    @type os: str
680
    @param os: the os to reinstall
681
    @type no_startup: bool
682
    @param no_startup: whether to start the instance automatically
683

684
    """
685
    query = [("os", os)]
686
    if no_startup:
687
      query.append(("nostartup", 1))
688
    return self._SendRequest(HTTP_POST, "/2/instances/%s/reinstall" % instance,
689
                             query, None)
690

    
691
  def ReplaceInstanceDisks(self, instance, disks, mode=REPLACE_DISK_AUTO,
692
                           remote_node=None, iallocator=None, dry_run=False):
693
    """Replaces disks on an instance.
694

695
    @type instance: str
696
    @param instance: instance whose disks to replace
697
    @type disks: list of str
698
    @param disks: disks to replace
699
    @type mode: str
700
    @param mode: replacement mode to use (defaults to replace_auto)
701
    @type remote_node: str or None
702
    @param remote_node: new secondary node to use (for use with
703
        replace_new_secondary mode)
704
    @type iallocator: str or None
705
    @param iallocator: instance allocator plugin to use (for use with
706
                       replace_auto mode)
707
    @type dry_run: bool
708
    @param dry_run: whether to perform a dry run
709

710
    @rtype: int
711
    @return: job id
712

713
    @raises InvalidReplacementMode: If an invalid disk replacement mode is given
714
    @raises GanetiApiError: If no secondary node is given with a non-auto
715
        replacement mode is requested.
716

717
    """
718
    if mode not in VALID_REPLACEMENT_MODES:
719
      raise InvalidReplacementMode("%s is not a valid disk replacement mode" %
720
                                   mode)
721

    
722
    query = [
723
      ("mode", mode),
724
      ("disks", ",".join(disks)),
725
      ]
726

    
727
    if mode == REPLACE_DISK_AUTO:
728
      query.append(("iallocator", iallocator))
729
    elif mode == REPLACE_DISK_SECONDARY:
730
      if remote_node is None:
731
        raise GanetiApiError("Missing secondary node")
732
      query.append(("remote_node", remote_node))
733

    
734
    if dry_run:
735
      query.append(("dry-run", 1))
736

    
737
    return self._SendRequest(HTTP_POST,
738
                             "/2/instances/%s/replace-disks" % instance,
739
                             query, None)
740

    
741
  def GetJobs(self):
742
    """Gets all jobs for the cluster.
743

744
    @rtype: list of int
745
    @return: job ids for the cluster
746

747
    """
748
    return [int(j["id"])
749
            for j in self._SendRequest(HTTP_GET, "/2/jobs", None, None)]
750

    
751
  def GetJobStatus(self, job_id):
752
    """Gets the status of a job.
753

754
    @type job_id: int
755
    @param job_id: job id whose status to query
756

757
    @rtype: dict
758
    @return: job status
759

760
    """
761
    return self._SendRequest(HTTP_GET, "/2/jobs/%d" % job_id, None, None)
762

    
763
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
764
    """Waits for job changes.
765

766
    @type job_id: int
767
    @param job_id: Job ID for which to wait
768

769
    """
770
    body = {
771
      "fields": fields,
772
      "previous_job_info": prev_job_info,
773
      "previous_log_serial": prev_log_serial,
774
      }
775

    
776
    return self._SendRequest(HTTP_GET, "/2/jobs/%s/wait" % job_id, None, body)
777

    
778
  def CancelJob(self, job_id, dry_run=False):
779
    """Cancels a job.
780

781
    @type job_id: int
782
    @param job_id: id of the job to delete
783
    @type dry_run: bool
784
    @param dry_run: whether to perform a dry run
785

786
    """
787
    query = []
788
    if dry_run:
789
      query.append(("dry-run", 1))
790

    
791
    return self._SendRequest(HTTP_DELETE, "/2/jobs/%d" % job_id, query, None)
792

    
793
  def GetNodes(self, bulk=False):
794
    """Gets all nodes in the cluster.
795

796
    @type bulk: bool
797
    @param bulk: whether to return all information about all instances
798

799
    @rtype: list of dict or str
800
    @return: if bulk is true, info about nodes in the cluster,
801
        else list of nodes in the cluster
802

803
    """
804
    query = []
805
    if bulk:
806
      query.append(("bulk", 1))
807

    
808
    nodes = self._SendRequest(HTTP_GET, "/2/nodes", query, None)
809
    if bulk:
810
      return nodes
811
    else:
812
      return [n["id"] for n in nodes]
813

    
814
  def GetNodeInfo(self, node):
815
    """Gets information about a node.
816

817
    @type node: str
818
    @param node: node whose info to return
819

820
    @rtype: dict
821
    @return: info about the node
822

823
    """
824
    return self._SendRequest(HTTP_GET, "/2/nodes/%s" % node, None, None)
825

    
826
  def EvacuateNode(self, node, iallocator=None, remote_node=None,
827
                   dry_run=False):
828
    """Evacuates instances from a Ganeti node.
829

830
    @type node: str
831
    @param node: node to evacuate
832
    @type iallocator: str or None
833
    @param iallocator: instance allocator to use
834
    @type remote_node: str
835
    @param remote_node: node to evaucate to
836
    @type dry_run: bool
837
    @param dry_run: whether to perform a dry run
838

839
    @rtype: int
840
    @return: job id
841

842
    @raises GanetiApiError: if an iallocator and remote_node are both specified
843

844
    """
845
    if iallocator and remote_node:
846
      raise GanetiApiError("Only one of iallocator or remote_node can be used")
847

    
848
    query = []
849
    if iallocator:
850
      query.append(("iallocator", iallocator))
851
    if remote_node:
852
      query.append(("remote_node", remote_node))
853
    if dry_run:
854
      query.append(("dry-run", 1))
855

    
856
    return self._SendRequest(HTTP_POST, "/2/nodes/%s/evacuate" % node,
857
                             query, None)
858

    
859
  def MigrateNode(self, node, live=True, dry_run=False):
860
    """Migrates all primary instances from a node.
861

862
    @type node: str
863
    @param node: node to migrate
864
    @type live: bool
865
    @param live: whether to use live migration
866
    @type dry_run: bool
867
    @param dry_run: whether to perform a dry run
868

869
    @rtype: int
870
    @return: job id
871

872
    """
873
    query = []
874
    if live:
875
      query.append(("live", 1))
876
    if dry_run:
877
      query.append(("dry-run", 1))
878

    
879
    return self._SendRequest(HTTP_POST, "/2/nodes/%s/migrate" % node,
880
                             query, None)
881

    
882
  def GetNodeRole(self, node):
883
    """Gets the current role for a node.
884

885
    @type node: str
886
    @param node: node whose role to return
887

888
    @rtype: str
889
    @return: the current role for a node
890

891
    """
892
    return self._SendRequest(HTTP_GET, "/2/nodes/%s/role" % node, None, None)
893

    
894
  def SetNodeRole(self, node, role, force=False):
895
    """Sets the role for a node.
896

897
    @type node: str
898
    @param node: the node whose role to set
899
    @type role: str
900
    @param role: the role to set for the node
901
    @type force: bool
902
    @param force: whether to force the role change
903

904
    @rtype: int
905
    @return: job id
906

907
    @raise InvalidNodeRole: If an invalid node role is specified
908

909
    """
910
    if role not in VALID_NODE_ROLES:
911
      raise InvalidNodeRole("%s is not a valid node role" % role)
912

    
913
    query = [("force", force)]
914

    
915
    return self._SendRequest(HTTP_PUT, "/2/nodes/%s/role" % node,
916
                             query, role)
917

    
918
  def GetNodeStorageUnits(self, node, storage_type, output_fields):
919
    """Gets the storage units for a node.
920

921
    @type node: str
922
    @param node: the node whose storage units to return
923
    @type storage_type: str
924
    @param storage_type: storage type whose units to return
925
    @type output_fields: str
926
    @param output_fields: storage type fields to return
927

928
    @rtype: int
929
    @return: job id where results can be retrieved
930

931
    @raise InvalidStorageType: If an invalid storage type is specified
932

933
    """
934
    # TODO: Add default for storage_type & output_fields
935
    self._CheckStorageType(storage_type)
936

    
937
    query = [
938
      ("storage_type", storage_type),
939
      ("output_fields", output_fields),
940
      ]
941

    
942
    return self._SendRequest(HTTP_GET, "/2/nodes/%s/storage" % node,
943
                             query, None)
944

    
945
  def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True):
946
    """Modifies parameters of storage units on the node.
947

948
    @type node: str
949
    @param node: node whose storage units to modify
950
    @type storage_type: str
951
    @param storage_type: storage type whose units to modify
952
    @type name: str
953
    @param name: name of the storage unit
954
    @type allocatable: bool
955
    @param allocatable: TODO: Document me
956

957
    @rtype: int
958
    @return: job id
959

960
    @raise InvalidStorageType: If an invalid storage type is specified
961

962
    """
963
    self._CheckStorageType(storage_type)
964

    
965
    query = [
966
      ("storage_type", storage_type),
967
      ("name", name),
968
      ("allocatable", allocatable),
969
      ]
970

    
971
    return self._SendRequest(HTTP_PUT, "/2/nodes/%s/storage/modify" % node,
972
                             query, None)
973

    
974
  def RepairNodeStorageUnits(self, node, storage_type, name):
975
    """Repairs a storage unit on the node.
976

977
    @type node: str
978
    @param node: node whose storage units to repair
979
    @type storage_type: str
980
    @param storage_type: storage type to repair
981
    @type name: str
982
    @param name: name of the storage unit to repair
983

984
    @rtype: int
985
    @return: job id
986

987
    @raise InvalidStorageType: If an invalid storage type is specified
988

989
    """
990
    self._CheckStorageType(storage_type)
991

    
992
    query = [
993
      ("storage_type", storage_type),
994
      ("name", name),
995
      ]
996

    
997
    return self._SendRequest(HTTP_PUT, "/2/nodes/%s/storage/repair" % node,
998
                             query, None)
999

    
1000
  def GetNodeTags(self, node):
1001
    """Gets the tags for a node.
1002

1003
    @type node: str
1004
    @param node: node whose tags to return
1005

1006
    @rtype: list of str
1007
    @return: tags for the node
1008

1009
    """
1010
    return self._SendRequest(HTTP_GET, "/2/nodes/%s/tags" % node, None, None)
1011

    
1012
  def AddNodeTags(self, node, tags, dry_run=False):
1013
    """Adds tags to a node.
1014

1015
    @type node: str
1016
    @param node: node to add tags to
1017
    @type tags: list of str
1018
    @param tags: tags to add to the node
1019
    @type dry_run: bool
1020
    @param dry_run: whether to perform a dry run
1021

1022
    @rtype: int
1023
    @return: job id
1024

1025
    """
1026
    query = [("tag", t) for t in tags]
1027
    if dry_run:
1028
      query.append(("dry-run", 1))
1029

    
1030
    return self._SendRequest(HTTP_PUT, "/2/nodes/%s/tags" % node,
1031
                             query, tags)
1032

    
1033
  def DeleteNodeTags(self, node, tags, dry_run=False):
1034
    """Delete tags from a node.
1035

1036
    @type node: str
1037
    @param node: node to remove tags from
1038
    @type tags: list of str
1039
    @param tags: tags to remove from the node
1040
    @type dry_run: bool
1041
    @param dry_run: whether to perform a dry run
1042

1043
    @rtype: int
1044
    @return: job id
1045

1046
    """
1047
    query = [("tag", t) for t in tags]
1048
    if dry_run:
1049
      query.append(("dry-run", 1))
1050

    
1051
    return self._SendRequest(HTTP_DELETE, "/2/nodes/%s/tags" % node,
1052
                             query, None)