001/** 002 * Licensed to the Apache Software Foundation (ASF) under one or more 003 * contributor license agreements. See the NOTICE file distributed with 004 * this work for additional information regarding copyright ownership. 005 * The ASF licenses this file to You under the Apache License, Version 2.0 006 * (the "License"); you may not use this file except in compliance with 007 * the License. You may obtain a copy of the License at 008 * 009 * http://www.apache.org/licenses/LICENSE-2.0 010 * 011 * Unless required by applicable law or agreed to in writing, software 012 * distributed under the License is distributed on an "AS IS" BASIS, 013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 014 * See the License for the specific language governing permissions and 015 * limitations under the License. 016 */ 017package org.apache.activemq.store.jdbc; 018 019import java.io.IOException; 020import java.sql.Connection; 021import java.sql.PreparedStatement; 022import java.sql.SQLException; 023import java.sql.Statement; 024import java.util.LinkedList; 025import java.util.List; 026 027import javax.sql.DataSource; 028 029import org.apache.activemq.util.IOExceptionSupport; 030import org.slf4j.Logger; 031import org.slf4j.LoggerFactory; 032 033/** 034 * Helps keep track of the current transaction/JDBC connection. 035 */ 036public class TransactionContext { 037 038 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class); 039 040 private final DataSource dataSource; 041 private final JDBCPersistenceAdapter persistenceAdapter; 042 private Connection connection; 043 private boolean inTx; 044 private PreparedStatement addMessageStatement; 045 private PreparedStatement removedMessageStatement; 046 private PreparedStatement updateLastAckStatement; 047 // a cheap dirty level that we can live with 048 private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED; 049 private LinkedList<Runnable> completions = new LinkedList<Runnable>(); 050 051 public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException { 052 this.persistenceAdapter = persistenceAdapter; 053 this.dataSource = persistenceAdapter.getDataSource(); 054 } 055 056 public Connection getConnection() throws IOException { 057 if (connection == null) { 058 try { 059 connection = dataSource.getConnection(); 060 if (persistenceAdapter.isChangeAutoCommitAllowed()) { 061 boolean autoCommit = !inTx; 062 if (connection.getAutoCommit() != autoCommit) { 063 LOG.trace("Setting auto commit to {} on connection {}", autoCommit, connection); 064 connection.setAutoCommit(autoCommit); 065 } 066 } 067 } catch (SQLException e) { 068 JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e); 069 inTx = false; 070 close(); 071 IOException ioe = IOExceptionSupport.create(e); 072 if (persistenceAdapter.getBrokerService() != null) { 073 persistenceAdapter.getBrokerService().handleIOException(ioe); 074 } 075 throw ioe; 076 } 077 078 try { 079 connection.setTransactionIsolation(transactionIsolation); 080 } catch (Throwable e) { 081 // ignore 082 LOG.trace("Cannot set transaction isolation to " + transactionIsolation + " due " + e.getMessage() 083 + ". This exception is ignored.", e); 084 } 085 } 086 return connection; 087 } 088 089 public void executeBatch() throws SQLException { 090 try { 091 executeBatch(addMessageStatement, "Failed add a message"); 092 } finally { 093 addMessageStatement = null; 094 try { 095 executeBatch(removedMessageStatement, "Failed to remove a message"); 096 } finally { 097 removedMessageStatement = null; 098 try { 099 executeBatch(updateLastAckStatement, "Failed to ack a message"); 100 } finally { 101 updateLastAckStatement = null; 102 } 103 } 104 } 105 } 106 107 private void executeBatch(PreparedStatement p, String message) throws SQLException { 108 if (p == null) { 109 return; 110 } 111 112 try { 113 int[] rc = p.executeBatch(); 114 for (int i = 0; i < rc.length; i++) { 115 int code = rc[i]; 116 if (code < 0 && code != Statement.SUCCESS_NO_INFO) { 117 throw new SQLException(message + ". Response code: " + code); 118 } 119 } 120 } finally { 121 try { 122 p.close(); 123 } catch (Throwable e) { 124 } 125 } 126 } 127 128 public void close() throws IOException { 129 if (!inTx) { 130 try { 131 132 /** 133 * we are not in a transaction so should not be committing ?? 134 * This was previously commented out - but had adverse affects 135 * on testing - so it's back! 136 * 137 */ 138 try { 139 executeBatch(); 140 } finally { 141 if (connection != null && !connection.getAutoCommit()) { 142 connection.commit(); 143 } 144 } 145 146 } catch (SQLException e) { 147 JDBCPersistenceAdapter.log("Error while closing connection: ", e); 148 IOException ioe = IOExceptionSupport.create(e); 149 persistenceAdapter.getBrokerService().handleIOException(ioe); 150 throw ioe; 151 } finally { 152 try { 153 if (connection != null) { 154 connection.close(); 155 } 156 } catch (Throwable e) { 157 // ignore 158 LOG.trace("Closing connection failed due: " + e.getMessage() + ". This exception is ignored.", e); 159 } finally { 160 connection = null; 161 } 162 for (Runnable completion: completions) { 163 completion.run(); 164 } 165 } 166 } 167 } 168 169 public void begin() throws IOException { 170 if (inTx) { 171 throw new IOException("Already started."); 172 } 173 inTx = true; 174 connection = getConnection(); 175 } 176 177 public void commit() throws IOException { 178 if (!inTx) { 179 throw new IOException("Not started."); 180 } 181 try { 182 executeBatch(); 183 if (!connection.getAutoCommit()) { 184 connection.commit(); 185 } 186 } catch (SQLException e) { 187 JDBCPersistenceAdapter.log("Commit failed: ", e); 188 try { 189 doRollback(); 190 } catch (Exception ignored) {} 191 IOException ioe = IOExceptionSupport.create(e); 192 persistenceAdapter.getBrokerService().handleIOException(ioe); 193 throw ioe; 194 } finally { 195 inTx = false; 196 close(); 197 } 198 } 199 200 public void rollback() throws IOException { 201 if (!inTx) { 202 throw new IOException("Not started."); 203 } 204 try { 205 doRollback(); 206 } catch (SQLException e) { 207 JDBCPersistenceAdapter.log("Rollback failed: ", e); 208 throw IOExceptionSupport.create(e); 209 } finally { 210 inTx = false; 211 close(); 212 } 213 } 214 215 private void doRollback() throws SQLException { 216 if (addMessageStatement != null) { 217 addMessageStatement.close(); 218 addMessageStatement = null; 219 } 220 if (removedMessageStatement != null) { 221 removedMessageStatement.close(); 222 removedMessageStatement = null; 223 } 224 if (updateLastAckStatement != null) { 225 updateLastAckStatement.close(); 226 updateLastAckStatement = null; 227 } 228 connection.rollback(); 229 } 230 231 public PreparedStatement getAddMessageStatement() { 232 return addMessageStatement; 233 } 234 235 public void setAddMessageStatement(PreparedStatement addMessageStatement) { 236 this.addMessageStatement = addMessageStatement; 237 } 238 239 public PreparedStatement getUpdateLastAckStatement() { 240 return updateLastAckStatement; 241 } 242 243 public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) { 244 this.updateLastAckStatement = ackMessageStatement; 245 } 246 247 public PreparedStatement getRemovedMessageStatement() { 248 return removedMessageStatement; 249 } 250 251 public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) { 252 this.removedMessageStatement = removedMessageStatement; 253 } 254 255 public void setTransactionIsolation(int transactionIsolation) { 256 this.transactionIsolation = transactionIsolation; 257 } 258 259 public void onCompletion(Runnable runnable) { 260 completions.add(runnable); 261 } 262}