Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ 63c73073

History | View | Annotate | Download (16.4 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the unix socket protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
import socket
33
import collections
34
import time
35
import errno
36
import logging
37

    
38
from ganeti import compat
39
from ganeti import serializer
40
from ganeti import constants
41
from ganeti import errors
42
from ganeti import utils
43
from ganeti import objects
44
from ganeti import pathutils
45

    
46

    
47
# Keys of the dictionaries exchanged as serialized requests and responses
KEY_METHOD = "method"
KEY_ARGS = "args"
KEY_SUCCESS = "success"
KEY_RESULT = "result"
KEY_VERSION = "version"

# Method names of the individual LUXI requests
REQ_SUBMIT_JOB = "SubmitJob"
REQ_SUBMIT_MANY_JOBS = "SubmitManyJobs"
REQ_WAIT_FOR_JOB_CHANGE = "WaitForJobChange"
REQ_CANCEL_JOB = "CancelJob"
REQ_ARCHIVE_JOB = "ArchiveJob"
REQ_CHANGE_JOB_PRIORITY = "ChangeJobPriority"
REQ_AUTO_ARCHIVE_JOBS = "AutoArchiveJobs"
REQ_QUERY = "Query"
REQ_QUERY_FIELDS = "QueryFields"
REQ_QUERY_JOBS = "QueryJobs"
REQ_QUERY_INSTANCES = "QueryInstances"
REQ_QUERY_NODES = "QueryNodes"
REQ_QUERY_GROUPS = "QueryGroups"
REQ_QUERY_NETWORKS = "QueryNetworks"
REQ_QUERY_EXPORTS = "QueryExports"
REQ_QUERY_CONFIG_VALUES = "QueryConfigValues"
REQ_QUERY_CLUSTER_INFO = "QueryClusterInfo"
REQ_QUERY_TAGS = "QueryTags"
REQ_SET_DRAIN_FLAG = "SetDrainFlag"
REQ_SET_WATCHER_PAUSE = "SetWatcherPause"

#: List of all LUXI requests
# Every REQ_* constant defined above must be listed here; REQ_QUERY_NETWORKS
# used to be missing although the client does send that request.
REQ_ALL = compat.UniqueFrozenset([
  REQ_ARCHIVE_JOB,
  REQ_AUTO_ARCHIVE_JOBS,
  REQ_CANCEL_JOB,
  REQ_CHANGE_JOB_PRIORITY,
  REQ_QUERY,
  REQ_QUERY_CLUSTER_INFO,
  REQ_QUERY_CONFIG_VALUES,
  REQ_QUERY_EXPORTS,
  REQ_QUERY_FIELDS,
  REQ_QUERY_GROUPS,
  REQ_QUERY_INSTANCES,
  REQ_QUERY_JOBS,
  REQ_QUERY_NETWORKS,
  REQ_QUERY_NODES,
  REQ_QUERY_TAGS,
  REQ_SET_DRAIN_FLAG,
  REQ_SET_WATCHER_PAUSE,
  REQ_SUBMIT_JOB,
  REQ_SUBMIT_MANY_JOBS,
  REQ_WAIT_FOR_JOB_CHANGE,
  ])

# Default connect and read/write timeouts, in seconds
DEF_CTMO = 10
DEF_RWTO = 60

# WaitForJobChange timeout (derived from the default read/write timeout so
# that it stays below half of it)
WFJC_TIMEOUT = (DEF_RWTO - 1) / 2
102

    
103

    
104
class ProtocolError(errors.LuxiError):
  """Generic LUXI protocol failure.

  The other protocol-level exceptions of this module derive from it.

  """
106

    
107

    
108
class ConnectionClosedError(ProtocolError):
  """The connection was closed while data was still expected.

  """
110

    
111

    
112
class TimeoutError(ProtocolError):
  """An operation (connect, send or receive) did not finish in time.

  """
114

    
115

    
116
class RequestError(ProtocolError):
  """A request was rejected or could not be handled.

  The problem lies in the format of the request or in its handling, as
  opposed to (e.g.) a failure while starting up an instance.

  Typical causes are:
    - a job could not be submitted because its data was wrong
    - a query could not be run because required fields were missing

  """
127

    
128

    
129
class NoMasterError(ProtocolError):
  """No master daemon is reachable at the given address.

  Either the master daemon is not running or its socket has been
  removed.

  """
136

    
137

    
138
class PermissionError(ProtocolError):
  """Connecting to the master socket was not permitted.

  The user running the client lacks the rights needed to access the
  socket.

  """
144

    
145

    
146
class Transport:
147
  """Low-level transport class.
148

149
  This is used on the client side.
150

151
  This could be replace by any other class that provides the same
152
  semantics to the Client. This means:
153
    - can send messages and receive messages
154
    - safe for multithreading
155

156
  """
157

    
158
  def __init__(self, address, timeouts=None):
159
    """Constructor for the Client class.
160

161
    Arguments:
162
      - address: a valid address the the used transport class
163
      - timeout: a list of timeouts, to be used on connect and read/write
164

165
    There are two timeouts used since we might want to wait for a long
166
    time for a response, but the connect timeout should be lower.
167

168
    If not passed, we use a default of 10 and respectively 60 seconds.
169

170
    Note that on reading data, since the timeout applies to an
171
    invidual receive, it might be that the total duration is longer
172
    than timeout value passed (we make a hard limit at twice the read
173
    timeout).
174

175
    """
176
    self.address = address
177
    if timeouts is None:
178
      self._ctimeout, self._rwtimeout = DEF_CTMO, DEF_RWTO
179
    else:
180
      self._ctimeout, self._rwtimeout = timeouts
181

    
182
    self.socket = None
183
    self._buffer = ""
184
    self._msgs = collections.deque()
185

    
186
    try:
187
      self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
188

    
189
      # Try to connect
190
      try:
191
        utils.Retry(self._Connect, 1.0, self._ctimeout,
192
                    args=(self.socket, address, self._ctimeout))
193
      except utils.RetryTimeout:
194
        raise TimeoutError("Connect timed out")
195

    
196
      self.socket.settimeout(self._rwtimeout)
197
    except (socket.error, NoMasterError):
198
      if self.socket is not None:
199
        self.socket.close()
200
      self.socket = None
201
      raise
202

    
203
  @staticmethod
204
  def _Connect(sock, address, timeout):
205
    sock.settimeout(timeout)
206
    try:
207
      sock.connect(address)
208
    except socket.timeout, err:
209
      raise TimeoutError("Connect timed out: %s" % str(err))
210
    except socket.error, err:
211
      error_code = err.args[0]
212
      if error_code in (errno.ENOENT, errno.ECONNREFUSED):
213
        raise NoMasterError(address)
214
      elif error_code in (errno.EPERM, errno.EACCES):
215
        raise PermissionError(address)
216
      elif error_code == errno.EAGAIN:
217
        # Server's socket backlog is full at the moment
218
        raise utils.RetryAgain()
219
      raise
220

    
221
  def _CheckSocket(self):
222
    """Make sure we are connected.
223

224
    """
225
    if self.socket is None:
226
      raise ProtocolError("Connection is closed")
227

    
228
  def Send(self, msg):
229
    """Send a message.
230

231
    This just sends a message and doesn't wait for the response.
232

233
    """
234
    if constants.LUXI_EOM in msg:
235
      raise ProtocolError("Message terminator found in payload")
236

    
237
    self._CheckSocket()
238
    try:
239
      # TODO: sendall is not guaranteed to send everything
240
      self.socket.sendall(msg + constants.LUXI_EOM)
241
    except socket.timeout, err:
242
      raise TimeoutError("Sending timeout: %s" % str(err))
243

    
244
  def Recv(self):
245
    """Try to receive a message from the socket.
246

247
    In case we already have messages queued, we just return from the
248
    queue. Otherwise, we try to read data with a _rwtimeout network
249
    timeout, and making sure we don't go over 2x_rwtimeout as a global
250
    limit.
251

252
    """
253
    self._CheckSocket()
254
    etime = time.time() + self._rwtimeout
255
    while not self._msgs:
256
      if time.time() > etime:
257
        raise TimeoutError("Extended receive timeout")
258
      while True:
259
        try:
260
          data = self.socket.recv(4096)
261
        except socket.timeout, err:
262
          raise TimeoutError("Receive timeout: %s" % str(err))
263
        except socket.error, err:
264
          if err.args and err.args[0] == errno.EAGAIN:
265
            continue
266
          raise
267
        break
268
      if not data:
269
        raise ConnectionClosedError("Connection closed while reading")
270
      new_msgs = (self._buffer + data).split(constants.LUXI_EOM)
271
      self._buffer = new_msgs.pop()
272
      self._msgs.extend(new_msgs)
273
    return self._msgs.popleft()
274

    
275
  def Call(self, msg):
276
    """Send a message and wait for the response.
277

278
    This is just a wrapper over Send and Recv.
279

280
    """
281
    self.Send(msg)
282
    return self.Recv()
283

    
284
  def Close(self):
285
    """Close the socket"""
286
    if self.socket is not None:
287
      self.socket.close()
288
      self.socket = None
289

    
290

    
291
def ParseRequest(msg):
292
  """Parses a LUXI request message.
293

294
  """
295
  try:
296
    request = serializer.LoadJson(msg)
297
  except ValueError, err:
298
    raise ProtocolError("Invalid LUXI request (parsing error): %s" % err)
299

    
300
  logging.debug("LUXI request: %s", request)
301

    
302
  if not isinstance(request, dict):
303
    logging.error("LUXI request not a dict: %r", msg)
304
    raise ProtocolError("Invalid LUXI request (not a dict)")
305

    
306
  method = request.get(KEY_METHOD, None) # pylint: disable=E1103
307
  args = request.get(KEY_ARGS, None) # pylint: disable=E1103
308
  version = request.get(KEY_VERSION, None) # pylint: disable=E1103
309

    
310
  if method is None or args is None:
311
    logging.error("LUXI request missing method or arguments: %r", msg)
312
    raise ProtocolError(("Invalid LUXI request (no method or arguments"
313
                         " in request): %r") % msg)
314

    
315
  return (method, args, version)
316

    
317

    
318
def ParseResponse(msg):
319
  """Parses a LUXI response message.
320

321
  """
322
  # Parse the result
323
  try:
324
    data = serializer.LoadJson(msg)
325
  except KeyboardInterrupt:
326
    raise
327
  except Exception, err:
328
    raise ProtocolError("Error while deserializing response: %s" % str(err))
329

    
330
  # Validate response
331
  if not (isinstance(data, dict) and
332
          KEY_SUCCESS in data and
333
          KEY_RESULT in data):
334
    raise ProtocolError("Invalid response from server: %r" % data)
335

    
336
  return (data[KEY_SUCCESS], data[KEY_RESULT],
337
          data.get(KEY_VERSION, None)) # pylint: disable=E1103
338

    
339

    
340
def FormatResponse(success, result, version=None):
  """Builds and serializes a LUXI response.

  @param success: Value stored under the success key
  @param result: Value stored under the result key
  @param version: Protocol version; left out of the response if C{None}
  @return: The serialized response

  """
  payload = {KEY_SUCCESS: success, KEY_RESULT: result}
  if version is not None:
    payload[KEY_VERSION] = version

  logging.debug("LUXI response: %s", payload)
  return serializer.DumpJson(payload)
355

    
356

    
357
def FormatRequest(method, args, version=None):
  """Builds and serializes a LUXI request.

  @param method: Name of the requested method
  @param args: Arguments of the method
  @param version: Protocol version; left out of the request if C{None}
  @return: The serialized request

  """
  payload = {KEY_METHOD: method, KEY_ARGS: args}
  if version is not None:
    payload[KEY_VERSION] = version

  return serializer.DumpJson(payload)
372

    
373

    
374
def CallLuxiMethod(transport_cb, method, args, version=None):
  """Performs one LUXI call through the given transport callback.

  @param transport_cb: Callable receiving the serialized request and
    returning the serialized response
  @param method: Name of the requested method
  @param args: Arguments of the method
  @param version: Protocol version put into the request and expected in
    the response
  @return: The result contained in a successful response
  @raise errors.LuxiError: If the response carries a version differing
    from the one passed in
  @raise RequestError: If the response reports a failure and
    L{errors.MaybeRaise} did not raise for it

  """
  assert callable(transport_cb)

  # Send request and wait for response
  raw_response = transport_cb(FormatRequest(method, args, version=version))
  (success, result, resp_version) = ParseResponse(raw_response)

  # A response without version is accepted, a differing version is not
  if resp_version is not None and resp_version != version:
    raise errors.LuxiError("LUXI version mismatch, client %s, response %s" %
                           (version, resp_version))

  if not success:
    errors.MaybeRaise(result)
    raise RequestError(result)

  return result
397

    
398

    
399
class Client(object):
  """High-level client implementation.

  This uses a backing Transport-like class on top of which it
  implements data serialization/deserialization.

  """
  def __init__(self, address=None, timeouts=None, transport=Transport):
    """Constructor for the Client class.

    Arguments:
      - address: a valid address for the used transport class; defaults
        to the master socket path
      - timeouts: a list of timeouts, to be used on connect and read/write
      - transport: a Transport-like class

    If timeouts is not passed, the default timeouts of the transport
    class are used.

    """
    if address is None:
      address = pathutils.MASTER_SOCKET
    self.address = address
    self.timeouts = timeouts
    self.transport_class = transport
    self.transport = None
    self._InitTransport()

  def _InitTransport(self):
    """(Re)initialize the transport if needed.

    """
    if self.transport is None:
      self.transport = self.transport_class(self.address,
                                            timeouts=self.timeouts)

  def _CloseTransport(self):
    """Close the transport, ignoring errors.

    """
    if self.transport is None:
      return
    try:
      # Forget the transport first so that it is gone even if closing fails
      old_transp = self.transport
      self.transport = None
      old_transp.Close()
    except Exception: # pylint: disable=W0703
      pass

  def _SendMethodCall(self, data):
    """Send a serialized request and return the serialized response.

    On any error the transport is closed, so that the next call starts
    with a fresh one, and the error is passed on.

    """
    # Send request and wait for response
    try:
      self._InitTransport()
      return self.transport.Call(data)
    except Exception:
      self._CloseTransport()
      raise

  def Close(self):
    """Close the underlying connection.

    """
    self._CloseTransport()

  def CallMethod(self, method, args):
    """Send a generic request and return the response.

    @param method: Name of the LUXI method
    @type args: list or tuple
    @param args: Arguments of the method
    @raise errors.ProgrammerError: If args is neither a list nor a tuple

    """
    if not isinstance(args, (list, tuple)):
      raise errors.ProgrammerError("Invalid parameter passed to CallMethod:"
                                   " expected list, got %s" % type(args))
    return CallLuxiMethod(self._SendMethodCall, method, args,
                          version=constants.LUXI_VERSION)

  def SetQueueDrainFlag(self, drain_flag):
    """Send a SetDrainFlag request with the given flag.

    """
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))

  def SetWatcherPause(self, until):
    """Send a SetWatcherPause request with the given value.

    """
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))

  def SubmitJob(self, ops):
    """Submit one job made of the given opcodes.

    @param ops: Iterable of opcodes; each is sent as the value of its
      C{__getstate__} method

    """
    # A list comprehension (as in SubmitManyJobs) always yields a real list
    ops_state = [op.__getstate__() for op in ops]
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))

  def SubmitManyJobs(self, jobs):
    """Submit several jobs with a single request.

    @param jobs: Iterable of jobs, each being an iterable of opcodes

    """
    jobs_state = [[op.__getstate__() for op in ops] for ops in jobs]
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))

  def CancelJob(self, job_id):
    """Send a CancelJob request for the given job ID.

    """
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))

  def ArchiveJob(self, job_id):
    """Send an ArchiveJob request for the given job ID.

    """
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))

  def ChangeJobPriority(self, job_id, priority):
    """Send a ChangeJobPriority request for the given job ID.

    """
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))

  def AutoArchiveJobs(self, age):
    """Send an AutoArchiveJobs request.

    Besides the age, a timeout derived from the default read/write
    timeout is passed along.

    """
    timeout = (DEF_RWTO - 1) / 2
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))

  def WaitForJobChangeOnce(self, job_id, fields,
                           prev_job_info, prev_log_serial,
                           timeout=WFJC_TIMEOUT):
    """Waits for changes on a job.

    @param job_id: Job ID
    @type fields: list
    @param fields: List of field names to be observed
    @type prev_job_info: None or list
    @param prev_job_info: Previously received job information
    @type prev_log_serial: None or int/long
    @param prev_log_serial: Highest log serial number previously received
    @type timeout: int/float
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
                    be capped to that value)

    """
    assert timeout >= 0, "Timeout can not be negative"
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
                           (job_id, fields, prev_job_info,
                            prev_log_serial,
                            min(WFJC_TIMEOUT, timeout)))

  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
    """Waits until a job has changed.

    L{WaitForJobChangeOnce} is called repeatedly until it returns
    something else than L{constants.JOB_NOTCHANGED}.

    """
    while True:
      result = self.WaitForJobChangeOnce(job_id, fields,
                                         prev_job_info, prev_log_serial)
      if result != constants.JOB_NOTCHANGED:
        break
    return result

  def Query(self, what, fields, qfilter):
    """Query for resources/items.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: List of strings
    @param fields: List of requested fields
    @type qfilter: None or list
    @param qfilter: Query filter
    @rtype: L{objects.QueryResponse}

    """
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
    return objects.QueryResponse.FromDict(result)

  def QueryFields(self, what, fields):
    """Query for available fields.

    @param what: One of L{constants.QR_VIA_LUXI}
    @type fields: None or list of strings
    @param fields: List of requested fields
    @rtype: L{objects.QueryFieldsResponse}

    """
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
    return objects.QueryFieldsResponse.FromDict(result)

  def QueryJobs(self, job_ids, fields):
    """Send a QueryJobs request.

    """
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))

  def QueryInstances(self, names, fields, use_locking):
    """Send a QueryInstances request.

    """
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))

  def QueryNodes(self, names, fields, use_locking):
    """Send a QueryNodes request.

    """
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))

  def QueryGroups(self, names, fields, use_locking):
    """Send a QueryGroups request.

    """
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))

  def QueryNetworks(self, names, fields, use_locking):
    """Send a QueryNetworks request.

    """
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))

  def QueryExports(self, nodes, use_locking):
    """Send a QueryExports request.

    """
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))

  def QueryClusterInfo(self):
    """Send a QueryClusterInfo request.

    """
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())

  def QueryConfigValues(self, fields):
    """Send a QueryConfigValues request.

    """
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))

  def QueryTags(self, kind, name):
    """Send a QueryTags request.

    """
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))