root / extensions / httpnio / src / main / java / org / jclouds / http / pool / ConnectionPoolTransformingHttpCommandExecutorService.java @ 35e7942d
History | View | Annotate | Download (8.1 kB)
1 |
/**
|
---|---|
2 |
*
|
3 |
* Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
|
4 |
*
|
5 |
* ====================================================================
|
6 |
* Licensed under the Apache License, Version 2.0 (the "License");
|
7 |
* you may not use this file except in compliance with the License.
|
8 |
* You may obtain a copy of the License at
|
9 |
*
|
10 |
* http://www.apache.org/licenses/LICENSE-2.0
|
11 |
*
|
12 |
* Unless required by applicable law or agreed to in writing, software
|
13 |
* distributed under the License is distributed on an "AS IS" BASIS,
|
14 |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
15 |
* See the License for the specific language governing permissions and
|
16 |
* limitations under the License.
|
17 |
* ====================================================================
|
18 |
*/
|
19 |
package org.jclouds.http.pool; |
20 |
|
21 |
import static com.google.common.base.Preconditions.checkArgument; |
22 |
import static org.jclouds.concurrent.ConcurrentUtils.makeListenable; |
23 |
|
24 |
import java.net.URI; |
25 |
import java.util.concurrent.BlockingQueue; |
26 |
import java.util.concurrent.Callable; |
27 |
import java.util.concurrent.CancellationException; |
28 |
import java.util.concurrent.ConcurrentMap; |
29 |
import java.util.concurrent.ExecutorService; |
30 |
import java.util.concurrent.SynchronousQueue; |
31 |
import java.util.concurrent.TimeUnit; |
32 |
import java.util.concurrent.TimeoutException; |
33 |
|
34 |
import javax.inject.Inject; |
35 |
|
36 |
import org.jclouds.http.HttpCommand; |
37 |
import org.jclouds.http.HttpCommandRendezvous; |
38 |
import org.jclouds.http.HttpResponse; |
39 |
import org.jclouds.http.TransformingHttpCommandExecutorService; |
40 |
import org.jclouds.lifecycle.BaseLifeCycle; |
41 |
|
42 |
import com.google.common.base.Function; |
43 |
import com.google.common.base.Throwables; |
44 |
import com.google.common.collect.MapMaker; |
45 |
import com.google.common.util.concurrent.ListenableFuture; |
46 |
|
47 |
/**
|
48 |
*
|
49 |
* @author Adrian Cole
|
50 |
*/
|
51 |
public class ConnectionPoolTransformingHttpCommandExecutorService<C> extends BaseLifeCycle |
52 |
implements TransformingHttpCommandExecutorService {
|
53 |
|
54 |
private final ConcurrentMap<URI, HttpCommandConnectionPool<C>> poolMap; |
55 |
private final BlockingQueue<HttpCommandRendezvous<?>> commandQueue; |
56 |
private final HttpCommandConnectionPool.Factory<C> poolFactory; |
57 |
|
58 |
@Inject
|
59 |
public ConnectionPoolTransformingHttpCommandExecutorService(ExecutorService executor, |
60 |
HttpCommandConnectionPool.Factory<C> pf, |
61 |
BlockingQueue<HttpCommandRendezvous<?>> commandQueue) {
|
62 |
super(executor);
|
63 |
this.poolFactory = pf;
|
64 |
// TODO inject this.
|
65 |
poolMap = new MapMaker().makeComputingMap(new Function<URI, HttpCommandConnectionPool<C>>() { |
66 |
public HttpCommandConnectionPool<C> apply(URI endPoint) { |
67 |
checkArgument(endPoint.getHost() != null, String.format( |
68 |
"endPoint.getHost() is null for %s", endPoint));
|
69 |
try {
|
70 |
HttpCommandConnectionPool<C> pool = poolFactory.create(endPoint); |
71 |
addDependency(pool); |
72 |
return pool;
|
73 |
} catch (RuntimeException e) { |
74 |
logger.error(e, "error creating entry for %s", endPoint);
|
75 |
throw e;
|
76 |
} |
77 |
} |
78 |
}); |
79 |
this.commandQueue = commandQueue;
|
80 |
} |
81 |
|
82 |
/**
|
83 |
* {@inheritDoc}
|
84 |
*
|
85 |
* If the reason we are shutting down is due an exception, we set that exception on all pending
|
86 |
* commands. Otherwise, we cancel the pending commands.
|
87 |
*/
|
88 |
@Override
|
89 |
protected void doShutdown() { |
90 |
exception.compareAndSet(null, getExceptionFromDependenciesOrNull());
|
91 |
while (!commandQueue.isEmpty()) {
|
92 |
HttpCommandRendezvous<?> rendezvous = (HttpCommandRendezvous<?>) commandQueue.remove(); |
93 |
if (rendezvous != null) { |
94 |
try {
|
95 |
if (exception.get() != null) |
96 |
rendezvous.setException(exception.get()); |
97 |
else
|
98 |
rendezvous.setException(new CancellationException("shutdown")); |
99 |
} catch (InterruptedException e) { |
100 |
logger.error(e, "Error cancelling command %s", rendezvous.getCommand());
|
101 |
} |
102 |
} |
103 |
} |
104 |
} |
105 |
|
106 |
@Override
|
107 |
protected void doWork() throws InterruptedException { |
108 |
takeACommandOffTheQueueAndInvokeIt(); |
109 |
} |
110 |
|
111 |
private void takeACommandOffTheQueueAndInvokeIt() throws InterruptedException { |
112 |
HttpCommandRendezvous<?> rendezvous = commandQueue.poll(1, TimeUnit.SECONDS); |
113 |
if (rendezvous != null) { |
114 |
try {
|
115 |
invoke(rendezvous); |
116 |
} catch (Exception e) { |
117 |
Throwables.propagateIfPossible(e, InterruptedException.class);
|
118 |
logger.error(e, "Error processing command %s", rendezvous.getCommand());
|
119 |
} |
120 |
} |
121 |
} |
122 |
|
123 |
/**
|
124 |
* This is an asynchronous operation that puts the <code>command</code> onto a queue. Later, it
|
125 |
* will be processed via the {@link #invoke(HttpCommandRendezvous) invoke}
|
126 |
* method.
|
127 |
*/
|
128 |
public <T> ListenableFuture<T> submit(HttpCommand command,
|
129 |
final Function<HttpResponse, T> responseTransformer) {
|
130 |
exceptionIfNotActive(); |
131 |
final SynchronousQueue<?> channel = new SynchronousQueue<Object>(); |
132 |
// should block and immediately parse the response on exit.
|
133 |
ListenableFuture<T> future = makeListenable(executorService.submit(new Callable<T>() { |
134 |
public T call() throws Exception { |
135 |
Object o = channel.take();
|
136 |
if (o instanceof Exception) { |
137 |
throw (Exception) o; |
138 |
} |
139 |
return responseTransformer.apply((HttpResponse) o);
|
140 |
} |
141 |
}), executorService); |
142 |
|
143 |
HttpCommandRendezvous<T> rendezvous = new HttpCommandRendezvous<T>(command, channel, future);
|
144 |
commandQueue.add(rendezvous); |
145 |
return rendezvous.getListenableFuture();
|
146 |
} |
147 |
|
148 |
/**
|
149 |
* Invoke binds a command with a connection from the pool. This binding is called a
|
150 |
* {@link HttpCommandConnectionHandle handle}. The handle will keep this binding until the
|
151 |
* command's response is parsed or an exception is set on the Command object.
|
152 |
*
|
153 |
* @param command
|
154 |
*/
|
155 |
protected void invoke(HttpCommandRendezvous<?> command) { |
156 |
exceptionIfNotActive(); |
157 |
|
158 |
URI endpoint = createBaseEndpointFor(command);
|
159 |
|
160 |
HttpCommandConnectionPool<C> pool = poolMap.get(endpoint); |
161 |
if (pool == null) { |
162 |
// TODO limit;
|
163 |
logger.warn("pool not available for command %s; retrying", command.getCommand());
|
164 |
commandQueue.add(command); |
165 |
return;
|
166 |
} |
167 |
|
168 |
HttpCommandConnectionHandle<C> connectionHandle = null;
|
169 |
try {
|
170 |
connectionHandle = pool.getHandle(command); |
171 |
} catch (InterruptedException e) { |
172 |
logger.warn(e, "Interrupted getting a connection for command %s; retrying", command
|
173 |
.getCommand()); |
174 |
commandQueue.add(command); |
175 |
return;
|
176 |
} catch (TimeoutException e) { |
177 |
logger.warn(e, "Timeout getting a connection for command %s on pool %s; retrying", command
|
178 |
.getCommand(), pool); |
179 |
commandQueue.add(command); |
180 |
return;
|
181 |
} catch (RuntimeException e) { |
182 |
logger.warn(e, "Error getting a connection for command %s on pool %s; retrying", command
|
183 |
.getCommand(), pool); |
184 |
discardPool(endpoint, pool); |
185 |
commandQueue.add(command); |
186 |
return;
|
187 |
} |
188 |
|
189 |
if (connectionHandle == null) { |
190 |
logger.error("Failed to obtain connection for command %s; retrying", command.getCommand());
|
191 |
commandQueue.add(command); |
192 |
return;
|
193 |
} |
194 |
connectionHandle.startConnection(); |
195 |
} |
196 |
|
197 |
private void discardPool(URI endpoint, HttpCommandConnectionPool<C> pool) { |
198 |
poolMap.remove(endpoint, pool); |
199 |
pool.shutdown(); |
200 |
this.dependencies.remove(pool);
|
201 |
} |
202 |
|
203 |
/**
|
204 |
* keys to the map are only used for socket information, not path. In this case, you should
|
205 |
* remove any path or query details from the URI.
|
206 |
*/
|
207 |
private URI createBaseEndpointFor(HttpCommandRendezvous<?> command) { |
208 |
URI endpoint = command.getCommand().getRequest().getEndpoint();
|
209 |
if (endpoint.getPort() == -1) { |
210 |
return URI.create(String.format("%s://%s", endpoint.getScheme(), endpoint.getHost())); |
211 |
} else {
|
212 |
return URI.create(String.format("%s://%s:%d", endpoint.getScheme(), endpoint.getHost(), |
213 |
endpoint.getPort())); |
214 |
} |
215 |
} |
216 |
|
217 |
} |