root / extensions / httpnio / src / main / java / org / jclouds / http / httpnio / config / NioTransformingHttpCommandExecutorServiceModule.java @ 35e7942d
History | View | Annotate | Download (9.4 kB)
1 |
/**
|
---|---|
2 |
*
|
3 |
* Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
|
4 |
*
|
5 |
* ====================================================================
|
6 |
* Licensed under the Apache License, Version 2.0 (the "License");
|
7 |
* you may not use this file except in compliance with the License.
|
8 |
* You may obtain a copy of the License at
|
9 |
*
|
10 |
* http://www.apache.org/licenses/LICENSE-2.0
|
11 |
*
|
12 |
* Unless required by applicable law or agreed to in writing, software
|
13 |
* distributed under the License is distributed on an "AS IS" BASIS,
|
14 |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
15 |
* See the License for the specific language governing permissions and
|
16 |
* limitations under the License.
|
17 |
* ====================================================================
|
18 |
*/
|
19 |
package org.jclouds.http.httpnio.config; |
20 |
|
21 |
import static com.google.common.base.Preconditions.checkState; |
22 |
|
23 |
import java.io.Closeable; |
24 |
import java.io.IOException; |
25 |
import java.net.URI; |
26 |
import java.util.concurrent.ArrayBlockingQueue; |
27 |
import java.util.concurrent.BlockingQueue; |
28 |
import java.util.concurrent.ExecutorService; |
29 |
import java.util.concurrent.LinkedBlockingQueue; |
30 |
import java.util.concurrent.Semaphore; |
31 |
|
32 |
import javax.inject.Inject; |
33 |
import javax.inject.Named; |
34 |
import javax.inject.Provider; |
35 |
import javax.inject.Singleton; |
36 |
|
37 |
import org.apache.http.ConnectionReuseStrategy; |
38 |
import org.apache.http.HttpEntity; |
39 |
import org.apache.http.HttpVersion; |
40 |
import org.apache.http.impl.DefaultConnectionReuseStrategy; |
41 |
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor; |
42 |
import org.apache.http.nio.NHttpConnection; |
43 |
import org.apache.http.nio.entity.BufferingNHttpEntity; |
44 |
import org.apache.http.nio.entity.ConsumingNHttpEntity; |
45 |
import org.apache.http.nio.protocol.AsyncNHttpClientHandler; |
46 |
import org.apache.http.nio.protocol.NHttpRequestExecutionHandler; |
47 |
import org.apache.http.nio.reactor.IOReactorException; |
48 |
import org.apache.http.nio.util.ByteBufferAllocator; |
49 |
import org.apache.http.nio.util.HeapByteBufferAllocator; |
50 |
import org.apache.http.params.BasicHttpParams; |
51 |
import org.apache.http.params.CoreConnectionPNames; |
52 |
import org.apache.http.params.CoreProtocolPNames; |
53 |
import org.apache.http.params.HttpParams; |
54 |
import org.apache.http.params.HttpProtocolParams; |
55 |
import org.apache.http.protocol.BasicHttpProcessor; |
56 |
import org.apache.http.protocol.RequestConnControl; |
57 |
import org.apache.http.protocol.RequestContent; |
58 |
import org.apache.http.protocol.RequestExpectContinue; |
59 |
import org.apache.http.protocol.RequestTargetHost; |
60 |
import org.apache.http.protocol.RequestUserAgent; |
61 |
import org.jclouds.Constants; |
62 |
import org.jclouds.http.HttpCommandRendezvous; |
63 |
import org.jclouds.http.HttpUtils; |
64 |
import org.jclouds.http.TransformingHttpCommandExecutorService; |
65 |
import org.jclouds.http.config.ConfiguresHttpCommandExecutorService; |
66 |
import org.jclouds.http.httpnio.pool.NioHttpCommandConnectionPool; |
67 |
import org.jclouds.http.httpnio.pool.NioHttpCommandExecutionHandler; |
68 |
import org.jclouds.http.httpnio.pool.NioTransformingHttpCommandExecutorService; |
69 |
import org.jclouds.http.httpnio.pool.NioHttpCommandExecutionHandler.ConsumingNHttpEntityFactory; |
70 |
import org.jclouds.http.pool.config.ConnectionPoolCommandExecutorServiceModule; |
71 |
import org.jclouds.lifecycle.Closer; |
72 |
|
73 |
import com.google.inject.Provides; |
74 |
import com.google.inject.Scopes; |
75 |
import com.google.inject.TypeLiteral; |
76 |
|
77 |
/**
|
78 |
*
|
79 |
* @author Adrian Cole
|
80 |
*/
|
81 |
@ConfiguresHttpCommandExecutorService
|
82 |
public class NioTransformingHttpCommandExecutorServiceModule extends |
83 |
ConnectionPoolCommandExecutorServiceModule<NHttpConnection> { |
84 |
|
85 |
@Provides
|
86 |
// @Singleton per uri...
|
87 |
public AsyncNHttpClientHandler provideAsyncNttpConnectionHandler(
|
88 |
BasicHttpProcessor httpProcessor, NHttpRequestExecutionHandler execHandler, |
89 |
ConnectionReuseStrategy connStrategy, ByteBufferAllocator allocator, HttpParams params) { |
90 |
return new AsyncNHttpClientHandler(httpProcessor, execHandler, connStrategy, allocator, |
91 |
params); |
92 |
|
93 |
} |
94 |
|
95 |
@Provides
|
96 |
@Singleton
|
97 |
public BasicHttpProcessor provideConnectionProcessor() {
|
98 |
BasicHttpProcessor httpproc = new BasicHttpProcessor();
|
99 |
httpproc.addInterceptor(new RequestContent());
|
100 |
httpproc.addInterceptor(new RequestTargetHost());
|
101 |
httpproc.addInterceptor(new RequestConnControl());
|
102 |
httpproc.addInterceptor(new RequestUserAgent());
|
103 |
httpproc.addInterceptor(new RequestExpectContinue());
|
104 |
return httpproc;
|
105 |
} |
106 |
|
107 |
@Singleton
|
108 |
@Provides
|
109 |
HttpParams newBasicHttpParams(HttpUtils utils) { |
110 |
BasicHttpParams params = new BasicHttpParams();
|
111 |
|
112 |
params.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024) |
113 |
.setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, true)
|
114 |
.setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true).setParameter(
|
115 |
CoreProtocolPNames.ORIGIN_SERVER, "jclouds/1.0");
|
116 |
|
117 |
if (utils.getConnectionTimeout() > 0) { |
118 |
params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, utils |
119 |
.getConnectionTimeout()); |
120 |
} |
121 |
|
122 |
if (utils.getSocketOpenTimeout() > 0) { |
123 |
params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, utils.getSocketOpenTimeout()); |
124 |
} |
125 |
|
126 |
HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1); |
127 |
return params;
|
128 |
} |
129 |
|
130 |
protected void configure() { |
131 |
super.configure();
|
132 |
bind(TransformingHttpCommandExecutorService.class).to( |
133 |
NioTransformingHttpCommandExecutorService.class); |
134 |
bind(new TypeLiteral<BlockingQueue<HttpCommandRendezvous<?>>>() { |
135 |
}).to(new TypeLiteral<LinkedBlockingQueue<HttpCommandRendezvous<?>>>() { |
136 |
}).in(Scopes.SINGLETON); |
137 |
bind(NioHttpCommandExecutionHandler.ConsumingNHttpEntityFactory.class).to( |
138 |
ConsumingNHttpEntityFactoryImpl.class).in(Scopes.SINGLETON); |
139 |
bind(NHttpRequestExecutionHandler.class).to(NioHttpCommandExecutionHandler.class).in( |
140 |
Scopes.SINGLETON); |
141 |
bind(ConnectionReuseStrategy.class).to(DefaultConnectionReuseStrategy.class).in( |
142 |
Scopes.SINGLETON); |
143 |
bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class); |
144 |
bind(NioHttpCommandConnectionPool.Factory.class).to(Factory.class).in(Scopes.SINGLETON); |
145 |
} |
146 |
|
147 |
private static class Factory implements NioHttpCommandConnectionPool.Factory { |
148 |
|
149 |
Closer closer; |
150 |
ExecutorService executor;
|
151 |
int maxConnectionReuse;
|
152 |
int maxSessionFailures;
|
153 |
Provider<Semaphore> allConnections; |
154 |
Provider<BlockingQueue<HttpCommandRendezvous<?>>> commandQueue; |
155 |
Provider<BlockingQueue<NHttpConnection>> available; |
156 |
Provider<AsyncNHttpClientHandler> clientHandler;
|
157 |
Provider<DefaultConnectingIOReactor> ioReactor;
|
158 |
HttpParams params; |
159 |
|
160 |
@SuppressWarnings("unused") |
161 |
@Inject
|
162 |
Factory(Closer closer, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService executor, |
163 |
@Named(Constants.PROPERTY_MAX_CONNECTION_REUSE) int maxConnectionReuse, |
164 |
@Named(Constants.PROPERTY_MAX_SESSION_FAILURES) int maxSessionFailures, |
165 |
Provider<Semaphore> allConnections, |
166 |
Provider<BlockingQueue<HttpCommandRendezvous<?>>> commandQueue, |
167 |
Provider<BlockingQueue<NHttpConnection>> available, |
168 |
Provider<AsyncNHttpClientHandler> clientHandler,
|
169 |
Provider<DefaultConnectingIOReactor> ioReactor, HttpParams params) {
|
170 |
this.closer = closer;
|
171 |
this.executor = executor;
|
172 |
this.maxConnectionReuse = maxConnectionReuse;
|
173 |
this.maxSessionFailures = maxSessionFailures;
|
174 |
this.allConnections = allConnections;
|
175 |
this.commandQueue = commandQueue;
|
176 |
this.available = available;
|
177 |
this.clientHandler = clientHandler;
|
178 |
this.ioReactor = ioReactor;
|
179 |
this.params = params;
|
180 |
} |
181 |
|
182 |
public NioHttpCommandConnectionPool create(URI endPoint) { |
183 |
NioHttpCommandConnectionPool pool = new NioHttpCommandConnectionPool(executor,
|
184 |
allConnections.get(), commandQueue.get(), available.get(), clientHandler.get(), |
185 |
ioReactor.get(), params, endPoint, maxConnectionReuse, maxSessionFailures); |
186 |
pool.start(); |
187 |
closer.addToClose(new PoolCloser(pool));
|
188 |
return pool;
|
189 |
} |
190 |
|
191 |
private static class PoolCloser implements Closeable { |
192 |
private final NioHttpCommandConnectionPool pool; |
193 |
|
194 |
protected PoolCloser(NioHttpCommandConnectionPool pool) {
|
195 |
this.pool = pool;
|
196 |
} |
197 |
|
198 |
public void close() throws IOException { |
199 |
pool.shutdown(); |
200 |
} |
201 |
} |
202 |
|
203 |
} |
204 |
|
205 |
private static class ConsumingNHttpEntityFactoryImpl implements ConsumingNHttpEntityFactory { |
206 |
@Inject
|
207 |
javax.inject.Provider<ByteBufferAllocator> allocatorProvider; |
208 |
|
209 |
public ConsumingNHttpEntity create(HttpEntity httpEntity) {
|
210 |
return new BufferingNHttpEntity(httpEntity, allocatorProvider.get()); |
211 |
} |
212 |
} |
213 |
|
214 |
@Override
|
215 |
public BlockingQueue<NHttpConnection> provideAvailablePool(HttpUtils utils) throws Exception { |
216 |
return new ArrayBlockingQueue<NHttpConnection>(utils.getMaxConnectionsPerHost() != 0 ? utils |
217 |
.getMaxConnectionsPerHost() : utils.getMaxConnections(), true);
|
218 |
} |
219 |
|
220 |
@Provides
|
221 |
// uri scope
|
222 |
public DefaultConnectingIOReactor provideDefaultConnectingIOReactor(
|
223 |
@Named(Constants.PROPERTY_IO_WORKER_THREADS) int maxWorkerThreads, HttpParams params) |
224 |
throws IOReactorException {
|
225 |
checkState(maxWorkerThreads > 0, "io reactor needs at least 1 thread"); |
226 |
return new DefaultConnectingIOReactor(maxWorkerThreads, params); |
227 |
} |
228 |
|
229 |
} |