Statistics
| Branch: | Revision:

root / extensions / httpnio / src / main / java / org / jclouds / http / httpnio / pool / NioHttpCommandConnectionPool.java @ 35e7942d

History | View | Annotate | Download (12 kB)

1
/**
2
 *
3
 * Copyright (C) 2009 Cloud Conscious, LLC. <info@cloudconscious.com>
4
 *
5
 * ====================================================================
6
 * Licensed under the Apache License, Version 2.0 (the "License");
7
 * you may not use this file except in compliance with the License.
8
 * You may obtain a copy of the License at
9
 *
10
 * http://www.apache.org/licenses/LICENSE-2.0
11
 *
12
 * Unless required by applicable law or agreed to in writing, software
13
 * distributed under the License is distributed on an "AS IS" BASIS,
14
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15
 * See the License for the specific language governing permissions and
16
 * limitations under the License.
17
 * ====================================================================
18
 */
19
package org.jclouds.http.httpnio.pool;
20

    
21
import static com.google.common.base.Preconditions.checkArgument;
22
import static com.google.common.base.Preconditions.checkNotNull;
23

    
24
import java.io.IOException;
25
import java.net.InetSocketAddress;
26
import java.net.URI;
27
import java.nio.charset.UnmappableCharacterException;
28
import java.security.KeyManagementException;
29
import java.security.NoSuchAlgorithmException;
30
import java.util.concurrent.BlockingQueue;
31
import java.util.concurrent.CancellationException;
32
import java.util.concurrent.ExecutorService;
33
import java.util.concurrent.Semaphore;
34
import java.util.concurrent.TimeUnit;
35
import java.util.concurrent.TimeoutException;
36

    
37
import javax.inject.Inject;
38
import javax.inject.Named;
39
import javax.net.ssl.SSLContext;
40

    
41
import org.apache.http.HttpException;
42
import org.apache.http.impl.nio.DefaultClientIOEventDispatch;
43
import org.apache.http.impl.nio.SSLClientIOEventDispatch;
44
import org.apache.http.impl.nio.reactor.DefaultConnectingIOReactor;
45
import org.apache.http.nio.NHttpConnection;
46
import org.apache.http.nio.protocol.AsyncNHttpClientHandler;
47
import org.apache.http.nio.protocol.EventListener;
48
import org.apache.http.nio.reactor.IOEventDispatch;
49
import org.apache.http.nio.reactor.IOReactorStatus;
50
import org.apache.http.nio.reactor.SessionRequest;
51
import org.apache.http.nio.reactor.SessionRequestCallback;
52
import org.apache.http.params.HttpParams;
53
import org.jclouds.Constants;
54
import org.jclouds.http.HttpCommandRendezvous;
55
import org.jclouds.http.TransformingHttpCommand;
56
import org.jclouds.http.pool.HttpCommandConnectionHandle;
57
import org.jclouds.http.pool.HttpCommandConnectionPool;
58

    
59
import com.google.common.annotations.VisibleForTesting;
60

    
61
/**
62
 * Connection Pool for HTTP requests that utilizes Apache HTTPNio
63
 * 
64
 * @author Adrian Cole
65
 */
66
public class NioHttpCommandConnectionPool extends HttpCommandConnectionPool<NHttpConnection>
67
         implements EventListener {
68

    
69
   @Override
70
   public String toString() {
71
      return "NioHttpCommandConnectionPool [ target=" + target + ", endPoint=" + getEndPoint()
72
               + ", hashCode=" + hashCode() + " ]";
73
   }
74

    
75
   private final NHttpClientConnectionPoolSessionRequestCallback sessionCallback;
76
   private final DefaultConnectingIOReactor ioReactor;
77
   private final IOEventDispatch dispatch;
78
   private final InetSocketAddress target;
79

    
80
   public static interface Factory extends HttpCommandConnectionPool.Factory<NHttpConnection> {
81
      NioHttpCommandConnectionPool create(URI endPoint);
82
   }
83

    
84
   @Inject
85
   public NioHttpCommandConnectionPool(ExecutorService executor, Semaphore allConnections,
86
            BlockingQueue<HttpCommandRendezvous<?>> commandQueue,
87
            BlockingQueue<NHttpConnection> available, AsyncNHttpClientHandler clientHandler,
88
            DefaultConnectingIOReactor ioReactor, HttpParams params, URI endPoint,
89
            @Named(Constants.PROPERTY_MAX_CONNECTION_REUSE) int maxConnectionReuse,
90
            @Named(Constants.PROPERTY_MAX_SESSION_FAILURES) int maxSessionFailures) {
91
      super(executor, allConnections, commandQueue, available, endPoint, maxConnectionReuse,
92
               maxSessionFailures);
93
      String host = checkNotNull(checkNotNull(endPoint, "endPoint").getHost(), String.format(
94
               "Host null for endpoint %s", endPoint));
95
      int port = endPoint.getPort();
96
      if (endPoint.getScheme().equals("https")) {
97
         try {
98
            this.dispatch = provideSSLClientEventDispatch(clientHandler, params);
99
         } catch (KeyManagementException e) {
100
            throw new RuntimeException("SSL error creating a connection to " + endPoint, e);
101
         } catch (NoSuchAlgorithmException e) {
102
            throw new RuntimeException("SSL error creating a connection to " + endPoint, e);
103
         }
104
         if (port == -1)
105
            port = 443;
106
      } else {
107
         this.dispatch = provideClientEventDispatch(clientHandler, params);
108
         if (port == -1)
109
            port = 80;
110
      }
111
      checkArgument(port > 0, String.format("Port %d not in range for endpoint %s", endPoint
112
               .getPort(), endPoint));
113
      this.ioReactor = ioReactor;
114
      this.sessionCallback = new NHttpClientConnectionPoolSessionRequestCallback();
115
      this.target = new InetSocketAddress(host, port);
116
      clientHandler.setEventListener(this);
117
   }
118

    
119
   public static IOEventDispatch provideSSLClientEventDispatch(AsyncNHttpClientHandler handler,
120
            HttpParams params) throws NoSuchAlgorithmException, KeyManagementException {
121
      SSLContext context = SSLContext.getInstance("TLS");
122
      context.init(null, null, null);
123
      return new SSLClientIOEventDispatch(handler, context, params);
124
   }
125

    
126
   public static IOEventDispatch provideClientEventDispatch(AsyncNHttpClientHandler handler,
127
            HttpParams params) {
128
      return new DefaultClientIOEventDispatch(handler, params);
129
   }
130

    
131
   @Override
132
   public void start() {
133
      synchronized (this.statusLock) {
134
         if (this.status.compareTo(Status.INACTIVE) == 0) {
135
            executorService.execute(new Runnable() {
136
               public void run() {
137
                  try {
138
                     ioReactor.execute(dispatch);
139
                  } catch (IOException e) {
140
                     exception.set(e);
141
                     logger.error(e, "Error dispatching %1$s", dispatch);
142
                     status = Status.SHUTDOWN_REQUEST;
143
                  }
144
               }
145
            });
146
         }
147
         super.start();
148
      }
149
   }
150

    
151
   public void shutdownReactor(long waitMs) {
152
      try {
153
         this.ioReactor.shutdown(waitMs);
154
      } catch (IOException e) {
155
         logger.error(e, "Error shutting down reactor");
156
      }
157
   }
158

    
159
   @Override
160
   public boolean connectionValid(NHttpConnection conn) {
161
      boolean isOpen = conn.isOpen();
162
      boolean isStale = conn.isStale();
163
      long requestCount = conn.getMetrics().getRequestCount();
164
      return isOpen && !isStale && requestCount < maxConnectionReuse;
165
   }
166

    
167
   @Override
168
   public void shutdownConnection(NHttpConnection conn) {
169
      if (conn.getStatus() == NHttpConnection.ACTIVE) {
170
         try {
171
            conn.shutdown();
172
         } catch (IOException e) {
173
            logger.error(e, "Error shutting down connection");
174
         }
175
      }
176
   }
177

    
178
   @Override
179
   protected void doWork() throws Exception {
180
      createNewConnection();
181
   }
182

    
183
   @Override
184
   protected void doShutdown() {
185
      // Give the I/O reactor 1 sec to shut down
186
      shutdownReactor(1000);
187
      assert this.ioReactor.getStatus().equals(IOReactorStatus.SHUT_DOWN) : "incorrect status after io reactor shutdown :"
188
               + this.ioReactor.getStatus();
189
   }
190

    
191
   @Override
192
   protected void createNewConnection() throws InterruptedException {
193
      boolean acquired = allConnections.tryAcquire(1, TimeUnit.SECONDS);
194
      if (acquired) {
195
         if (shouldDoWork()) {
196
            logger.trace("Opening: %s", getTarget());
197
            ioReactor.connect(getTarget(), null, null, sessionCallback);
198
         } else {
199
            allConnections.release();
200
         }
201
      }
202
   }
203

    
204
   @Override
205
   protected void associateHandleWithConnection(
206
            HttpCommandConnectionHandle<NHttpConnection> handle, NHttpConnection connection) {
207
      connection.getContext().setAttribute("command-handle", handle);
208
   }
209

    
210
   @Override
211
   protected NioHttpCommandConnectionHandle getHandleFromConnection(NHttpConnection connection) {
212
      return (NioHttpCommandConnectionHandle) connection.getContext()
213
               .getAttribute("command-handle");
214
   }
215

    
216
   class NHttpClientConnectionPoolSessionRequestCallback implements SessionRequestCallback {
217

    
218
      /**
219
       * {@inheritDoc}
220
       */
221
      @Override
222
      public void completed(SessionRequest request) {
223

    
224
      }
225

    
226
      /**
227
       * @see releaseConnectionAndSetResponseException
228
       */
229
      @Override
230
      public void cancelled(SessionRequest request) {
231
         releaseConnectionAndSetResponseException(request, new CancellationException(
232
                  "Cancelled request: " + request.getRemoteAddress()));
233
      }
234

    
235
      /**
236
       * Releases a connection and associates the current exception with the request using the
237
       * session.
238
       */
239
      @VisibleForTesting
240
      void releaseConnectionAndSetResponseException(SessionRequest request, Exception e) {
241
         allConnections.release();
242
         TransformingHttpCommand<?> frequest = (TransformingHttpCommand<?>) request.getAttachment();
243
         if (frequest != null) {
244
            frequest.setException(e);
245
         }
246
      }
247

    
248
      /**
249
       * Disables the pool, if {@code maxSessionFailures} is reached}
250
       * 
251
       * @see releaseConnectionAndSetResponseException
252
       */
253
      @Override
254
      public void failed(SessionRequest request) {
255
         int count = currentSessionFailures.getAndIncrement();
256
         releaseConnectionAndSetResponseException(request, request.getException());
257
         if (count >= maxSessionFailures) {
258
            logger.error(request.getException(),
259
                     "Exceeded maximum Session failures: %d, Disabling pool for %s",
260
                     maxSessionFailures, getTarget());
261
            exception.set(request.getException());
262
         }
263

    
264
      }
265

    
266
      /**
267
       * @see releaseConnectionAndSetResponseException
268
       */
269
      @Override
270
      public void timeout(SessionRequest request) {
271
         releaseConnectionAndSetResponseException(request, new TimeoutException("Timeout on: "
272
                  + request.getRemoteAddress()));
273
      }
274

    
275
   }
276

    
277
   public void connectionOpen(NHttpConnection conn) {
278
      conn.setSocketTimeout(0);
279
      available.offer(conn);
280
      logger.trace("Opened: %s", getTarget());
281
   }
282

    
283
   public void connectionTimeout(NHttpConnection conn) {
284
      String message = String.format("Timeout on : %s - timeout %d", getTarget(), conn
285
               .getSocketTimeout());
286
      logger.warn(message);
287
      resubmitIfRequestIsReplayable(conn, new TimeoutException(message));
288
   }
289

    
290
   public void connectionClosed(NHttpConnection conn) {
291
      logger.trace("Closed: %s", getTarget());
292
   }
293

    
294
   public void fatalIOException(IOException ex, NHttpConnection conn) {
295
      logger.error(ex, "IO Exception: %s", getTarget());
296
      HttpCommandRendezvous<?> rendezvous = getCommandFromConnection(conn);
297
      if (rendezvous != null) {
298
         /**
299
          * these exceptions, while technically i/o are unresolvable. set the error on the command
300
          * itself so that it doesn't replay.
301
          */
302
         if (ex instanceof UnmappableCharacterException) {
303
            setExceptionOnCommand(ex, rendezvous);
304
         } else {
305
            resubmitIfRequestIsReplayable(conn, ex);
306
         }
307
      }
308
   }
309

    
310
   public void fatalProtocolException(HttpException ex, NHttpConnection conn) {
311
      logger.error(ex, "Protocol Exception: %s", getTarget());
312
      setExceptionOnCommand(conn, ex);
313
   }
314

    
315
   @Override
316
   protected NioHttpCommandConnectionHandle createHandle(HttpCommandRendezvous<?> command,
317
            NHttpConnection conn) {
318
      try {
319
         return new NioHttpCommandConnectionHandle(allConnections, available, endPoint, command,
320
                  conn);
321
      } catch (InterruptedException e) {
322
         throw new RuntimeException("Interrupted creating a handle to " + conn, e);
323
      }
324
   }
325

    
326
   @Override
327
   protected boolean isReplayable(HttpCommandRendezvous<?> rendezvous) {
328
      return rendezvous.getCommand().isReplayable();
329
   }
330

    
331
   @VisibleForTesting
332
   InetSocketAddress getTarget() {
333
      return target;
334
   }
335

    
336
}