Statistics
| Branch: | Tag: | Revision:

root / lib / luxi.py @ fc6ccde4

History | View | Annotate | Download (7.2 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Module for the LUXI protocol
23

24
This module implements the local unix socket protocol. You only need
25
this module and the opcodes module in the client program in order to
26
communicate with the master.
27

28
The module is also used by the master daemon.
29

30
"""
31

    
32
from ganeti import constants
33
from ganeti import objects
34
import ganeti.rpc.client as cl
35
from ganeti.rpc.transport import Transport
36

    
37
__all__ = [
38
  # classes:
39
  "Client"
40
  ]
41

    
42
REQ_SUBMIT_JOB = constants.LUXI_REQ_SUBMIT_JOB
43
REQ_SUBMIT_JOB_TO_DRAINED_QUEUE = constants.LUXI_REQ_SUBMIT_JOB_TO_DRAINED_QUEUE
44
REQ_SUBMIT_MANY_JOBS = constants.LUXI_REQ_SUBMIT_MANY_JOBS
45
REQ_PICKUP_JOB = constants.LUXI_REQ_PICKUP_JOB
46
REQ_WAIT_FOR_JOB_CHANGE = constants.LUXI_REQ_WAIT_FOR_JOB_CHANGE
47
REQ_CANCEL_JOB = constants.LUXI_REQ_CANCEL_JOB
48
REQ_ARCHIVE_JOB = constants.LUXI_REQ_ARCHIVE_JOB
49
REQ_CHANGE_JOB_PRIORITY = constants.LUXI_REQ_CHANGE_JOB_PRIORITY
50
REQ_AUTO_ARCHIVE_JOBS = constants.LUXI_REQ_AUTO_ARCHIVE_JOBS
51
REQ_QUERY = constants.LUXI_REQ_QUERY
52
REQ_QUERY_FIELDS = constants.LUXI_REQ_QUERY_FIELDS
53
REQ_QUERY_JOBS = constants.LUXI_REQ_QUERY_JOBS
54
REQ_QUERY_INSTANCES = constants.LUXI_REQ_QUERY_INSTANCES
55
REQ_QUERY_NODES = constants.LUXI_REQ_QUERY_NODES
56
REQ_QUERY_GROUPS = constants.LUXI_REQ_QUERY_GROUPS
57
REQ_QUERY_NETWORKS = constants.LUXI_REQ_QUERY_NETWORKS
58
REQ_QUERY_EXPORTS = constants.LUXI_REQ_QUERY_EXPORTS
59
REQ_QUERY_CONFIG_VALUES = constants.LUXI_REQ_QUERY_CONFIG_VALUES
60
REQ_QUERY_CLUSTER_INFO = constants.LUXI_REQ_QUERY_CLUSTER_INFO
61
REQ_QUERY_TAGS = constants.LUXI_REQ_QUERY_TAGS
62
REQ_SET_DRAIN_FLAG = constants.LUXI_REQ_SET_DRAIN_FLAG
63
REQ_SET_WATCHER_PAUSE = constants.LUXI_REQ_SET_WATCHER_PAUSE
64
REQ_ALL = constants.LUXI_REQ_ALL
65

    
66
DEF_RWTO = constants.LUXI_DEF_RWTO
67
WFJC_TIMEOUT = constants.LUXI_WFJC_TIMEOUT
68

    
69

    
70
class Client(cl.AbstractClient):
71
  """High-level client implementation.
72

73
  This uses a backing Transport-like class on top of which it
74
  implements data serialization/deserialization.
75

76
  """
77
  def __init__(self, address=None, timeouts=None, transport=Transport):
78
    """Constructor for the Client class.
79

80
    Arguments are the same as for L{AbstractClient}.
81

82
    """
83
    super(Client, self).__init__(address, timeouts, transport)
84
    # Override the version of the protocol:
85
    self.version = constants.LUXI_VERSION
86

    
87
  def SetQueueDrainFlag(self, drain_flag):
88
    return self.CallMethod(REQ_SET_DRAIN_FLAG, (drain_flag, ))
89

    
90
  def SetWatcherPause(self, until):
91
    return self.CallMethod(REQ_SET_WATCHER_PAUSE, (until, ))
92

    
93
  def PickupJob(self, job):
94
    return self.CallMethod(REQ_PICKUP_JOB, (job,))
95

    
96
  def SubmitJob(self, ops):
97
    ops_state = map(lambda op: op.__getstate__(), ops)
98
    return self.CallMethod(REQ_SUBMIT_JOB, (ops_state, ))
99

    
100
  def SubmitJobToDrainedQueue(self, ops):
101
    ops_state = map(lambda op: op.__getstate__(), ops)
102
    return self.CallMethod(REQ_SUBMIT_JOB_TO_DRAINED_QUEUE, (ops_state, ))
103

    
104
  def SubmitManyJobs(self, jobs):
105
    jobs_state = []
106
    for ops in jobs:
107
      jobs_state.append([op.__getstate__() for op in ops])
108
    return self.CallMethod(REQ_SUBMIT_MANY_JOBS, (jobs_state, ))
109

    
110
  def CancelJob(self, job_id):
111
    return self.CallMethod(REQ_CANCEL_JOB, (job_id, ))
112

    
113
  def ArchiveJob(self, job_id):
114
    return self.CallMethod(REQ_ARCHIVE_JOB, (job_id, ))
115

    
116
  def ChangeJobPriority(self, job_id, priority):
117
    return self.CallMethod(REQ_CHANGE_JOB_PRIORITY, (job_id, priority))
118

    
119
  def AutoArchiveJobs(self, age):
120
    timeout = (DEF_RWTO - 1) / 2
121
    return self.CallMethod(REQ_AUTO_ARCHIVE_JOBS, (age, timeout))
122

    
123
  def WaitForJobChangeOnce(self, job_id, fields,
124
                           prev_job_info, prev_log_serial,
125
                           timeout=WFJC_TIMEOUT):
126
    """Waits for changes on a job.
127

128
    @param job_id: Job ID
129
    @type fields: list
130
    @param fields: List of field names to be observed
131
    @type prev_job_info: None or list
132
    @param prev_job_info: Previously received job information
133
    @type prev_log_serial: None or int/long
134
    @param prev_log_serial: Highest log serial number previously received
135
    @type timeout: int/float
136
    @param timeout: Timeout in seconds (values larger than L{WFJC_TIMEOUT} will
137
                    be capped to that value)
138

139
    """
140
    assert timeout >= 0, "Timeout can not be negative"
141
    return self.CallMethod(REQ_WAIT_FOR_JOB_CHANGE,
142
                           (job_id, fields, prev_job_info,
143
                            prev_log_serial,
144
                            min(WFJC_TIMEOUT, timeout)))
145

    
146
  def WaitForJobChange(self, job_id, fields, prev_job_info, prev_log_serial):
147
    while True:
148
      result = self.WaitForJobChangeOnce(job_id, fields,
149
                                         prev_job_info, prev_log_serial)
150
      if result != constants.JOB_NOTCHANGED:
151
        break
152
    return result
153

    
154
  def Query(self, what, fields, qfilter):
155
    """Query for resources/items.
156

157
    @param what: One of L{constants.QR_VIA_LUXI}
158
    @type fields: List of strings
159
    @param fields: List of requested fields
160
    @type qfilter: None or list
161
    @param qfilter: Query filter
162
    @rtype: L{objects.QueryResponse}
163

164
    """
165
    result = self.CallMethod(REQ_QUERY, (what, fields, qfilter))
166
    return objects.QueryResponse.FromDict(result)
167

    
168
  def QueryFields(self, what, fields):
169
    """Query for available fields.
170

171
    @param what: One of L{constants.QR_VIA_LUXI}
172
    @type fields: None or list of strings
173
    @param fields: List of requested fields
174
    @rtype: L{objects.QueryFieldsResponse}
175

176
    """
177
    result = self.CallMethod(REQ_QUERY_FIELDS, (what, fields))
178
    return objects.QueryFieldsResponse.FromDict(result)
179

    
180
  def QueryJobs(self, job_ids, fields):
181
    return self.CallMethod(REQ_QUERY_JOBS, (job_ids, fields))
182

    
183
  def QueryInstances(self, names, fields, use_locking):
184
    return self.CallMethod(REQ_QUERY_INSTANCES, (names, fields, use_locking))
185

    
186
  def QueryNodes(self, names, fields, use_locking):
187
    return self.CallMethod(REQ_QUERY_NODES, (names, fields, use_locking))
188

    
189
  def QueryGroups(self, names, fields, use_locking):
190
    return self.CallMethod(REQ_QUERY_GROUPS, (names, fields, use_locking))
191

    
192
  def QueryNetworks(self, names, fields, use_locking):
193
    return self.CallMethod(REQ_QUERY_NETWORKS, (names, fields, use_locking))
194

    
195
  def QueryExports(self, nodes, use_locking):
196
    return self.CallMethod(REQ_QUERY_EXPORTS, (nodes, use_locking))
197

    
198
  def QueryClusterInfo(self):
199
    return self.CallMethod(REQ_QUERY_CLUSTER_INFO, ())
200

    
201
  def QueryConfigValues(self, fields):
202
    return self.CallMethod(REQ_QUERY_CONFIG_VALUES, (fields, ))
203

    
204
  def QueryTags(self, kind, name):
205
    return self.CallMethod(REQ_QUERY_TAGS, (kind, name))