Statistics
| Branch: | Revision:

root / extensions / httpnio / src / main / java / org / jclouds / http / httpnio / config / NioTransformingHttpCommandExecutorServiceModule.java @ 35e7942d

History | View | Annotate | Download (9.4 kB)

1
/**
2
 *
3
 * Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
4
 *
5
 * ====================================================================
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 * ====================================================================
18
 */
19
package org.jclouds.http.httpnio.config;
20

    
21
import static com.google.common.base.Preconditions.checkState;
22

    
23
import java.io.Closeable;
24
import java.io.IOException;
25
import java.net.URI;
26
import java.util.concurrent.ArrayBlockingQueue;
27
import java.util.concurrent.BlockingQueue;
28
import java.util.concurrent.ExecutorService;
29
import java.util.concurrent.LinkedBlockingQueue;
30
import java.util.concurrent.Semaphore;
31

    
32
import javax.inject.Inject;
33
import javax.inject.Named;
34
import javax.inject.Provider;
35
import javax.inject.Singleton;
36

    
37
import org.apache.http.ConnectionReuseStrategy;
38
import org.apache.http.HttpEntity;
39
import org.apache.http.HttpVersion;
40
import org.apache.http.impl.DefaultConnectionReuseStrategy;
41
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
42
import org.apache.http.nio.NHttpConnection;
43
import org.apache.http.nio.entity.BufferingNHttpEntity;
44
import org.apache.http.nio.entity.ConsumingNHttpEntity;
45
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
46
import org.apache.http.nio.protocol.NHttpRequestExecutionHandler;
47
import org.apache.http.nio.reactor.IOReactorException;
48
import org.apache.http.nio.util.ByteBufferAllocator;
49
import org.apache.http.nio.util.HeapByteBufferAllocator;
50
import org.apache.http.params.BasicHttpParams;
51
import org.apache.http.params.CoreConnectionPNames;
52
import org.apache.http.params.CoreProtocolPNames;
53
import org.apache.http.params.HttpParams;
54
import org.apache.http.params.HttpProtocolParams;
55
import org.apache.http.protocol.BasicHttpProcessor;
56
import org.apache.http.protocol.RequestConnControl;
57
import org.apache.http.protocol.RequestContent;
58
import org.apache.http.protocol.RequestExpectContinue;
59
import org.apache.http.protocol.RequestTargetHost;
60
import org.apache.http.protocol.RequestUserAgent;
61
import org.jclouds.Constants;
62
import org.jclouds.http.HttpCommandRendezvous;
63
import org.jclouds.http.HttpUtils;
64
import org.jclouds.http.TransformingHttpCommandExecutorService;
65
import org.jclouds.http.config.ConfiguresHttpCommandExecutorService;
66
import org.jclouds.http.httpnio.pool.NioHttpCommandConnectionPool;
67
import org.jclouds.http.httpnio.pool.NioHttpCommandExecutionHandler;
68
import org.jclouds.http.httpnio.pool.NioTransformingHttpCommandExecutorService;
69
import org.jclouds.http.httpnio.pool.NioHttpCommandExecutionHandler.ConsumingNHttpEntityFactory;
70
import org.jclouds.http.pool.config.ConnectionPoolCommandExecutorServiceModule;
71
import org.jclouds.lifecycle.Closer;
72

    
73
import com.google.inject.Provides;
74
import com.google.inject.Scopes;
75
import com.google.inject.TypeLiteral;
76

    
77
/**
78
 * 
79
 * @author Adrian Cole
80
 */
81
@ConfiguresHttpCommandExecutorService
82
public class NioTransformingHttpCommandExecutorServiceModule extends
83
         ConnectionPoolCommandExecutorServiceModule<NHttpConnection> {
84

    
85
   @Provides
86
   // @Singleton per uri...
87
   public AsyncNHttpClientHandler provideAsyncNttpConnectionHandler(
88
            BasicHttpProcessor httpProcessor, NHttpRequestExecutionHandler execHandler,
89
            ConnectionReuseStrategy connStrategy, ByteBufferAllocator allocator, HttpParams params) {
90
      return new AsyncNHttpClientHandler(httpProcessor, execHandler, connStrategy, allocator,
91
               params);
92

    
93
   }
94

    
95
   @Provides
96
   @Singleton
97
   public BasicHttpProcessor provideConnectionProcessor() {
98
      BasicHttpProcessor httpproc = new BasicHttpProcessor();
99
      httpproc.addInterceptor(new RequestContent());
100
      httpproc.addInterceptor(new RequestTargetHost());
101
      httpproc.addInterceptor(new RequestConnControl());
102
      httpproc.addInterceptor(new RequestUserAgent());
103
      httpproc.addInterceptor(new RequestExpectContinue());
104
      return httpproc;
105
   }
106

    
107
   @Singleton
108
   @Provides
109
   HttpParams newBasicHttpParams(HttpUtils utils) {
110
      BasicHttpParams params = new BasicHttpParams();
111

    
112
      params.setIntParameter(CoreConnectionPNames.SOCKET_BUFFER_SIZE, 8 * 1024)
113
               .setBooleanParameter(CoreConnectionPNames.STALE_CONNECTION_CHECK, true)
114
               .setBooleanParameter(CoreConnectionPNames.TCP_NODELAY, true).setParameter(
115
                        CoreProtocolPNames.ORIGIN_SERVER, "jclouds/1.0");
116

    
117
      if (utils.getConnectionTimeout() > 0) {
118
         params.setIntParameter(CoreConnectionPNames.CONNECTION_TIMEOUT, utils
119
                  .getConnectionTimeout());
120
      }
121

    
122
      if (utils.getSocketOpenTimeout() > 0) {
123
         params.setIntParameter(CoreConnectionPNames.SO_TIMEOUT, utils.getSocketOpenTimeout());
124
      }
125

    
126
      HttpProtocolParams.setVersion(params, HttpVersion.HTTP_1_1);
127
      return params;
128
   }
129

    
130
   protected void configure() {
131
      super.configure();
132
      bind(TransformingHttpCommandExecutorService.class).to(
133
               NioTransformingHttpCommandExecutorService.class);
134
      bind(new TypeLiteral<BlockingQueue<HttpCommandRendezvous<?>>>() {
135
      }).to(new TypeLiteral<LinkedBlockingQueue<HttpCommandRendezvous<?>>>() {
136
      }).in(Scopes.SINGLETON);
137
      bind(NioHttpCommandExecutionHandler.ConsumingNHttpEntityFactory.class).to(
138
               ConsumingNHttpEntityFactoryImpl.class).in(Scopes.SINGLETON);
139
      bind(NHttpRequestExecutionHandler.class).to(NioHttpCommandExecutionHandler.class).in(
140
               Scopes.SINGLETON);
141
      bind(ConnectionReuseStrategy.class).to(DefaultConnectionReuseStrategy.class).in(
142
               Scopes.SINGLETON);
143
      bind(ByteBufferAllocator.class).to(HeapByteBufferAllocator.class);
144
      bind(NioHttpCommandConnectionPool.Factory.class).to(Factory.class).in(Scopes.SINGLETON);
145
   }
146

    
147
   private static class Factory implements NioHttpCommandConnectionPool.Factory {
148

    
149
      Closer closer;
150
      ExecutorService executor;
151
      int maxConnectionReuse;
152
      int maxSessionFailures;
153
      Provider<Semaphore> allConnections;
154
      Provider<BlockingQueue<HttpCommandRendezvous<?>>> commandQueue;
155
      Provider<BlockingQueue<NHttpConnection>> available;
156
      Provider<AsyncNHttpClientHandler> clientHandler;
157
      Provider<DefaultConnectingIOReactor> ioReactor;
158
      HttpParams params;
159

    
160
      @SuppressWarnings("unused")
161
      @Inject
162
      Factory(Closer closer, @Named(Constants.PROPERTY_USER_THREADS) ExecutorService executor,
163
               @Named(Constants.PROPERTY_MAX_CONNECTION_REUSE) int maxConnectionReuse,
164
               @Named(Constants.PROPERTY_MAX_SESSION_FAILURES) int maxSessionFailures,
165
               Provider<Semaphore> allConnections,
166
               Provider<BlockingQueue<HttpCommandRendezvous<?>>> commandQueue,
167
               Provider<BlockingQueue<NHttpConnection>> available,
168
               Provider<AsyncNHttpClientHandler> clientHandler,
169
               Provider<DefaultConnectingIOReactor> ioReactor, HttpParams params) {
170
         this.closer = closer;
171
         this.executor = executor;
172
         this.maxConnectionReuse = maxConnectionReuse;
173
         this.maxSessionFailures = maxSessionFailures;
174
         this.allConnections = allConnections;
175
         this.commandQueue = commandQueue;
176
         this.available = available;
177
         this.clientHandler = clientHandler;
178
         this.ioReactor = ioReactor;
179
         this.params = params;
180
      }
181

    
182
      public NioHttpCommandConnectionPool create(URI endPoint) {
183
         NioHttpCommandConnectionPool pool = new NioHttpCommandConnectionPool(executor,
184
                  allConnections.get(), commandQueue.get(), available.get(), clientHandler.get(),
185
                  ioReactor.get(), params, endPoint, maxConnectionReuse, maxSessionFailures);
186
         pool.start();
187
         closer.addToClose(new PoolCloser(pool));
188
         return pool;
189
      }
190

    
191
      private static class PoolCloser implements Closeable {
192
         private final NioHttpCommandConnectionPool pool;
193

    
194
         protected PoolCloser(NioHttpCommandConnectionPool pool) {
195
            this.pool = pool;
196
         }
197

    
198
         public void close() throws IOException {
199
            pool.shutdown();
200
         }
201
      }
202

    
203
   }
204

    
205
   private static class ConsumingNHttpEntityFactoryImpl implements ConsumingNHttpEntityFactory {
206
      @Inject
207
      javax.inject.Provider<ByteBufferAllocator> allocatorProvider;
208

    
209
      public ConsumingNHttpEntity create(HttpEntity httpEntity) {
210
         return new BufferingNHttpEntity(httpEntity, allocatorProvider.get());
211
      }
212
   }
213

    
214
   @Override
215
   public BlockingQueue<NHttpConnection> provideAvailablePool(HttpUtils utils) throws Exception {
216
      return new ArrayBlockingQueue<NHttpConnection>(utils.getMaxConnectionsPerHost() != 0 ? utils
217
               .getMaxConnectionsPerHost() : utils.getMaxConnections(), true);
218
   }
219

    
220
   @Provides
221
   // uri scope
222
   public DefaultConnectingIOReactor provideDefaultConnectingIOReactor(
223
            @Named(Constants.PROPERTY_IO_WORKER_THREADS) int maxWorkerThreads, HttpParams params)
224
            throws IOReactorException {
225
      checkState(maxWorkerThreads > 0, "io reactor needs at least 1 thread");
226
      return new DefaultConnectingIOReactor(maxWorkerThreads, params);
227
   }
228

    
229
}