Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.nio.charset.Charset;
Expand All @@ -38,6 +39,7 @@
import org.apache.activemq.util.ByteArrayOutputStream;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.Data;
Expand Down Expand Up @@ -213,6 +215,12 @@ public static Binary getBinaryFromMessageBody(ActiveMQBytesMessage message) thro

if (message.isCompressed()) {
int length = (int) message.getBodyLength();
// before we allocate the buffer ensure it's not too large
try {
MarshallingSupport.validateMaxInflatedDataSize(message.getMaxInflatedDataSize(), length);
} catch (IOException cause) {
throw JMSExceptionSupport.create(cause);
}
byte[] uncompressed = new byte[length];
message.readBytes(uncompressed);

Expand Down Expand Up @@ -244,7 +252,9 @@ public static Binary getBinaryFromMessageBody(ActiveMQObjectMessage message) thr
if (message.isCompressed()) {
try (ByteArrayOutputStream os = new ByteArrayOutputStream();
ByteArrayInputStream is = new ByteArrayInputStream(contents);
InflaterInputStream iis = new InflaterInputStream(is);) {
// wrap to prevent allocating more than maxInflatedDataSize
InputStream iis = MarshallingSupport.createInflaterInputStream(
message.getMaxInflatedDataSize(), is)) {

byte value;
while ((value = (byte) iis.read()) != -1) {
Expand Down Expand Up @@ -282,10 +292,14 @@ public static Binary getBinaryFromMessageBody(ActiveMQTextMessage message) throw

if (message.isCompressed()) {
try (ByteArrayInputStream is = new ByteArrayInputStream(contents);
// We do not need to wrap this stream, the size is validated below
// before allocation
InflaterInputStream iis = new InflaterInputStream(is);
DataInputStream dis = new DataInputStream(iis);) {

int size = dis.readInt();
// before we allocate the buffer ensure it's not too large
MarshallingSupport.validateMaxInflatedDataSize(message.getMaxInflatedDataSize(), size);
byte[] uncompressed = new byte[size];
dis.readFully(uncompressed);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -496,6 +496,42 @@ public void testOpenWireToQpidObjectMessageWithOpenWireCompression() throws Exce
}
}

@Test
public void testOpenWireToQpidCompressionFailure() throws Exception {

// Raw Transformer doesn't expand message properties.
assumeFalse(!transformer.equals("jms"));

// set to 512 bytes
brokerService.setMaxInflatedDataSize(512);

try (Connection openwire = createJMSConnection(); Connection amqp = createConnection()) {
((ActiveMQConnection) openwire).setUseCompression(true);
openwire.start();
amqp.start();

Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);

Destination queue = openwireSession.createQueue(getDestinationName());
MessageProducer openwireProducer = openwireSession.createProducer(queue);
MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);

StringBuilder builder = new StringBuilder();
// generate a string longer than 512 bytes
for (int i = 1; i <= 50; i++) {
builder.append("compresedpayload");
}
// Create and send the Message
openwireProducer.send(openwireSession.createTextMessage(builder.toString()));

// There should be an error triggered on dispatch during decompression
// and message should go to the DLQ
assertNull(amqpConsumer.receive(1000));
assertTrue(sentToDlq.get());
}
}

// The following tests for corruption will corrupt the headers or body
// to test that the AMQP protocol correctly passes the error during
// dispatch to allow the Transport Connection to properly handle
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ public class BrokerService implements Service {
private final List<Runnable> preShutdownHooks = new CopyOnWriteArrayList<>();

private int maxUncommittedCount = DEFAULT_MAX_UNCOMMITTED_COUNT;
private int maxInflatedDataSize = OpenWireFormat.DEFAULT_MAX_INFLATED_DATA_SIZE;

static {

Expand Down Expand Up @@ -3349,4 +3350,17 @@ public void setMaxUncommittedCount(int maxUncommittedCount) {
this.maxUncommittedCount = maxUncommittedCount;
}

public int getMaxInflatedDataSize() {
return maxInflatedDataSize;
}

/**
* Set the maximum size that a compressed message can inflate to
* if a message has to be decompressed.
*
* @param maxInflatedDataSize
*/
public void setMaxInflatedDataSize(int maxInflatedDataSize) {
this.maxInflatedDataSize = maxInflatedDataSize;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -932,4 +932,9 @@ public MessageInterceptorStrategy getMessageInterceptorStrategy() {
public void setMessageInterceptorStrategy(MessageInterceptorStrategy messageInterceptorStrategy) {
this.messageInterceptorStrategy = messageInterceptorStrategy;
}

@Override
public int getMaxInflatedDataSize() {
return brokerService.getMaxInflatedDataSize();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,11 @@ public void deleteSubscription(ConnectionContext context, SubscriptionKey key) t
}
}

@Override
public int getMaxInflatedDataSize() {
return next.getMaxInflatedDataSize();
}

public Destination getNext() {
return next;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ public class ActiveMQConnection implements Connection, TopicConnection, QueueCon
private boolean optimizedMessageDispatch = true;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private double maxInflatedDataSizeRatio = ActiveMQConnectionFactory.DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO;
// This will be configured during negotiation if maxFrameSize has been configured.
private int maxInflatedDataSize = Integer.MAX_VALUE;
private boolean objectMessageSerializationDefered;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
Expand Down Expand Up @@ -2034,6 +2037,15 @@ protected void onWireFormatInfo(WireFormatInfo info) {
if(tmpMaxFrameSize > 0) {
maxFrameSize.set(tmpMaxFrameSize);
}

// Compute the maxInflatedData size as a ratio of maxFrameSize
// This prevents overflow and sets to Integer.MAX_VALUE if too large
double updatedMaxInflated = (double)tmpMaxFrameSize * maxInflatedDataSizeRatio;
if (Double.isInfinite(updatedMaxInflated) || updatedMaxInflated > Integer.MAX_VALUE) {
this.maxInflatedDataSize = Integer.MAX_VALUE;
} else {
this.maxInflatedDataSize = (int) updatedMaxInflated;
}
}

/**
Expand Down Expand Up @@ -2210,6 +2222,18 @@ public void setUseCompression(boolean useCompression) {
this.useCompression = useCompression;
}

public int getMaxInflatedDataSize() {
return maxInflatedDataSize;
}

public double getMaxInflatedDataSizeRatio() {
return maxInflatedDataSizeRatio;
}

public void setMaxInflatedDataSizeRatio(double maxInflatedDataSizeRatio) {
this.maxInflatedDataSizeRatio = maxInflatedDataSizeRatio;
}

public void destroyDestination(ActiveMQDestination destination) throws JMSException {

checkClosedOrFailed();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,9 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
public static final String DEFAULT_USER = null;
public static final String DEFAULT_PASSWORD = null;
public static final int DEFAULT_PRODUCER_WINDOW_SIZE = 0;
// The default ratio for maxInflatedDataSize. The default is 10x the size
// of maxFrameSize
public static final double DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO = 10.0;

protected URI brokerURL;
protected String userName;
Expand Down Expand Up @@ -135,6 +138,7 @@ public class ActiveMQConnectionFactory extends JNDIBaseStorable implements Conne
private long optimizedAckScheduledAckInterval = 0;
private boolean copyMessageOnSend = true;
private boolean useCompression;
private double maxInflatedDataSizeRatio = DEFAULT_MAX_INFLATED_DATA_SIZE_RATIO;
private boolean objectMessageSerializationDefered;
private boolean useAsyncSend;
private boolean optimizeAcknowledge;
Expand Down Expand Up @@ -428,6 +432,7 @@ protected void configureConnection(ActiveMQConnection connection) throws JMSExce
connection.setOptimizedMessageDispatch(isOptimizedMessageDispatch());
connection.setCopyMessageOnSend(isCopyMessageOnSend());
connection.setUseCompression(isUseCompression());
connection.setMaxInflatedDataSizeRatio(getMaxInflatedDataSizeRatio());
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
connection.setDispatchAsync(isDispatchAsync());
connection.setUseAsyncSend(isUseAsyncSend());
Expand Down Expand Up @@ -876,6 +881,7 @@ public void populateProperties(Properties props) {

props.setProperty("useAsyncSend", Boolean.toString(isUseAsyncSend()));
props.setProperty("useCompression", Boolean.toString(isUseCompression()));
props.setProperty("maxInflatedDataSizeRatio", Double.toString(getMaxInflatedDataSizeRatio()));
props.setProperty("useRetroactiveConsumer", Boolean.toString(isUseRetroactiveConsumer()));
props.setProperty("watchTopicAdvisories", Boolean.toString(isWatchTopicAdvisories()));

Expand Down Expand Up @@ -917,6 +923,21 @@ public void setUseCompression(boolean useCompression) {
this.useCompression = useCompression;
}

public double getMaxInflatedDataSizeRatio() {
return maxInflatedDataSizeRatio;
}

/**
* Set the ratio to use to compute maxInflatedDataSize which controls
* how large a decompressed message buffer can be. maxInflatedDataSize
* is computed as maxFrameSize * maxInflatedDataSizeRatio.
*
* @param maxInflatedDataSizeRatio
*/
public void setMaxInflatedDataSizeRatio(double maxInflatedDataSizeRatio) {
this.maxInflatedDataSizeRatio = maxInflatedDataSizeRatio;
}

public boolean isObjectMessageSerializationDefered() {
return objectMessageSerializationDefered;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ByteSequenceData;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;

/**
* A <CODE>BytesMessage</CODE> object is used to send a message containing a
Expand Down Expand Up @@ -901,11 +902,15 @@ protected byte[] decompress(ByteSequence dataSequence) throws IOException {
ByteArrayOutputStream decompressed = new ByteArrayOutputStream();
try {
length = ByteSequenceData.readIntBig(dataSequence);
// verify the length of the buffer is not larger than maxInflatedDataSize
MarshallingSupport.validateMaxInflatedDataSize(getMaxInflatedDataSize(), length);
dataSequence.offset = 0;
byte[] data = Arrays.copyOfRange(dataSequence.getData(), 4, dataSequence.getLength());
inflater.setInput(data);
inflater.setInput(dataSequence.getData(), 4, dataSequence.getLength() - 4);
byte[] buffer = new byte[length];
int count = inflater.inflate(buffer);
if (count != length) {
throw new IllegalStateException("Inflated buffer size is different than expected size of " + length);
}
decompressed.write(buffer, 0, count);
return decompressed.toByteArray();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ private Map<String, Object> deserialize(ByteSequence content) throws JMSExceptio
if (content != null) {
InputStream is = new ByteArrayInputStream(content);
if (isCompressed()) {
is = MarshallingSupport.createInflaterInputStream(is);
// wrap the stream so we don't inflate past maxInflatedDataSize
is = MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
}
DataInputStream dataIn = new DataInputStream(is);
map = MarshallingSupport.unmarshalPrimitiveMap(dataIn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.ClassLoadingAwareObjectInputStream;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;

/**
Expand Down Expand Up @@ -208,7 +209,8 @@ private Serializable deserialize(ByteSequence content) throws JMSException {
try {
InputStream is = new ByteArrayInputStream(content);
if (isCompressed()) {
is = new InflaterInputStream(is);
// wrap the stream so we don't inflate past maxInflatedDataSize
is = MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
}
DataInputStream dataIn = new DataInputStream(is);
ClassLoadingAwareObjectInputStream objIn = new ClassLoadingAwareObjectInputStream(dataIn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,8 @@ public Object readObject() throws JMSException {
}
if (type == MarshallingSupport.BYTE_ARRAY_TYPE) {
int len = this.dataIn.readInt();
// verify that there are enough bytes remaining before allocation
MarshallingSupport.validateBufferSizeRemaining(dataIn, len);
byte[] value = new byte[len];
this.dataIn.readFully(value);
return value;
Expand Down Expand Up @@ -1165,10 +1167,15 @@ private void initializeWriting() throws JMSException {
if (compressed) {
ByteArrayInputStream input = new ByteArrayInputStream(this.content.getData(), this.content.getOffset(), this.content.getLength());
InflaterInputStream inflater = new InflaterInputStream(input);
int total = 0;
try {
byte[] buffer = new byte[8*1024];
int read = 0;
while ((read = inflater.read(buffer)) != -1) {
total = Math.addExact(total, read);
// each time through the loop see if we are >= max inflated size so we stop
// by doing this here we might go slightly pass the limit (up to 8 KB) but that is fine
MarshallingSupport.validateMaxInflatedDataSize(getMaxInflatedDataSize(), total);
this.dataOut.write(buffer, 0, read);
}
} finally {
Expand Down Expand Up @@ -1203,7 +1210,10 @@ private void initializeReading() throws MessageNotReadableException {
if (isCompressed()) {
is = new InflaterInputStream(is);
is = new BufferedInputStream(is);
is = MarshallingSupport.createFrameLimitedInputStream(Integer.MAX_VALUE, is);
// Wrap the buffered stream in a frame limited stream so we can error if we exceed
// max inflate size
is = MarshallingSupport.createFrameLimitedInputStream(getMaxInflatedDataSize(), is);

}
this.dataIn = new DataInputStream(is);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ private String decodeContent(ByteSequence bodyAsBytes) throws JMSException {
try {
is = new ByteArrayInputStream(bodyAsBytes);
if (isCompressed()) {
is = MarshallingSupport.createInflaterInputStream(is);
// wrap the stream so we don't inflate past maxInflatedDataSize
is = MarshallingSupport.createInflaterInputStream(getMaxInflatedDataSize(), is);
}
DataInputStream dataIn = new DataInputStream(is);
text = MarshallingSupport.readUTF8(dataIn);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.region.MessageReference;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.usage.MemoryUsage;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
Expand Down Expand Up @@ -102,9 +103,10 @@ public abstract class Message extends BaseCommand implements MarshallAware, Mess
private BrokerId[] brokerPath;
private BrokerId[] cluster;

public static interface MessageDestination {
public interface MessageDestination {
int getMinimumMessageSize();
MemoryUsage getMemoryUsage();
int getMaxInflatedDataSize();
}

public abstract Message copy();
Expand Down Expand Up @@ -871,4 +873,15 @@ protected Object readResolve() throws ObjectStreamException {
}
return this;
}

public int getMaxInflatedDataSize() {
// If this is set then this is on a broker
if (regionDestination != null) {
return regionDestination.getMaxInflatedDataSize();
// connection is set on Clients
} else if (connection != null) {
return connection.getMaxInflatedDataSize();
}
return OpenWireFormat.DEFAULT_MAX_INFLATED_DATA_SIZE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ public final class OpenWireFormat implements WireFormat {
public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
public static final int DEFAULT_LEGACY_VERSION = CommandTypes.PROTOCOL_LEGACY_STORE_VERSION;
public static final long DEFAULT_MAX_FRAME_SIZE = Long.MAX_VALUE;
public static final int DEFAULT_MAX_INFLATED_DATA_SIZE = 1024 * 1024 * 100;

static final byte NULL_TYPE = CommandTypes.NULL;
private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
Expand Down
Loading
Loading