Revision 95ab4de9

b/Makefile.am
129 129
rapi_PYTHON = \
130 130
	lib/rapi/__init__.py \
131 131
	lib/rapi/baserlib.py \
132
	lib/rapi/client.py \
132 133
	lib/rapi/connector.py \
133 134
	lib/rapi/rlib2.py
134 135

  
......
333 334
	test/ganeti.mcpu_unittest.py \
334 335
	test/ganeti.objects_unittest.py \
335 336
	test/ganeti.opcodes_unittest.py \
337
	test/ganeti.rapi.client_unittest.py \
336 338
	test/ganeti.rapi.resources_unittest.py \
337 339
	test/ganeti.serializer_unittest.py \
338 340
	test/ganeti.ssh_unittest.py \
b/lib/rapi/client.py
1
#
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Ganeti RAPI client."""
23

  
24
import httplib
25
import httplib2
26
import simplejson
27
import socket
28
import urllib
29
from OpenSSL import SSL
30
from OpenSSL import crypto
31

  
32

  
33
HTTP_DELETE = "DELETE"
34
HTTP_GET = "GET"
35
HTTP_PUT = "PUT"
36
HTTP_POST = "POST"
37
REPLACE_DISK_PRI = "replace_on_primary"
38
REPLACE_DISK_SECONDARY = "replace_on_secondary"
39
REPLACE_DISK_CHG = "replace_new_secondary"
40
REPLACE_DISK_AUTO = "replace_auto"
41
VALID_REPLACEMENT_MODES = frozenset([
42
    REPLACE_DISK_PRI, REPLACE_DISK_SECONDARY, REPLACE_DISK_CHG,
43
    REPLACE_DISK_AUTO
44
    ])
45
VALID_NODE_ROLES = frozenset([
46
    "drained", "master", "master-candidate", "offline", "regular"
47
    ])
48
VALID_STORAGE_TYPES = frozenset(["file", "lvm-pv", "lvm-vg"])
49

  
50

  
51
class Error(Exception):
52
  """Base error class for this module.
53

  
54
  """
55
  pass
56

  
57

  
58
class CertificateError(Error):
59
  """Raised when a problem is found with the SSL certificate.
60

  
61
  """
62
  pass
63

  
64

  
65
class GanetiApiError(Error):
66
  """Generic error raised from Ganeti API.
67

  
68
  """
69
  pass
70

  
71

  
72
class InvalidReplacementMode(Error):
73
  """Raised when an invalid disk replacement mode is attempted.
74

  
75
  """
76
  pass
77

  
78

  
79
class InvalidStorageType(Error):
80
  """Raised when an invalid storage type is used.
81

  
82
  """
83
  pass
84

  
85

  
86
class InvalidNodeRole(Error):
87
  """Raised when an invalid node role is used.
88

  
89
  """
90
  pass
91

  
92

  
93
class GanetiRapiClient(object):
94
  """Ganeti RAPI client.
95

  
96
  """
97

  
98
  USER_AGENT = "Ganeti RAPI Client"
99

  
100
  def __init__(self, master_hostname, port=5080, username=None, password=None,
101
               ssl_cert=None):
102
    """Constructor.
103

  
104
    @type master_hostname: str
105
    @param master_hostname: the ganeti cluster master to interact with
106
    @type port: int
107
    @param port: the port on which the RAPI is running. (default is 5080)
108
    @type username: str
109
    @param username: the username to connect with
110
    @type password: str
111
    @param password: the password to connect with
112
    @type ssl_cert: str or None
113
    @param ssl_cert: the expected SSL certificate. if None, SSL certificate
114
        will not be verified
115

  
116
    """
117
    self._master_hostname = master_hostname
118
    self._port = port
119
    if ssl_cert:
120
      _VerifyCertificate(self._master_hostname, self._port, ssl_cert)
121

  
122
    self._http = httplib2.Http()
123
    self._headers = {
124
        "Accept": "text/plain",
125
        "Content-type": "application/x-www-form-urlencoded",
126
        "User-Agent": self.USER_AGENT}
127
    self._version = None
128
    if username and password:
129
      self._http.add_credentials(username, password)
130

  
131
  def _MakeUrl(self, path, query=None, prepend_version=True):
132
    """Constructs the URL to pass to the HTTP client.
133

  
134
    @type path: str
135
    @param path: HTTP URL path
136
    @type query: list of two-tuples
137
    @param query: query arguments to pass to urllib.urlencode
138
    @type prepend_version: bool
139
    @param prepend_version: whether to automatically fetch and prepend the
140
        Ganeti version to the URL path
141

  
142
    @rtype:  str
143
    @return: URL path
144

  
145
    """
146
    if prepend_version:
147
      if not self._version:
148
        self._GetVersionInternal()
149
      path = "/%d%s" % (self._version, path)
150

  
151
    return "https://%(host)s:%(port)d%(path)s?%(query)s" % {
152
        "host": self._master_hostname,
153
        "port": self._port,
154
        "path": path,
155
        "query": urllib.urlencode(query or [])}
156

  
157
  def _SendRequest(self, method, path, query=None, content=None,
158
                   prepend_version=True):
159
    """Sends an HTTP request.
160

  
161
    This constructs a full URL, encodes and decodes HTTP bodies, and
162
    handles invalid responses in a pythonic way.
163

  
164
    @type method: str
165
    @param method: HTTP method to use
166
    @type path: str
167
    @param path: HTTP URL path
168
    @type query: list of two-tuples
169
    @param query: query arguments to pass to urllib.urlencode
170
    @type content: str or None
171
    @param content: HTTP body content
172
    @type prepend_version: bool
173
    @param prepend_version: whether to automatically fetch and prepend the
174
        Ganeti version to the URL path
175

  
176
    @rtype: str
177
    @return: JSON-Decoded response
178

  
179
    @raises GanetiApiError: If an invalid response is returned
180

  
181
    """
182
    if content:
183
      simplejson.JSONEncoder(sort_keys=True).encode(content)
184

  
185
    url = self._MakeUrl(path, query, prepend_version)
186
    resp_headers, resp_content = self._http.request(
187
        url, method, body=content, headers=self._headers)
188

  
189
    if resp_content:
190
      resp_content = simplejson.loads(resp_content)
191

  
192
    # TODO: Are there other status codes that are valid? (redirect?)
193
    if resp_headers.status != 200:
194
      if isinstance(resp_content, dict):
195
        msg = ("%s %s: %s" %
196
            (resp_content["code"], resp_content["message"],
197
             resp_content["explain"]))
198
      else:
199
        msg = resp_content
200
      raise GanetiApiError(msg)
201

  
202
    return resp_content
203

  
204
  def _GetVersionInternal(self):
205
    """Gets the Remote API version running on the cluster.
206

  
207
    @rtype: int
208
    @return: Ganeti version
209

  
210
    """
211
    self._version = self._SendRequest(HTTP_GET, "/version",
212
                                      prepend_version=False)
213
    return self._version
214

  
215
  def GetVersion(self):
216
    """Gets the ganeti version running on the cluster.
217

  
218
    @rtype: int
219
    @return: Ganeti version
220

  
221
    """
222
    if not self._version:
223
      self._GetVersionInternal()
224
    return self._version
225

  
226
  def GetOperatingSystems(self):
227
    """Gets the Operating Systems running in the Ganeti cluster.
228

  
229
    @rtype: list of str
230
    @return: operating systems
231

  
232
    """
233
    return self._SendRequest(HTTP_GET, "/os")
234

  
235
  def GetInfo(self):
236
    """Gets info about the cluster.
237

  
238
    @rtype: dict
239
    @return: information about the cluster
240

  
241
    """
242
    return self._SendRequest(HTTP_GET, "/info")
243

  
244
  def GetClusterTags(self):
245
    """Gets the cluster tags.
246

  
247
    @rtype: list of str
248
    @return: cluster tags
249

  
250
    """
251
    return self._SendRequest(HTTP_GET, "/tags")
252

  
253
  def AddClusterTags(self, tags, dry_run=False):
254
    """Adds tags to the cluster.
255

  
256
    @type tags: list of str
257
    @param tags: tags to add to the cluster
258
    @type dry_run: bool
259
    @param dry_run: whether to perform a dry run
260

  
261
    @rtype: int
262
    @return: job id
263

  
264
    """
265
    query = [("tag", t) for t in tags]
266
    if dry_run:
267
      query.append(("dry-run", 1))
268

  
269
    self._SendRequest(HTTP_PUT, "/tags", query)
270

  
271
  def DeleteClusterTags(self, tags, dry_run=False):
272
    """Deletes tags from the cluster.
273

  
274
    @type tags: list of str
275
    @param tags: tags to delete
276
    @type dry_run: bool
277
    @param dry_run: whether to perform a dry run
278

  
279
    """
280
    query = [("tag", t) for t in tags]
281
    if dry_run:
282
      query.append(("dry-run", 1))
283

  
284
    self._SendRequest(HTTP_DELETE, "/tags", query)
285

  
286
  def GetInstances(self, bulk=False):
287
    """Gets information about instances on the cluster.
288

  
289
    @type bulk: bool
290
    @param bulk: whether to return all information about all instances
291

  
292
    @rtype: list of dict or list of str
293
    @return: if bulk is True, info about the instances, else a list of instances
294

  
295
    """
296
    query = []
297
    if bulk:
298
      query.append(("bulk", 1))
299

  
300
    instances = self._SendRequest(HTTP_GET, "/instances", query)
301
    if bulk:
302
      return instances
303
    else:
304
      return [i["id"] for i in instances]
305

  
306

  
307
  def GetInstanceInfo(self, instance):
308
    """Gets information about an instance.
309

  
310
    @type instance: str
311
    @param instance: instance whose info to return
312

  
313
    @rtype: dict
314
    @return: info about the instance
315

  
316
    """
317
    return self._SendRequest(HTTP_GET, "/instances/%s" % instance)
318

  
319
  def CreateInstance(self, dry_run=False):
320
    """Creates a new instance.
321

  
322
    @type dry_run: bool
323
    @param dry_run: whether to perform a dry run
324

  
325
    @rtype: int
326
    @return: job id
327

  
328
    """
329
    # TODO: Pass arguments needed to actually create an instance.
330
    query = []
331
    if dry_run:
332
      query.append(("dry-run", 1))
333

  
334
    return self._SendRequest(HTTP_POST, "/instances", query)
335

  
336
  def DeleteInstance(self, instance, dry_run=False):
337
    """Deletes an instance.
338

  
339
    @type instance: str
340
    @param instance: the instance to delete
341

  
342
    """
343
    query = []
344
    if dry_run:
345
      query.append(("dry-run", 1))
346

  
347
    self._SendRequest(HTTP_DELETE, "/instances/%s" % instance, query)
348

  
349
  def GetInstanceTags(self, instance):
350
    """Gets tags for an instance.
351

  
352
    @type instance: str
353
    @param instance: instance whose tags to return
354

  
355
    @rtype: list of str
356
    @return: tags for the instance
357

  
358
    """
359
    return self._SendRequest(HTTP_GET, "/instances/%s/tags" % instance)
360

  
361
  def AddInstanceTags(self, instance, tags, dry_run=False):
362
    """Adds tags to an instance.
363

  
364
    @type instance: str
365
    @param instance: instance to add tags to
366
    @type tags: list of str
367
    @param tags: tags to add to the instance
368
    @type dry_run: bool
369
    @param dry_run: whether to perform a dry run
370

  
371
    @rtype: int
372
    @return: job id
373

  
374
    """
375
    query = [("tag", t) for t in tags]
376
    if dry_run:
377
      query.append(("dry-run", 1))
378

  
379
    self._SendRequest(HTTP_PUT, "/instances/%s/tags" % instance, query)
380

  
381
  def DeleteInstanceTags(self, instance, tags, dry_run=False):
382
    """Deletes tags from an instance.
383

  
384
    @type instance: str
385
    @param instance: instance to delete tags from
386
    @type tags: list of str
387
    @param tags: tags to delete
388
    @type dry_run: bool
389
    @param dry_run: whether to perform a dry run
390

  
391
    """
392
    query = [("tag", t) for t in tags]
393
    if dry_run:
394
      query.append(("dry-run", 1))
395

  
396
    self._SendRequest(HTTP_DELETE, "/instances/%s/tags" % instance, query)
397

  
398
  def RebootInstance(self, instance, reboot_type=None, ignore_secondaries=None,
399
                     dry_run=False):
400
    """Reboots an instance.
401

  
402
    @type instance: str
403
    @param instance: instance to rebot
404
    @type reboot_type: str
405
    @param reboot_type: one of: hard, soft, full
406
    @type ignore_secondaries: bool
407
    @param ignore_secondaries: if True, ignores errors for the secondary node
408
        while re-assembling disks (in hard-reboot mode only)
409
    @type dry_run: bool
410
    @param dry_run: whether to perform a dry run
411

  
412
    """
413
    query = []
414
    if reboot_type:
415
      query.append(("type", reboot_type))
416
    if ignore_secondaries is not None:
417
      query.append(("ignore_secondaries", ignore_secondaries))
418
    if dry_run:
419
      query.append(("dry-run", 1))
420

  
421
    self._SendRequest(HTTP_POST, "/instances/%s/reboot" % instance, query)
422

  
423
  def ShutdownInstance(self, instance, dry_run=False):
424
    """Shuts down an instance.
425

  
426
    @type instance: str
427
    @param instance: the instance to shut down
428
    @type dry_run: bool
429
    @param dry_run: whether to perform a dry run
430

  
431
    """
432
    query = []
433
    if dry_run:
434
      query.append(("dry-run", 1))
435

  
436
    self._SendRequest(HTTP_PUT, "/instances/%s/shutdown" % instance, query)
437

  
438
  def StartupInstance(self, instance, dry_run=False):
439
    """Starts up an instance.
440

  
441
    @type instance: str
442
    @param instance: the instance to start up
443
    @type dry_run: bool
444
    @param dry_run: whether to perform a dry run
445

  
446
    """
447
    query = []
448
    if dry_run:
449
      query.append(("dry-run", 1))
450

  
451
    self._SendRequest(HTTP_PUT, "/instances/%s/startup" % instance, query)
452

  
453
  def ReinstallInstance(self, instance, os, no_startup=False):
454
    """Reinstalls an instance.
455

  
456
    @type instance: str
457
    @param instance: the instance to reinstall
458
    @type os: str
459
    @param os: the os to reinstall
460
    @type no_startup: bool
461
    @param no_startup: whether to start the instance automatically
462

  
463
    """
464
    query = [("os", os)]
465
    if no_startup:
466
      query.append(("nostartup", 1))
467
    self._SendRequest(HTTP_POST, "/instances/%s/reinstall" % instance, query)
468

  
469
  def ReplaceInstanceDisks(self, instance, disks, mode="replace_auto",
470
                           remote_node=None, iallocator="hail", dry_run=False):
471
    """Replaces disks on an instance.
472

  
473
    @type instance: str
474
    @param instance: instance whose disks to replace
475
    @type disks: list of str
476
    @param disks: disks to replace
477
    @type mode: str
478
    @param mode: replacement mode to use. defaults to replace_auto
479
    @type remote_node: str or None
480
    @param remote_node: new secondary node to use (for use with
481
        replace_new_secondary mdoe)
482
    @type iallocator: str or None
483
    @param iallocator: instance allocator plugin to use (for use with
484
        replace_auto mdoe).  default is hail
485
    @type dry_run: bool
486
    @param dry_run: whether to perform a dry run
487

  
488
    @rtype: int
489
    @return: job id
490

  
491
    @raises InvalidReplacementMode: If an invalid disk replacement mode is given
492
    @raises GanetiApiError: If no secondary node is given with a non-auto
493
        replacement mode is requested.
494

  
495
    """
496
    if mode not in VALID_REPLACEMENT_MODES:
497
      raise InvalidReplacementMode("%s is not a valid disk replacement mode.",
498
                                   mode)
499

  
500
    query = [("mode", mode), ("disks", ",".join(disks))]
501

  
502
    if mode is REPLACE_DISK_AUTO:
503
      query.append(("iallocator", iallocator))
504
    elif mode is REPLACE_DISK_SECONDARY:
505
      if remote_node is None:
506
        raise GanetiApiError("You must supply a new secondary node.")
507
      query.append(("remote_node", remote_node))
508

  
509
    if dry_run:
510
      query.append(("dry-run", 1))
511

  
512
    return self._SendRequest(HTTP_POST,
513
                             "/instances/%s/replace-disks" % instance, query)
514

  
515
  def GetJobs(self):
516
    """Gets all jobs for the cluster.
517

  
518
    @rtype: list of int
519
    @return: job ids for the cluster
520

  
521
    """
522
    return [int(j["id"]) for j in self._SendRequest(HTTP_GET, "/jobs")]
523

  
524
  def GetJobStatus(self, job_id):
525
    """Gets the status of a job.
526

  
527
    @type job_id: int
528
    @param job_id: job id whose status to query
529

  
530
    @rtype: dict
531
    @return: job status
532

  
533
    """
534
    return self._SendRequest(HTTP_GET, "/jobs/%d" % job_id)
535

  
536
  def DeleteJob(self, job_id, dry_run=False):
537
    """Deletes a job.
538

  
539
    @type job_id: int
540
    @param job_id: id of the job to delete
541
    @type dry_run: bool
542
    @param dry_run: whether to perform a dry run
543

  
544
    """
545
    query = []
546
    if dry_run:
547
      query.append(("dry-run", 1))
548

  
549
    self._SendRequest(HTTP_DELETE, "/jobs/%d" % job_id, query)
550

  
551
  def GetNodes(self, bulk=False):
552
    """Gets all nodes in the cluster.
553

  
554
    @type bulk: bool
555
    @param bulk: whether to return all information about all instances
556

  
557
    @rtype: list of dict or str
558
    @return: if bulk is true, info about nodes in the cluster,
559
        else list of nodes in the cluster
560

  
561
    """
562
    query = []
563
    if bulk:
564
      query.append(("bulk", 1))
565

  
566
    nodes = self._SendRequest(HTTP_GET, "/nodes", query)
567
    if bulk:
568
      return nodes
569
    else:
570
      return [n["id"] for n in nodes]
571

  
572
  def GetNodeInfo(self, node):
573
    """Gets information about a node.
574

  
575
    @type node: str
576
    @param node: node whose info to return
577

  
578
    @rtype: dict
579
    @return: info about the node
580

  
581
    """
582
    return self._SendRequest(HTTP_GET, "/nodes/%s" % node)
583

  
584
  def EvacuateNode(self, node, iallocator=None, remote_node=None,
585
                   dry_run=False):
586
    """Evacuates instances from a Ganeti node.
587

  
588
    @type node: str
589
    @param node: node to evacuate
590
    @type iallocator: str or None
591
    @param iallocator: instance allocator to use
592
    @type remote_node: str
593
    @param remote_node: node to evaucate to
594
    @type dry_run: bool
595
    @param dry_run: whether to perform a dry run
596

  
597
    @rtype: int
598
    @return: job id
599

  
600
    @raises GanetiApiError: if an iallocator and remote_node are both specified
601

  
602
    """
603
    query = []
604
    if iallocator and remote_node:
605
      raise GanetiApiError("Only one of iallocator or remote_node can be used.")
606

  
607
    if iallocator:
608
      query.append(("iallocator", iallocator))
609
    if remote_node:
610
      query.append(("remote_node", remote_node))
611
    if dry_run:
612
      query.append(("dry-run", 1))
613

  
614
    return self._SendRequest(HTTP_POST, "/nodes/%s/evacuate" % node, query)
615

  
616
  def MigrateNode(self, node, live=True, dry_run=False):
617
    """Migrates all primary instances from a node.
618

  
619
    @type node: str
620
    @param node: node to migrate
621
    @type live: bool
622
    @param live: whether to use live migration
623
    @type dry_run: bool
624
    @param dry_run: whether to perform a dry run
625

  
626
    @rtype: int
627
    @return: job id
628

  
629
    """
630
    query = []
631
    if live:
632
      query.append(("live", 1))
633
    if dry_run:
634
      query.append(("dry-run", 1))
635

  
636
    return self._SendRequest(HTTP_POST, "/nodes/%s/migrate" % node, query)
637

  
638
  def GetNodeRole(self, node):
639
    """Gets the current role for a node.
640

  
641
    @type node: str
642
    @param node: node whose role to return
643

  
644
    @rtype: str
645
    @return: the current role for a node
646

  
647
    """
648
    return self._SendRequest(HTTP_GET, "/nodes/%s/role" % node)
649

  
650
  def SetNodeRole(self, node, role, force=False):
651
    """Sets the role for a node.
652

  
653
    @type node: str
654
    @param node: the node whose role to set
655
    @type role: str
656
    @param role: the role to set for the node
657
    @type force: bool
658
    @param force: whether to force the role change
659

  
660
    @rtype: int
661
    @return: job id
662

  
663
    @raise InvalidNodeRole: If an invalid node role is specified
664

  
665
    """
666
    if role not in VALID_NODE_ROLES:
667
      raise InvalidNodeRole("%s is not a valid node role.", role)
668

  
669
    query = [("force", force)]
670
    return self._SendRequest(HTTP_PUT, "/nodes/%s/role" % node, query,
671
                             content=role)
672

  
673
  def GetNodeStorageUnits(self, node, storage_type, output_fields):
674
    """Gets the storage units for a node.
675

  
676
    @type node: str
677
    @param node: the node whose storage units to return
678
    @type storage_type: str
679
    @param storage_type: storage type whose units to return
680
    @type output_fields: str
681
    @param output_fields: storage type fields to return
682

  
683
    @rtype: int
684
    @return: job id where results can be retrieved
685

  
686
    @raise InvalidStorageType: If an invalid storage type is specified
687

  
688
    """
689
    # TODO: Add default for storage_type & output_fields
690
    if storage_type not in VALID_STORAGE_TYPES:
691
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
692

  
693
    query = [("storage_type", storage_type), ("output_fields", output_fields)]
694
    return self._SendRequest(HTTP_GET, "/nodes/%s/storage" % node, query)
695

  
696
  def ModifyNodeStorageUnits(self, node, storage_type, name, allocatable=True):
697
    """Modifies parameters of storage units on the node.
698

  
699
    @type node: str
700
    @param node: node whose storage units to modify
701
    @type storage_type: str
702
    @param storage_type: storage type whose units to modify
703
    @type name: str
704
    @param name: name of the storage unit
705
    @type allocatable: bool
706
    @param allocatable: TODO: Document me
707

  
708
    @rtype: int
709
    @return: job id
710

  
711
    @raise InvalidStorageType: If an invalid storage type is specified
712

  
713
    """
714
    if storage_type not in VALID_STORAGE_TYPES:
715
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
716

  
717
    query = [
718
        ("storage_type", storage_type), ("name", name),
719
        ("allocatable", allocatable)
720
        ]
721
    return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/modify" % node, query)
722

  
723
  def RepairNodeStorageUnits(self, node, storage_type, name):
724
    """Repairs a storage unit on the node.
725

  
726
    @type node: str
727
    @param node: node whose storage units to repair
728
    @type storage_type: str
729
    @param storage_type: storage type to repair
730
    @type name: str
731
    @param name: name of the storage unit to repair
732

  
733
    @rtype: int
734
    @return: job id
735

  
736
    @raise InvalidStorageType: If an invalid storage type is specified
737

  
738
    """
739
    if storage_type not in VALID_STORAGE_TYPES:
740
      raise InvalidStorageType("%s is an invalid storage type.", storage_type)
741

  
742
    query = [("storage_type", storage_type), ("name", name)]
743
    return self._SendRequest(HTTP_PUT, "/nodes/%s/storage/repair" % node, query)
744

  
745
  def GetNodeTags(self, node):
746
    """Gets the tags for a node.
747

  
748
    @type node: str
749
    @param node: node whose tags to return
750

  
751
    @rtype: list of str
752
    @return: tags for the node
753

  
754
    """
755
    return self._SendRequest(HTTP_GET, "/nodes/%s/tags" % node)
756

  
757
  def AddNodeTags(self, node, tags, dry_run=False):
758
    """Adds tags to a node.
759

  
760
    @type node: str
761
    @param node: node to add tags to
762
    @type tags: list of str
763
    @param tags: tags to add to the node
764
    @type dry_run: bool
765
    @param dry_run: whether to perform a dry run
766

  
767
    @rtype: int
768
    @return: job id
769

  
770
    """
771
    query = [("tag", t) for t in tags]
772
    if dry_run:
773
      query.append(("dry-run", 1))
774

  
775
    return self._SendRequest(HTTP_PUT, "/nodes/%s/tags" % node, query,
776
                             content=tags)
777

  
778
  def DeleteNodeTags(self, node, tags, dry_run=False):
779
    """Delete tags from a node.
780

  
781
    @type node: str
782
    @param node: node to remove tags from
783
    @type tags: list of str
784
    @param tags: tags to remove from the node
785
    @type dry_run: bool
786
    @param dry_run: whether to perform a dry run
787

  
788
    @rtype: int
789
    @return: job id
790

  
791
    """
792
    query = [("tag", t) for t in tags]
793
    if dry_run:
794
      query.append(("dry-run", 1))
795

  
796
    return self._SendRequest(HTTP_DELETE, "/nodes/%s/tags" % node, query)
797

  
798

  
799
class HTTPSConnectionOpenSSL(httplib.HTTPSConnection):
800
  """HTTPS Connection handler that verifies the SSL certificate.
801

  
802
  """
803

  
804
  # pylint: disable-msg=W0142
805
  def __init__(self, *args, **kwargs):
806
    """Constructor.
807

  
808
    """
809
    httplib.HTTPSConnection.__init__(self, *args, **kwargs)
810

  
811
    self._ssl_cert = None
812
    if self.cert_file:
813
      f = open(self.cert_file, "r")
814
      self._ssl_cert = crypto.load_certificate(crypto.FILETYPE_PEM, f.read())
815
      f.close()
816

  
817
  # pylint: disable-msg=W0613
818
  def _VerifySSLCertCallback(self, conn, cert, errnum, errdepth, ok):
819
    """Verifies the SSL certificate provided by the peer.
820

  
821
    """
822
    return (self._ssl_cert.digest("sha1") == cert.digest("sha1") and
823
            self._ssl_cert.digest("md5") == cert.digest("md5"))
824

  
825
  def connect(self):
826
    """Connect to the server specified when the object was created.
827

  
828
    This ensures that SSL certificates are verified.
829

  
830
    """
831
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
832
    ctx = SSL.Context(SSL.SSLv23_METHOD)
833
    ctx.set_options(SSL.OP_NO_SSLv2)
834
    ctx.use_certificate(self._ssl_cert)
835
    ctx.set_verify(SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT,
836
                   self._VerifySSLCertCallback)
837

  
838
    ssl = SSL.Connection(ctx, sock)
839
    ssl.connect((self.host, self.port))
840
    self.sock = httplib.FakeSocket(sock, ssl)
841

  
842

  
843
def _VerifyCertificate(hostname, port, cert_file):
844
  """Verifies the SSL certificate for the given host/port.
845

  
846
  @type hostname: str
847
  @param hostname: the ganeti cluster master whose certificate to verify
848
  @type port: int
849
  @param port: the port on which the RAPI is running
850
  @type cert_file: str
851
  @param cert_file: filename of the expected SSL certificate
852

  
853
  @raises CertificateError: If an invalid SSL certificate is found
854

  
855
  """
856
  https = HTTPSConnectionOpenSSL(hostname, port, cert_file=cert_file)
857
  try:
858
    try:
859
      https.request(HTTP_GET, "/version")
860
    except (crypto.Error, SSL.Error):
861
      raise CertificateError("Invalid SSL certificate.")
862
  finally:
863
    https.close()
b/test/ganeti.rapi.client_unittest.py
1
#!/usr/bin/python
2
#
3

  
4
# Copyright (C) 2010 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

  
21

  
22
"""Script for unittesting the RAPI client module"""
23

  
24

  
25
try:
26
  import httplib2
27
  BaseHttp = httplib2.Http
28
  from ganeti.rapi import client
29
except ImportError:
30
  httplib2 = None
31
  BaseHttp = object
32

  
33
import re
34
import unittest
35
import warnings
36

  
37
from ganeti import http
38

  
39
from ganeti.rapi import connector
40
from ganeti.rapi import rlib2
41

  
42
import testutils
43

  
44

  
45
_URI_RE = re.compile(r"https://(?P<host>.*):(?P<port>\d+)(?P<path>/.*)")
46

  
47

  
48
def _GetPathFromUri(uri):
49
  """Gets the path and query from a URI.
50

  
51
  """
52
  match = _URI_RE.match(uri)
53
  if match:
54
    return match.groupdict()["path"]
55
  else:
56
    return None
57

  
58

  
59
class HttpResponseMock(dict):
60
  """Dumb mock of httplib2.Response.
61

  
62
  """
63

  
64
  def __init__(self, status):
65
    self.status = status
66
    self['status'] = status
67

  
68

  
69
class HttpMock(BaseHttp):
70
  """Mock for httplib.Http.
71

  
72
  """
73

  
74
  def __init__(self, rapi):
75
    self._rapi = rapi
76
    self._last_request = None
77

  
78
  last_request_url = property(lambda self: self._last_request[0])
79
  last_request_method = property(lambda self: self._last_request[1])
80
  last_request_body = property(lambda self: self._last_request[2])
81

  
82
  def request(self, url, method, body, headers):
83
    self._last_request = (url, method, body)
84
    code, resp_body = self._rapi.FetchResponse(_GetPathFromUri(url), method)
85
    return HttpResponseMock(code), resp_body
86

  
87

  
88
class RapiMock(object):
89

  
90
  def __init__(self):
91
    self._mapper = connector.Mapper()
92
    self._responses = []
93
    self._last_handler = None
94

  
95
  def AddResponse(self, response):
96
    self._responses.insert(0, response)
97

  
98
  def PopResponse(self):
99
    if len(self._responses) > 0:
100
      return self._responses.pop()
101
    else:
102
      return None
103

  
104
  def GetLastHandler(self):
105
    return self._last_handler
106

  
107
  def FetchResponse(self, path, method):
108
    code = 200
109
    response = None
110

  
111
    try:
112
      HandlerClass, items, args = self._mapper.getController(path)
113
      self._last_handler = HandlerClass(items, args, None)
114
      if not hasattr(self._last_handler, method.upper()):
115
        code = 400
116
        response = "Bad request"
117
    except http.HttpException, ex:
118
      code = ex.code
119
      response = ex.message
120

  
121
    if not response:
122
      response = self.PopResponse()
123

  
124
    return code, response
125

  
126

  
127
class RapiMockTest(unittest.TestCase):
128

  
129
  def test(self):
130
    rapi = RapiMock()
131
    path = "/version"
132
    self.assertEqual((404, None), rapi.FetchResponse("/foo", "GET"))
133
    self.assertEqual((400, "Bad request"),
134
                     rapi.FetchResponse("/version", "POST"))
135
    rapi.AddResponse("2")
136
    code, response = rapi.FetchResponse("/version", "GET")
137
    self.assertEqual(200, code)
138
    self.assertEqual("2", response)
139
    self.failUnless(isinstance(rapi.GetLastHandler(), rlib2.R_version))
140

  
141

  
142
class GanetiRapiClientTests(unittest.TestCase):
143
  """Tests for remote API client.
144

  
145
  """
146

  
147
  def setUp(self):
148
    # Monkey-patch a fake VerifyCertificate function
149
    self._verify_certificate = client._VerifyCertificate
150
    client._VerifyCertificate = lambda x, y, z: True
151

  
152
    self.rapi = RapiMock()
153
    self.http = HttpMock(self.rapi)
154
    self.client = client.GanetiRapiClient('master.foo.com')
155
    self.client._http = self.http
156
    # Hard-code the version for easier testing.
157
    self.client._version = 2
158

  
159
  def tearDown(self):
160
    # Un-do the monkey-patch
161
    client._VerifyCertificate = self._verify_certificate
162

  
163
  def assertHandler(self, handler_cls):
164
    self.failUnless(isinstance(self.rapi.GetLastHandler(), handler_cls))
165

  
166
  def assertQuery(self, key, value):
167
    self.assertEqual(value, self.rapi.GetLastHandler().queryargs.get(key, None))
168

  
169
  def assertItems(self, items):
170
    self.assertEqual(items, self.rapi.GetLastHandler().items)
171

  
172
  def assertBulk(self):
173
    self.assertTrue(self.rapi.GetLastHandler().useBulk())
174

  
175
  def assertDryRun(self):
176
    self.assertTrue(self.rapi.GetLastHandler().dryRun())
177

  
178
  def testGetVersion(self):
179
    self.client._version = None
180
    self.rapi.AddResponse("2")
181
    self.assertEqual(2, self.client.GetVersion())
182
    self.assertHandler(rlib2.R_version)
183

  
184
  def testGetOperatingSystems(self):
185
    self.rapi.AddResponse("[\"beos\"]")
186
    self.assertEqual(["beos"], self.client.GetOperatingSystems())
187
    self.assertHandler(rlib2.R_2_os)
188

  
189
  def testGetClusterTags(self):
190
    self.rapi.AddResponse("[\"tag\"]")
191
    self.assertEqual(["tag"], self.client.GetClusterTags())
192
    self.assertHandler(rlib2.R_2_tags)
193

  
194
  def testAddClusterTags(self):
195
    self.client.AddClusterTags(["awesome"], dry_run=True)
196
    self.assertHandler(rlib2.R_2_tags)
197
    self.assertDryRun()
198
    self.assertQuery("tag", ["awesome"])
199

  
200
  def testDeleteClusterTags(self):
201
    self.client.DeleteClusterTags(["awesome"], dry_run=True)
202
    self.assertHandler(rlib2.R_2_tags)
203
    self.assertDryRun()
204
    self.assertQuery("tag", ["awesome"])
205

  
206
  def testGetInfo(self):
207
    self.rapi.AddResponse("{}")
208
    self.assertEqual({}, self.client.GetInfo())
209
    self.assertHandler(rlib2.R_2_info)
210

  
211
  def testGetInstances(self):
212
    self.rapi.AddResponse("[]")
213
    self.assertEqual([], self.client.GetInstances(bulk=True))
214
    self.assertHandler(rlib2.R_2_instances)
215
    self.assertBulk()
216

  
217
  def testGetInstanceInfo(self):
218
    self.rapi.AddResponse("[]")
219
    self.assertEqual([], self.client.GetInstanceInfo("instance"))
220
    self.assertHandler(rlib2.R_2_instances_name)
221
    self.assertItems(["instance"])
222

  
223
  def testCreateInstance(self):
224
    self.rapi.AddResponse("1234")
225
    self.assertEqual(1234, self.client.CreateInstance(dry_run=True))
226
    self.assertHandler(rlib2.R_2_instances)
227
    self.assertDryRun()
228

  
229
  def testDeleteInstance(self):
230
    self.client.DeleteInstance("instance", dry_run=True)
231
    self.assertHandler(rlib2.R_2_instances_name)
232
    self.assertItems(["instance"])
233
    self.assertDryRun()
234

  
235
  def testGetInstanceTags(self):
236
    self.rapi.AddResponse("[]")
237
    self.assertEqual([], self.client.GetInstanceTags("fooinstance"))
238
    self.assertHandler(rlib2.R_2_instances_name_tags)
239
    self.assertItems(["fooinstance"])
240

  
241
  def testAddInstanceTags(self):
242
    self.client.AddInstanceTags("fooinstance", ["awesome"], dry_run=True)
243
    self.assertHandler(rlib2.R_2_instances_name_tags)
244
    self.assertItems(["fooinstance"])
245
    self.assertDryRun()
246
    self.assertQuery("tag", ["awesome"])
247

  
248
  def testDeleteInstanceTags(self):
249
    self.client.DeleteInstanceTags("foo", ["awesome"], dry_run=True)
250
    self.assertHandler(rlib2.R_2_instances_name_tags)
251
    self.assertItems(["foo"])
252
    self.assertDryRun()
253
    self.assertQuery("tag", ["awesome"])
254

  
255
  def testRebootInstance(self):
256
    self.client.RebootInstance("i-bar", reboot_type="hard",
257
                               ignore_secondaries=True, dry_run=True)
258
    self.assertHandler(rlib2.R_2_instances_name_reboot)
259
    self.assertItems(["i-bar"])
260
    self.assertDryRun()
261
    self.assertQuery("type", ["hard"])
262
    self.assertQuery("ignore_secondaries", ["True"])
263

  
264
  def testShutdownInstance(self):
265
    self.client.ShutdownInstance("foo-instance", dry_run=True)
266
    self.assertHandler(rlib2.R_2_instances_name_shutdown)
267
    self.assertItems(["foo-instance"])
268
    self.assertDryRun()
269

  
270
  def testStartupInstance(self):
271
    self.client.StartupInstance("bar-instance", dry_run=True)
272
    self.assertHandler(rlib2.R_2_instances_name_startup)
273
    self.assertItems(["bar-instance"])
274
    self.assertDryRun()
275

  
276
  def testReinstallInstance(self):
277
    self.client.ReinstallInstance("baz-instance", "DOS", no_startup=True)
278
    self.assertHandler(rlib2.R_2_instances_name_reinstall)
279
    self.assertItems(["baz-instance"])
280
    self.assertQuery("os", ["DOS"])
281
    self.assertQuery("nostartup", ["1"])
282

  
283
  def testReplaceInstanceDisks(self):
284
    self.rapi.AddResponse("999")
285
    job_id = self.client.ReplaceInstanceDisks("instance-name",
286
        ["hda", "hdc"], dry_run=True)
287
    self.assertEqual(999, job_id)
288
    self.assertHandler(rlib2.R_2_instances_name_replace_disks)
289
    self.assertItems(["instance-name"])
290
    self.assertQuery("disks", ["hda,hdc"])
291
    self.assertQuery("mode", ["replace_auto"])
292
    self.assertQuery("iallocator", ["hail"])
293
    self.assertDryRun()
294

  
295
    self.assertRaises(client.InvalidReplacementMode,
296
                      self.client.ReplaceInstanceDisks,
297
                      "instance_a", ["hda"], mode="invalid_mode")
298
    self.assertRaises(client.GanetiApiError,
299
                      self.client.ReplaceInstanceDisks,
300
                      "instance-foo", ["hda"], mode="replace_on_secondary")
301

  
302
    self.rapi.AddResponse("1000")
303
    job_id = self.client.ReplaceInstanceDisks("instance-bar",
304
        ["hda"], mode="replace_on_secondary", remote_node="foo-node",
305
        dry_run=True)
306
    self.assertEqual(1000, job_id)
307
    self.assertItems(["instance-bar"])
308
    self.assertQuery("disks", ["hda"])
309
    self.assertQuery("remote_node", ["foo-node"])
310
    self.assertDryRun()
311

  
312
  def testGetJobs(self):
313
    self.rapi.AddResponse("[ { \"id\": \"123\", \"uri\": \"\/2\/jobs\/123\" },"
314
                          " { \"id\": \"124\", \"uri\": \"\2\/jobs\/124\" } ]")
315
    self.assertEqual([123, 124], self.client.GetJobs())
316
    self.assertHandler(rlib2.R_2_jobs)
317

  
318
  def testGetJobStatus(self):
319
    self.rapi.AddResponse("{\"foo\": \"bar\"}")
320
    self.assertEqual({"foo": "bar"}, self.client.GetJobStatus(1234))
321
    self.assertHandler(rlib2.R_2_jobs_id)
322
    self.assertItems(["1234"])
323

  
324
  def testDeleteJob(self):
325
    self.client.DeleteJob(999, dry_run=True)
326
    self.assertHandler(rlib2.R_2_jobs_id)
327
    self.assertItems(["999"])
328
    self.assertDryRun()
329

  
330
  def testGetNodes(self):
331
    self.rapi.AddResponse("[ { \"id\": \"node1\", \"uri\": \"uri1\" },"
332
                          " { \"id\": \"node2\", \"uri\": \"uri2\" } ]")
333
    self.assertEqual(["node1", "node2"], self.client.GetNodes())
334
    self.assertHandler(rlib2.R_2_nodes)
335

  
336
    self.rapi.AddResponse("[ { \"id\": \"node1\", \"uri\": \"uri1\" },"
337
                          " { \"id\": \"node2\", \"uri\": \"uri2\" } ]")
338
    self.assertEqual([{"id": "node1", "uri": "uri1"},
339
                      {"id": "node2", "uri": "uri2"}],
340
                     self.client.GetNodes(bulk=True))
341
    self.assertHandler(rlib2.R_2_nodes)
342
    self.assertBulk()
343

  
344
  def testGetNodeInfo(self):
345
    self.rapi.AddResponse("{}")
346
    self.assertEqual({}, self.client.GetNodeInfo("node-foo"))
347
    self.assertHandler(rlib2.R_2_nodes_name)
348
    self.assertItems(["node-foo"])
349

  
350
  def testEvacuateNode(self):
351
    self.rapi.AddResponse("9876")
352
    job_id = self.client.EvacuateNode("node-1", remote_node="node-2")
353
    self.assertEqual(9876, job_id)
354
    self.assertHandler(rlib2.R_2_nodes_name_evacuate)
355
    self.assertItems(["node-1"])
356
    self.assertQuery("remote_node", ["node-2"])
357

  
358
    self.rapi.AddResponse("8888")
359
    job_id = self.client.EvacuateNode("node-3", iallocator="hail", dry_run=True)
360
    self.assertEqual(8888, job_id)
361
    self.assertItems(["node-3"])
362
    self.assertQuery("iallocator", ["hail"])
363
    self.assertDryRun()
364

  
365
    self.assertRaises(client.GanetiApiError,
366
                      self.client.EvacuateNode,
367
                      "node-4", iallocator="hail", remote_node="node-5")
368

  
369
  def testMigrateNode(self):
370
    self.rapi.AddResponse("1111")
371
    self.assertEqual(1111, self.client.MigrateNode("node-a", dry_run=True))
372
    self.assertHandler(rlib2.R_2_nodes_name_migrate)
373
    self.assertItems(["node-a"])
374
    self.assertQuery("live", ["1"])
375
    self.assertDryRun()
376

  
377
  def testGetNodeRole(self):
378
    self.rapi.AddResponse("\"master\"")
379
    self.assertEqual("master", self.client.GetNodeRole("node-a"))
380
    self.assertHandler(rlib2.R_2_nodes_name_role)
381
    self.assertItems(["node-a"])
382

  
383
  def testSetNodeRole(self):
384
    self.rapi.AddResponse("789")
385
    self.assertEqual(789,
386
        self.client.SetNodeRole("node-foo", "master-candidate", force=True))
387
    self.assertHandler(rlib2.R_2_nodes_name_role)
388
    self.assertItems(["node-foo"])
389
    self.assertQuery("force", ["True"])
390
    self.assertEqual("master-candidate", self.http.last_request_body)
391

  
392
    self.assertRaises(client.InvalidNodeRole,
393
                      self.client.SetNodeRole, "node-bar", "fake-role")
394

  
395
  def testGetNodeStorageUnits(self):
396
    self.rapi.AddResponse("42")
397
    self.assertEqual(42,
398
        self.client.GetNodeStorageUnits("node-x", "lvm-pv", "fields"))
399
    self.assertHandler(rlib2.R_2_nodes_name_storage)
400
    self.assertItems(["node-x"])
401
    self.assertQuery("storage_type", ["lvm-pv"])
402
    self.assertQuery("output_fields", ["fields"])
403

  
404
    self.assertRaises(client.InvalidStorageType,
405
                      self.client.GetNodeStorageUnits,
406
                      "node-y", "floppy-disk", "fields")
407

  
408
  def testModifyNodeStorageUnits(self):
409
    self.rapi.AddResponse("14")
410
    self.assertEqual(14,
411
        self.client.ModifyNodeStorageUnits("node-z", "lvm-pv", "hda"))
412
    self.assertHandler(rlib2.R_2_nodes_name_storage_modify)
413
    self.assertItems(["node-z"])
414
    self.assertQuery("storage_type", ["lvm-pv"])
415
    self.assertQuery("name", ["hda"])
416

  
417
    self.assertRaises(client.InvalidStorageType,
418
                      self.client.ModifyNodeStorageUnits,
419
                      "node-n", "floppy-disk", "hdc")
420

  
421
  def testGetNodeTags(self):
422
    self.rapi.AddResponse("[\"fry\", \"bender\"]")
423
    self.assertEqual(["fry", "bender"], self.client.GetNodeTags("node-k"))
424
    self.assertHandler(rlib2.R_2_nodes_name_tags)
425
    self.assertItems(["node-k"])
426

  
427
  def testAddNodeTags(self):
428
    self.client.AddNodeTags("node-v", ["awesome"], dry_run=True)
429
    self.assertHandler(rlib2.R_2_nodes_name_tags)
430
    self.assertItems(["node-v"])
431
    self.assertDryRun()
432
    self.assertQuery("tag", ["awesome"])
433

  
434
  def testDeleteNodeTags(self):
435
    self.client.DeleteNodeTags("node-w", ["awesome"], dry_run=True)
436
    self.assertHandler(rlib2.R_2_nodes_name_tags)
437
    self.assertItems(["node-w"])
438
    self.assertDryRun()
439
    self.assertQuery("tag", ["awesome"])
440

  
441

  
442
if __name__ == '__main__':
443
  if httplib2 is None:
444
    warnings.warn("These tests require the httplib2 library")
445
  else:
446
    testutils.GanetiTestProgram()

Also available in: Unified diff