blob: 1cd7adbef5d4cbd2818dad1aef7aff6c18506532 [file] [log] [blame]
package org.jetbrains.io.fastCgi;
import com.intellij.util.Consumer;
import gnu.trove.TIntObjectHashMap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.CharsetUtil;
import org.jetbrains.io.Decoder;
import static org.jetbrains.io.fastCgi.FastCgiService.LOG;
public class FastCgiDecoder extends Decoder {
private enum State {
HEADER, CONTENT
}
private State state = State.HEADER;
private enum ProtocolStatus {
REQUEST_COMPLETE, CANT_MPX_CONN, OVERLOADED, UNKNOWN_ROLE
}
public static final class RecordType {
public static final int END_REQUEST = 3;
public static final int STDOUT = 6;
public static final int STDERR = 7;
}
private int type;
private int id;
private int contentLength;
private int paddingLength;
private final TIntObjectHashMap<ByteBuf> dataBuffers = new TIntObjectHashMap<ByteBuf>();
private final Consumer<String> errorOutputConsumer;
public FastCgiDecoder(Consumer<String> errorOutputConsumer) {
this.errorOutputConsumer = errorOutputConsumer;
}
@Override
protected void messageReceived(ChannelHandlerContext context, ByteBuf input) throws Exception {
while (true) {
switch (state) {
case HEADER: {
if (paddingLength > 0) {
if (input.readableBytes() >= paddingLength) {
input.skipBytes(paddingLength);
paddingLength = 0;
}
else {
paddingLength -= input.readableBytes();
input.skipBytes(input.readableBytes());
input.release();
return;
}
}
ByteBuf buffer = getBufferIfSufficient(input, FastCgiConstants.HEADER_LENGTH, context);
if (buffer == null) {
input.release();
return;
}
decodeHeader(buffer);
state = State.CONTENT;
}
case CONTENT: {
if (contentLength > 0) {
ByteBuf buffer = getBufferIfSufficient(input, contentLength, context);
if (buffer == null) {
input.release();
return;
}
FastCgiResponse response = readContent(buffer);
if (response != null) {
context.fireChannelRead(response);
}
}
state = State.HEADER;
}
}
}
}
private void decodeHeader(ByteBuf buffer) {
buffer.skipBytes(1);
type = buffer.readUnsignedByte();
id = buffer.readUnsignedShort();
contentLength = buffer.readUnsignedShort();
paddingLength = buffer.readUnsignedByte();
buffer.skipBytes(1);
}
private FastCgiResponse readContent(ByteBuf buffer) {
switch (type) {
case RecordType.END_REQUEST:
int appStatus = buffer.readInt();
int protocolStatus = buffer.readUnsignedByte();
buffer.skipBytes(3);
if (appStatus != 0 || protocolStatus != ProtocolStatus.REQUEST_COMPLETE.ordinal()) {
LOG.warn("Protocol status " + protocolStatus);
dataBuffers.remove(id);
return new FastCgiResponse(id, null);
}
else if (protocolStatus == ProtocolStatus.REQUEST_COMPLETE.ordinal()) {
return new FastCgiResponse(id, dataBuffers.remove(id));
}
break;
case RecordType.STDOUT:
ByteBuf data = dataBuffers.get(id);
ByteBuf sliced = buffer.slice(buffer.readerIndex(), contentLength);
if (data == null) {
dataBuffers.put(id, sliced);
}
else if (data instanceof CompositeByteBuf) {
((CompositeByteBuf)data).addComponent(sliced);
data.writerIndex(data.writerIndex() + sliced.readableBytes());
}
else {
dataBuffers.put(id, Unpooled.wrappedBuffer(data, sliced));
}
sliced.retain();
buffer.skipBytes(contentLength);
break;
case RecordType.STDERR:
try {
errorOutputConsumer.consume(buffer.toString(buffer.readerIndex(), contentLength, CharsetUtil.UTF_8));
}
catch (Throwable e) {
LOG.error(e);
}
buffer.skipBytes(contentLength);
break;
default:
LOG.error("Unknown type " + type);
break;
}
return null;
}
}