/**
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.transport.http;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.net.URI;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.FutureResponse;
import org.apache.activemq.transport.util.TextWireFormat;
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.IOExceptionSupport;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.HttpRequest;
import org.apache.http.HttpRequestInterceptor;
import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.HttpClient;
import org.apache.http.client.HttpResponseException;
import org.apache.http.client.ResponseHandler;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpHead;
import org.apache.http.client.methods.HttpOptions;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.conn.ClientConnectionManager;
import org.apache.http.conn.params.ConnRoutePNames;
import org.apache.http.entity.ByteArrayEntity;
import org.apache.http.impl.client.BasicResponseHandler;
import org.apache.http.impl.client.DefaultHttpClient;
import org.apache.http.impl.conn.PoolingClientConnectionManager;
import org.apache.http.message.AbstractHttpMessage;
import org.apache.http.params.HttpConnectionParams;
import org.apache.http.params.HttpParams;
import org.apache.http.protocol.HttpContext;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * A HTTP {@link org.apache.activemq.transport.Transport} which uses the
 * <a href="http://hc.apache.org/index.html">Apache HTTP Client</a>
 * library.
 * <p>
 * Commands are sent to the remote URL with HTTP POST (see {@link #oneway(Object)})
 * and received by repeatedly issuing HTTP GET requests from {@link #run()}.
 * Separate {@link HttpClient} instances are used for sending and receiving.
 */
public class HttpClientTransport extends HttpTransportSupport {

    /** Initial value of the socket timeout applied to the send client before each POST. */
    public static final int MAX_CLIENT_TIMEOUT = 30000;
    private static final Logger LOG = LoggerFactory.getLogger(HttpClientTransport.class);
    private static final IdGenerator CLIENT_ID_GENERATOR = new IdGenerator();

    private HttpClient sendHttpClient;
    private HttpClient receiveHttpClient;

    /** Value sent in the {@code clientID} header of every request made by this transport. */
    private final String clientID = CLIENT_ID_GENERATOR.generateId();
    private boolean trace;
    /**
     * The GET request currently issued by {@link #run()}. Written by the receive
     * thread and read by the thread calling {@link #doStop(ServiceStopper)}, hence volatile.
     */
    private volatile HttpGet httpMethod;
    private volatile int receiveCounter;

    private int soTimeout = MAX_CLIENT_TIMEOUT;

    private boolean useCompression = false;
    /** Set to true by {@link #doStart()} when the OPTIONS response advertises gzip support. */
    protected boolean canSendCompressed = false;
    private int minSendAsCompressedSize = 0;

    /**
     * Creates a transport that talks to the given remote URL.
     *
     * @param wireFormat the text wire format used to marshal and unmarshal commands
     * @param remoteUrl the URL that POST and GET requests are sent to
     */
    public HttpClientTransport(TextWireFormat wireFormat, URI remoteUrl) {
        super(wireFormat, remoteUrl);
    }

    /**
     * Not implemented by this transport.
     *
     * @param command ignored
     * @return always {@code null}
     */
    public FutureResponse asyncRequest(Object command) throws IOException {
        return null;
    }

    /**
     * Marshals the command to text and POSTs it (UTF-8 encoded) to the remote URL.
     * The payload is gzip-compressed when compression is enabled, the remote end
     * advertised gzip support during {@link #doStart()}, and the payload is larger
     * than {@link #getMinSendAsCompressedSize()}.
     * <p>
     * If the command is a {@link ShutdownInfo} the transport is stopped after a
     * successful POST.
     *
     * @param command the command to send
     * @throws IOException if the transport is stopped, the POST fails, or the
     *         response status is not 200 (OK)
     */
    @Override
    public void oneway(Object command) throws IOException {

        if (isStopped()) {
            throw new IOException("stopped.");
        }
        HttpPost postMethod = new HttpPost(getRemoteUrl().toString());
        configureMethod(postMethod);
        String data = getTextWireFormat().marshalText(command);
        byte[] bytes = data.getBytes("UTF-8");
        if (useCompression && canSendCompressed && bytes.length > minSendAsCompressedSize) {
            ByteArrayOutputStream bytesOut = new ByteArrayOutputStream();
            GZIPOutputStream stream = new GZIPOutputStream(bytesOut);
            stream.write(bytes);
            // Closing the GZIP stream writes the trailer, so it must happen before toByteArray().
            stream.close();
            postMethod.addHeader("Content-Type", "application/x-gzip");
            if (LOG.isTraceEnabled()) {
                LOG.trace("Sending compressed, size = " + bytes.length + ", compressed size = " + bytesOut.size());
            }
            bytes = bytesOut.toByteArray();
        }
        ByteArrayEntity entity = new ByteArrayEntity(bytes);
        postMethod.setEntity(entity);

        HttpClient client = null;
        HttpResponse answer = null;
        try {
            client = getSendHttpClient();
            HttpParams params = client.getParams();
            HttpConnectionParams.setSoTimeout(params, soTimeout);
            answer = client.execute(postMethod);
            int status = answer.getStatusLine().getStatusCode();
            if (status != HttpStatus.SC_OK) {
                throw new IOException("Failed to post command: " + command + " as response was: " + answer);
            }
            if (command instanceof ShutdownInfo) {
                try {
                    stop();
                } catch (Exception e) {
                    LOG.warn("Error trying to stop HTTP client: " + e, e);
                }
            }
        } catch (IOException e) {
            throw IOExceptionSupport.create("Could not post command: " + command + " due to: " + e, e);
        } finally {
            // Always drain the response so the underlying connection can be reused.
            if (answer != null) {
                EntityUtils.consume(answer.getEntity());
            }
        }
    }

    /**
     * Not implemented by this transport.
     *
     * @param command ignored
     * @return always {@code null}
     */
    @Override
    public Object request(Object command) throws IOException {
        return null;
    }

    /**
     * Wraps the response body in a {@link DataInputStream}, transparently
     * un-gzipping it when the response's content encoding is {@code gzip}.
     *
     * @param answer the HTTP response whose entity should be read
     * @return a stream over the (decompressed) response body
     * @throws IOException if the entity content cannot be opened
     */
    private DataInputStream createDataInputStream(HttpResponse answer) throws IOException {
        Header encoding = answer.getEntity().getContentEncoding();
        if (encoding != null && "gzip".equalsIgnoreCase(encoding.getValue())) {
            return new DataInputStream(new GZIPInputStream(answer.getEntity().getContent()));
        } else {
            return new DataInputStream(answer.getEntity().getContent());
        }
    }

    /**
     * Receive loop: issues GET requests against the remote URL until the
     * transport is stopping or stopped, unmarshals each 200 (OK) response and
     * passes the resulting command to {@code doConsume}.
     * <p>
     * A 408 (Request Timeout) response is followed by a one second pause and
     * another GET. Any other non-OK status, or any {@link IOException}, is
     * reported through {@code onException} and ends the loop.
     */
    @Override
    public void run() {

        if (LOG.isTraceEnabled()) {
            LOG.trace("HTTP GET consumer thread starting: " + this);
        }
        HttpClient httpClient = getReceiveHttpClient();
        URI remoteUrl = getRemoteUrl();

        while (!isStopped() && !isStopping()) {

            HttpGet getMethod = new HttpGet(remoteUrl.toString());
            // Publish the in-flight request so doStop() can abort it.
            httpMethod = getMethod;
            configureMethod(getMethod);
            HttpResponse answer = null;

            try {
                answer = httpClient.execute(getMethod);
                int status = answer.getStatusLine().getStatusCode();
                if (status != HttpStatus.SC_OK) {
                    if (status == HttpStatus.SC_REQUEST_TIMEOUT) {
                        LOG.debug("GET timed out");
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            onException(new InterruptedIOException());
                            // Restore the interrupt flag so code further up the stack can see it.
                            Thread.currentThread().interrupt();
                            break;
                        }
                    } else {
                        onException(new IOException("Failed to perform GET on: " + remoteUrl + " as response was: " + answer));
                        break;
                    }
                } else {
                    receiveCounter++;
                    DataInputStream stream = createDataInputStream(answer);
                    try {
                        Object command = getTextWireFormat().unmarshal(stream);
                        if (command == null) {
                            LOG.debug("Received null command from url: " + remoteUrl);
                        } else {
                            doConsume(command);
                        }
                    } finally {
                        // Close even when unmarshalling or consuming fails.
                        stream.close();
                    }
                }
            } catch (IOException e) {
                onException(IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " Reason: " + e.getMessage(), e));
                break;
            } finally {
                if (answer != null) {
                    try {
                        EntityUtils.consume(answer.getEntity());
                    } catch (IOException ignored) {
                        // Best effort cleanup only; the outcome of this GET has already been handled.
                        LOG.trace("Error consuming GET response entity", ignored);
                    }
                }
            }
        }
    }

    // Properties
    // -------------------------------------------------------------------------

    /**
     * @return the client used for POSTing commands, created on first use
     */
    public HttpClient getSendHttpClient() {
        if (sendHttpClient == null) {
            sendHttpClient = createHttpClient();
        }
        return sendHttpClient;
    }

    /**
     * @param sendHttpClient the client to use for POSTing commands
     */
    public void setSendHttpClient(HttpClient sendHttpClient) {
        this.sendHttpClient = sendHttpClient;
    }

    /**
     * @return the client used for the GET receive loop and the start-up
     *         HEAD/OPTIONS requests, created on first use
     */
    public HttpClient getReceiveHttpClient() {
        if (receiveHttpClient == null) {
            receiveHttpClient = createHttpClient();
        }
        return receiveHttpClient;
    }

    /**
     * @param receiveHttpClient the client to use for receiving commands
     */
    public void setReceiveHttpClient(HttpClient receiveHttpClient) {
        this.receiveHttpClient = receiveHttpClient;
    }

    // Implementation methods
    // -------------------------------------------------------------------------

    /**
     * Checks that the remote URL is reachable with a HEAD request, then issues
     * an OPTIONS request to discover whether the remote end accepts gzip
     * compressed POST data, before delegating to the superclass.
     *
     * @throws Exception an {@link IOException} if either request fails, or
     *         whatever the superclass start throws
     */
    @Override
    protected void doStart() throws Exception {

        if (LOG.isTraceEnabled()) {
            LOG.trace("HTTP GET consumer thread starting: " + this);
        }
        HttpClient httpClient = getReceiveHttpClient();
        URI remoteUrl = getRemoteUrl();

        HttpHead headMethod = new HttpHead(remoteUrl.toString());
        configureMethod(headMethod);

        // Request the options from the server so we can find out if the broker we are
        // talking to supports GZip compressed content.  If so and useCompression is on
        // then we can compress our POST data, otherwise we must send it uncompressed to
        // ensure backwards compatibility.
        HttpOptions optionsMethod = new HttpOptions(remoteUrl.toString());
        ResponseHandler<String> handler = new BasicResponseHandler() {
            @Override
            public String handleResponse(HttpResponse response) throws HttpResponseException, IOException {

                for (Header header : response.getAllHeaders()) {
                    // HTTP header names are case-insensitive.
                    if (header.getName().equalsIgnoreCase("Accepts-Encoding") && header.getValue().contains("gzip")) {
                        LOG.info("Broker Servlet supports GZip compression.");
                        canSendCompressed = true;
                        break;
                    }
                }

                return super.handleResponse(response);
            }
        };

        try {
            httpClient.execute(headMethod, new BasicResponseHandler());
            httpClient.execute(optionsMethod, handler);
        } catch (Exception e) {
            // Keep the original exception as the cause so the stack trace is not lost.
            throw IOExceptionSupport.create("Failed to perform GET on: " + remoteUrl + " as response was: " + e.getMessage(), e);
        }

        super.doStart();
    }

    /**
     * Aborts the GET request currently in flight, if any, so the receive loop
     * in {@link #run()} can terminate.
     *
     * @param stopper not used by this implementation
     * @throws Exception if the calling thread is interrupted while waiting for an abort attempt
     */
    @Override
    protected void doStop(ServiceStopper stopper) throws Exception {
        // In some versions of the JVM a race between the httpMethod and the completion
        // of the method when using HTTPS can lead to a deadlock.  This hack attempts to
        // detect that and interrupt the thread that's locked so that they can complete
        // on another attempt.
        for (int i = 0; i < 3; ++i) {
            // Snapshot the field so the null check and the abort act on the same request.
            final HttpGet method = httpMethod;
            if (method == null) {
                return;
            }

            Thread abortThread = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        method.abort();
                    } catch (Exception ignored) {
                        // Best effort; a failed abort is retried by the enclosing loop.
                        LOG.trace("Error aborting HTTP GET", ignored);
                    }
                }
            });

            abortThread.start();
            abortThread.join(2000);
            if (abortThread.isAlive()) {
                if (!method.isAborted()) {
                    abortThread.interrupt();
                }
            } else if (method.isAborted() && method == httpMethod) {
                // The in-flight request is aborted and has not been replaced; nothing left to do.
                break;
            }
        }
    }

    /**
     * Creates a new HTTP client backed by {@link #createClientConnectionManager()}.
     * When compression is enabled every request asks for a gzip response; when a
     * proxy host is configured the client is routed through it, with credentials
     * if both proxy user and password are set.
     *
     * @return a newly configured client
     */
    protected HttpClient createHttpClient() {
        DefaultHttpClient client = new DefaultHttpClient(createClientConnectionManager());
        if (useCompression) {
            client.addRequestInterceptor(new HttpRequestInterceptor() {
                @Override
                public void process(HttpRequest request, HttpContext context) {
                    // We expect to receive a compressed response that we un-gzip
                    request.addHeader("Accept-Encoding", "gzip");
                }
            });
        }
        if (getProxyHost() != null) {
            HttpHost proxy = new HttpHost(getProxyHost(), getProxyPort());
            client.getParams().setParameter(ConnRoutePNames.DEFAULT_PROXY, proxy);

            if (getProxyUser() != null && getProxyPassword() != null) {
                client.getCredentialsProvider().setCredentials(
                    new AuthScope(getProxyHost(), getProxyPort()),
                    new UsernamePasswordCredentials(getProxyUser(), getProxyPassword()));
            }
        }
        return client;
    }

    /**
     * @return the connection manager used by clients built in {@link #createHttpClient()}
     */
    protected ClientConnectionManager createClientConnectionManager() {
        return new PoolingClientConnectionManager();
    }

    /**
     * Stamps the request with this transport's {@code clientID} header.
     *
     * @param method the request about to be sent
     */
    protected void configureMethod(AbstractHttpMessage method) {
        method.setHeader("clientID", clientID);
    }

    public boolean isTrace() {
        return trace;
    }

    public void setTrace(boolean trace) {
        this.trace = trace;
    }

    /**
     * @return the number of 200 (OK) GET responses received so far
     */
    @Override
    public int getReceiveCounter() {
        return receiveCounter;
    }

    /**
     * @return the socket timeout applied to the send client before each POST
     */
    public int getSoTimeout() {
        return soTimeout;
    }

    /**
     * @param soTimeout the socket timeout to apply to the send client before each POST
     */
    public void setSoTimeout(int soTimeout) {
        this.soTimeout = soTimeout;
    }

    /**
     * Enables or disables compression. Clients created by
     * {@link #createHttpClient()} read this flag at creation time, so it should be
     * set before the transport's clients are first used.
     *
     * @param useCompression true to request gzip responses and, when the remote
     *        end supports it, to gzip POST payloads
     */
    public void setUseCompression(boolean useCompression) {
        this.useCompression = useCompression;
    }

    public boolean isUseCompression() {
        return this.useCompression;
    }

    public int getMinSendAsCompressedSize() {
        return minSendAsCompressedSize;
    }

    /**
     * Sets the minimum size that must be exceeded on a send before compression is used if
     * the useCompression option is specified.  For very small payloads compression can be
     * inefficient compared to the transmission size savings.
     *
     * Default value is 0.
     *
     * @param minSendAsCompressedSize
     *        the payload size in bytes that must be exceeded before a send is compressed
     */
    public void setMinSendAsCompressedSize(int minSendAsCompressedSize) {
        this.minSendAsCompressedSize = minSendAsCompressedSize;
    }

}