blob: fe0c5ba9b9b6737ebc84bf6352b5794f94cd0dc5 [file] [log] [blame]
// Copyright 2015 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
package org.chromium.net.impl;
import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_IDLE;
import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_LOWEST;
import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_LOW;
import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_MEDIUM;
import static org.chromium.net.BidirectionalStream.Builder.STREAM_PRIORITY_HIGHEST;
import androidx.annotation.IntDef;
import androidx.annotation.VisibleForTesting;
import org.jni_zero.CalledByNative;
import org.jni_zero.JNINamespace;
import org.jni_zero.NativeClassQualifiedName;
import org.jni_zero.NativeMethods;
import org.chromium.base.Log;
import org.chromium.net.BidirectionalStream;
import org.chromium.net.CallbackException;
import org.chromium.net.CronetException;
import org.chromium.net.ExperimentalBidirectionalStream;
import org.chromium.net.NetworkException;
import org.chromium.net.RequestFinishedInfo;
import org.chromium.net.RequestPriority;
import org.chromium.net.UrlResponseInfo;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.nio.ByteBuffer;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import javax.annotation.concurrent.GuardedBy;
/**
* {@link BidirectionalStream} implementation using Chromium network stack.
* All @CalledByNative methods are called on the native network thread
* and post tasks with callback calls onto Executor. Upon returning from callback, the native
* stream is called on Executor thread and posts native tasks to the native network thread.
*/
@JNINamespace("cronet")
@VisibleForTesting
public class CronetBidirectionalStream extends ExperimentalBidirectionalStream {
/**
* States of BidirectionalStream are tracked in mReadState and mWriteState.
* The write state is separated out as it changes independently of the read state.
* There is one initial state: State.NOT_STARTED. There is one normal final state:
* State.SUCCESS, reached after State.READING_DONE and State.WRITING_DONE. There are two
* exceptional final states: State.CANCELED and State.ERROR, which can be reached from
* any other non-final state.
*/
@IntDef({
State.NOT_STARTED,
State.STARTED,
State.WAITING_FOR_READ,
State.READING,
State.READING_DONE,
State.CANCELED,
State.ERROR,
State.SUCCESS,
State.WAITING_FOR_FLUSH,
State.WRITING,
State.WRITING_DONE
})
@Retention(RetentionPolicy.SOURCE)
private @interface State {
/* Initial state, stream not started. */
int NOT_STARTED = 0;
/*
* Stream started, request headers are being sent if mDelayRequestHeadersUntilNextFlush
* is not set to true.
*/
int STARTED = 1;
/* Waiting for {@code read()} to be called. */
int WAITING_FOR_READ = 2;
/* Reading from the remote, {@code onReadCompleted()} callback will be called when done. */
int READING = 3;
/* There is no more data to read and stream is half-closed by the remote side. */
int READING_DONE = 4;
/* Stream is canceled. */
int CANCELED = 5;
/* Error has occurred, stream is closed. */
int ERROR = 6;
/* Reading and writing are done, and the stream is closed successfully. */
int SUCCESS = 7;
/* Waiting for {@code CronetBidirectionalStreamJni.get().sendRequestHeaders()} or {@code
CronetBidirectionalStreamJni.get().writevData()} to be called. */
int WAITING_FOR_FLUSH = 8;
/* Writing to the remote, {@code onWritevCompleted()} callback will be called when done. */
int WRITING = 9;
/* There is no more data to write and stream is half-closed by the local side. */
int WRITING_DONE = 10;
}
private final CronetUrlRequestContext mRequestContext;
private final Executor mExecutor;
private final VersionSafeCallbacks.BidirectionalStreamCallback mCallback;
private final String mInitialUrl;
private final int mInitialPriority;
private final String mInitialMethod;
private final String mRequestHeaders[];
private final UrlResponseInfo.HeaderBlock mRequestHeaderBlock;
private final boolean mDelayRequestHeadersUntilFirstFlush;
private final Collection<Object> mRequestAnnotations;
private final boolean mTrafficStatsTagSet;
private final int mTrafficStatsTag;
private final boolean mTrafficStatsUidSet;
private final int mTrafficStatsUid;
private final long mNetworkHandle;
private RefCountDelegate mInflightDoneCallbackCount;
private CronetException mException;
/*
* Synchronizes access to mNativeStream, mReadState and mWriteState.
*/
private final Object mNativeStreamLock = new Object();
@GuardedBy("mNativeStreamLock")
// Pending write data.
private LinkedList<ByteBuffer> mPendingData;
@GuardedBy("mNativeStreamLock")
// Flush data queue that should be pushed to the native stack when the previous
// CronetBidirectionalStreamJni.get().writevData completes.
private LinkedList<ByteBuffer> mFlushData;
@GuardedBy("mNativeStreamLock")
// Whether an end-of-stream flag is passed in through write().
private boolean mEndOfStreamWritten;
@GuardedBy("mNativeStreamLock")
// Whether request headers have been sent.
private boolean mRequestHeadersSent;
@GuardedBy("mNativeStreamLock")
// Metrics information. Obtained when request succeeds, fails or is canceled.
private RequestFinishedInfo.Metrics mMetrics;
/* Native BidirectionalStream object, owned by CronetBidirectionalStream. */
@GuardedBy("mNativeStreamLock")
private long mNativeStream;
/**
* Read state is tracking reading flow.
* / <--- READING <--- \
* | |
* \ /
* NOT_STARTED -> STARTED --> WAITING_FOR_READ -> READING_DONE -> SUCCESS
*/
@GuardedBy("mNativeStreamLock")
private @State int mReadState = State.NOT_STARTED;
/**
* Write state is tracking writing flow.
* / <--- WRITING <--- \
* | |
* \ /
* NOT_STARTED -> STARTED --> WAITING_FOR_FLUSH -> WRITING_DONE -> SUCCESS
*/
@GuardedBy("mNativeStreamLock")
private @State int mWriteState = State.NOT_STARTED;
// Only modified on the network thread.
private UrlResponseInfoImpl mResponseInfo;
/*
* OnReadCompleted callback is repeatedly invoked when each read is completed, so it
* is cached as a member variable.
*/
// Only modified on the network thread.
private OnReadCompletedRunnable mOnReadCompletedTask;
private Runnable mOnDestroyedCallbackForTesting;
private final class OnReadCompletedRunnable implements Runnable {
// Buffer passed back from current invocation of onReadCompleted.
ByteBuffer mByteBuffer;
// End of stream flag from current invocation of onReadCompleted.
boolean mEndOfStream;
@Override
public void run() {
try {
// Null out mByteBuffer, to pass buffer ownership to callback or release if done.
ByteBuffer buffer = mByteBuffer;
mByteBuffer = null;
boolean maybeOnSucceeded = false;
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
if (mEndOfStream) {
mReadState = State.READING_DONE;
maybeOnSucceeded = (mWriteState == State.WRITING_DONE);
} else {
mReadState = State.WAITING_FOR_READ;
}
}
mCallback.onReadCompleted(
CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
if (maybeOnSucceeded) {
maybeOnSucceededOnExecutor();
}
} catch (Exception e) {
onCallbackException(e);
}
}
}
private final class OnWriteCompletedRunnable implements Runnable {
// Buffer passed back from current invocation of onWriteCompleted.
private ByteBuffer mByteBuffer;
// End of stream flag from current call to write.
private final boolean mEndOfStream;
OnWriteCompletedRunnable(ByteBuffer buffer, boolean endOfStream) {
mByteBuffer = buffer;
mEndOfStream = endOfStream;
}
@Override
public void run() {
try {
// Null out mByteBuffer, to pass buffer ownership to callback or release if done.
ByteBuffer buffer = mByteBuffer;
mByteBuffer = null;
boolean maybeOnSucceeded = false;
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
if (mEndOfStream) {
mWriteState = State.WRITING_DONE;
maybeOnSucceeded = (mReadState == State.READING_DONE);
}
}
mCallback.onWriteCompleted(
CronetBidirectionalStream.this, mResponseInfo, buffer, mEndOfStream);
if (maybeOnSucceeded) {
maybeOnSucceededOnExecutor();
}
} catch (Exception e) {
onCallbackException(e);
}
}
}
CronetBidirectionalStream(
CronetUrlRequestContext requestContext,
String url,
@CronetEngineBase.StreamPriority int priority,
Callback callback,
Executor executor,
String httpMethod,
List<Map.Entry<String, String>> requestHeaders,
boolean delayRequestHeadersUntilNextFlush,
Collection<Object> requestAnnotations,
boolean trafficStatsTagSet,
int trafficStatsTag,
boolean trafficStatsUidSet,
int trafficStatsUid,
long networkHandle) {
mRequestContext = requestContext;
mInitialUrl = url;
mInitialPriority = convertStreamPriority(priority);
mCallback = new VersionSafeCallbacks.BidirectionalStreamCallback(callback);
mExecutor = executor;
mInitialMethod = httpMethod;
mRequestHeaders = stringsFromHeaderList(requestHeaders);
mRequestHeaderBlock = new UrlResponseInfoImpl.HeaderBlockImpl(requestHeaders);
mDelayRequestHeadersUntilFirstFlush = delayRequestHeadersUntilNextFlush;
mPendingData = new LinkedList<>();
mFlushData = new LinkedList<>();
mRequestAnnotations = requestAnnotations;
mTrafficStatsTagSet = trafficStatsTagSet;
mTrafficStatsTag = trafficStatsTag;
mTrafficStatsUidSet = trafficStatsUidSet;
mTrafficStatsUid = trafficStatsUid;
mNetworkHandle = networkHandle;
}
@Override
public String getHttpMethod() {
return mInitialMethod;
}
@Override
public boolean hasTrafficStatsTag() {
return mTrafficStatsTagSet;
}
@Override
public int getTrafficStatsTag() {
if (!hasTrafficStatsTag()) {
throw new IllegalStateException("TrafficStatsTag is not set");
}
return mTrafficStatsTag;
}
@Override
public boolean hasTrafficStatsUid() {
return mTrafficStatsUidSet;
}
@Override
public int getTrafficStatsUid() {
if (!hasTrafficStatsUid()) {
throw new IllegalStateException("TrafficStatsUid is not set");
}
return mTrafficStatsUid;
}
@Override
public UrlResponseInfo.HeaderBlock getHeaders() {
return mRequestHeaderBlock;
}
@Override
public int getPriority() {
switch (mInitialPriority) {
case RequestPriority.IDLE:
return STREAM_PRIORITY_IDLE;
case RequestPriority.LOWEST:
return STREAM_PRIORITY_LOWEST;
case RequestPriority.LOW:
return STREAM_PRIORITY_LOW;
case RequestPriority.MEDIUM:
return STREAM_PRIORITY_MEDIUM;
case RequestPriority.HIGHEST:
return STREAM_PRIORITY_HIGHEST;
default:
throw new IllegalStateException("Invalid stream priority: " + mInitialPriority);
}
}
@Override
public boolean isDelayRequestHeadersUntilFirstFlushEnabled() {
return mDelayRequestHeadersUntilFirstFlush;
}
@Override
public void start() {
synchronized (mNativeStreamLock) {
if (mReadState != State.NOT_STARTED) {
throw new IllegalStateException("Stream is already started.");
}
try {
mNativeStream =
CronetBidirectionalStreamJni.get()
.createBidirectionalStream(
CronetBidirectionalStream.this,
mRequestContext.getUrlRequestContextAdapter(),
!mDelayRequestHeadersUntilFirstFlush,
mTrafficStatsTagSet,
mTrafficStatsTag,
mTrafficStatsUidSet,
mTrafficStatsUid,
mNetworkHandle);
mRequestContext.onRequestStarted();
mInflightDoneCallbackCount =
new RefCountDelegate(mRequestContext::onRequestFinished);
// We need an initial count of 2: one decrement for the final callback
// (e.g. onSucceeded), and another for onMetricsCollected().
mInflightDoneCallbackCount.increment();
// Non-zero startResult means an argument error.
int startResult =
CronetBidirectionalStreamJni.get()
.start(
mNativeStream,
CronetBidirectionalStream.this,
mInitialUrl,
mInitialPriority,
mInitialMethod,
mRequestHeaders,
!doesMethodAllowWriteData(mInitialMethod));
if (startResult == -1) {
throw new IllegalArgumentException("Invalid http method " + mInitialMethod);
}
if (startResult > 0) {
int headerPos = startResult - 1;
throw new IllegalArgumentException(
"Invalid header with headername: " + mRequestHeaders[headerPos]);
}
mReadState = mWriteState = State.STARTED;
} catch (RuntimeException e) {
// If there's an exception, clean up and then throw the
// exception to the caller.
destroyNativeStreamLocked(false);
mInflightDoneCallbackCount.decrement();
mInflightDoneCallbackCount.decrement();
throw e;
}
}
}
@Override
public void read(ByteBuffer buffer) {
synchronized (mNativeStreamLock) {
Preconditions.checkHasRemaining(buffer);
Preconditions.checkDirect(buffer);
if (mReadState != State.WAITING_FOR_READ) {
throw new IllegalStateException("Unexpected read attempt.");
}
if (isDoneLocked()) {
return;
}
if (mOnReadCompletedTask == null) {
mOnReadCompletedTask = new OnReadCompletedRunnable();
}
mReadState = State.READING;
if (!CronetBidirectionalStreamJni.get()
.readData(
mNativeStream,
CronetBidirectionalStream.this,
buffer,
buffer.position(),
buffer.limit())) {
// Still waiting on read. This is just to have consistent
// behavior with the other error cases.
mReadState = State.WAITING_FOR_READ;
throw new IllegalArgumentException("Unable to call native read");
}
}
}
@Override
public void write(ByteBuffer buffer, boolean endOfStream) {
synchronized (mNativeStreamLock) {
Preconditions.checkDirect(buffer);
if (!buffer.hasRemaining() && !endOfStream) {
throw new IllegalArgumentException("Empty buffer before end of stream.");
}
if (mEndOfStreamWritten) {
throw new IllegalArgumentException("Write after writing end of stream.");
}
if (isDoneLocked()) {
return;
}
mPendingData.add(buffer);
if (endOfStream) {
mEndOfStreamWritten = true;
}
}
}
@Override
public void flush() {
synchronized (mNativeStreamLock) {
if (isDoneLocked()
|| (mWriteState != State.WAITING_FOR_FLUSH && mWriteState != State.WRITING)) {
return;
}
if (mPendingData.isEmpty() && mFlushData.isEmpty()) {
// If there is no pending write when flush() is called, see if
// request headers need to be flushed.
if (!mRequestHeadersSent) {
mRequestHeadersSent = true;
CronetBidirectionalStreamJni.get()
.sendRequestHeaders(mNativeStream, CronetBidirectionalStream.this);
if (!doesMethodAllowWriteData(mInitialMethod)) {
mWriteState = State.WRITING_DONE;
}
}
return;
}
assert !mPendingData.isEmpty() || !mFlushData.isEmpty();
// Move buffers from mPendingData to the flushing queue.
if (!mPendingData.isEmpty()) {
mFlushData.addAll(mPendingData);
mPendingData.clear();
}
if (mWriteState == State.WRITING) {
// If there is a write already pending, wait until onWritevCompleted is
// called before pushing data to the native stack.
return;
}
sendFlushDataLocked();
}
}
// Helper method to send buffers in mFlushData. Caller needs to acquire
// mNativeStreamLock and make sure mWriteState is WAITING_FOR_FLUSH and
// mFlushData queue isn't empty.
@SuppressWarnings("GuardedByChecker")
private void sendFlushDataLocked() {
assert mWriteState == State.WAITING_FOR_FLUSH;
int size = mFlushData.size();
ByteBuffer[] buffers = new ByteBuffer[size];
int[] positions = new int[size];
int[] limits = new int[size];
for (int i = 0; i < size; i++) {
ByteBuffer buffer = mFlushData.poll();
buffers[i] = buffer;
positions[i] = buffer.position();
limits[i] = buffer.limit();
}
assert mFlushData.isEmpty();
assert buffers.length >= 1;
mWriteState = State.WRITING;
mRequestHeadersSent = true;
if (!CronetBidirectionalStreamJni.get()
.writevData(
mNativeStream,
CronetBidirectionalStream.this,
buffers,
positions,
limits,
mEndOfStreamWritten && mPendingData.isEmpty())) {
// Still waiting on flush. This is just to have consistent
// behavior with the other error cases.
mWriteState = State.WAITING_FOR_FLUSH;
throw new IllegalArgumentException("Unable to call native writev.");
}
}
/** Returns a read-only copy of {@code mPendingData} for testing. */
public List<ByteBuffer> getPendingDataForTesting() {
synchronized (mNativeStreamLock) {
List<ByteBuffer> pendingData = new LinkedList<ByteBuffer>();
for (ByteBuffer buffer : mPendingData) {
pendingData.add(buffer.asReadOnlyBuffer());
}
return pendingData;
}
}
/** Returns a read-only copy of {@code mFlushData} for testing. */
public List<ByteBuffer> getFlushDataForTesting() {
synchronized (mNativeStreamLock) {
List<ByteBuffer> flushData = new LinkedList<ByteBuffer>();
for (ByteBuffer buffer : mFlushData) {
flushData.add(buffer.asReadOnlyBuffer());
}
return flushData;
}
}
@Override
public void cancel() {
synchronized (mNativeStreamLock) {
if (isDoneLocked() || mReadState == State.NOT_STARTED) {
return;
}
mReadState = mWriteState = State.CANCELED;
destroyNativeStreamLocked(true);
}
}
@Override
public boolean isDone() {
synchronized (mNativeStreamLock) {
return isDoneLocked();
}
}
@GuardedBy("mNativeStreamLock")
private boolean isDoneLocked() {
return mReadState != State.NOT_STARTED && mNativeStream == 0;
}
/*
* Runs an onSucceeded callback if both Read and Write sides are closed.
*/
private void maybeOnSucceededOnExecutor() {
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
if (!(mWriteState == State.WRITING_DONE && mReadState == State.READING_DONE)) {
return;
}
mReadState = mWriteState = State.SUCCESS;
// Destroy native stream first, so UrlRequestContext could be shut
// down from the listener.
destroyNativeStreamLocked(false);
}
try {
mCallback.onSucceeded(CronetBidirectionalStream.this, mResponseInfo);
} catch (Exception e) {
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in onSucceeded method", e);
}
mInflightDoneCallbackCount.decrement();
}
@SuppressWarnings("unused")
@CalledByNative
private void onStreamReady(final boolean requestHeadersSent) {
postTaskToExecutor(
new Runnable() {
@Override
public void run() {
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
mRequestHeadersSent = requestHeadersSent;
mReadState = State.WAITING_FOR_READ;
if (!doesMethodAllowWriteData(mInitialMethod) && mRequestHeadersSent) {
mWriteState = State.WRITING_DONE;
} else {
mWriteState = State.WAITING_FOR_FLUSH;
}
}
try {
mCallback.onStreamReady(CronetBidirectionalStream.this);
} catch (Exception e) {
onCallbackException(e);
}
}
});
}
/**
* Called when the final set of headers, after all redirects,
* is received. Can only be called once for each stream.
*/
@SuppressWarnings("unused")
@CalledByNative
private void onResponseHeadersReceived(
int httpStatusCode,
String negotiatedProtocol,
String[] headers,
long receivedByteCount) {
try {
mResponseInfo =
prepareResponseInfoOnNetworkThread(
httpStatusCode, negotiatedProtocol, headers, receivedByteCount);
} catch (Exception e) {
failWithException(new CronetExceptionImpl("Cannot prepare ResponseInfo", null));
return;
}
postTaskToExecutor(
new Runnable() {
@Override
public void run() {
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
mReadState = State.WAITING_FOR_READ;
}
try {
mCallback.onResponseHeadersReceived(
CronetBidirectionalStream.this, mResponseInfo);
} catch (Exception e) {
onCallbackException(e);
}
}
});
}
@SuppressWarnings("unused")
@CalledByNative
private void onReadCompleted(
final ByteBuffer byteBuffer,
int bytesRead,
int initialPosition,
int initialLimit,
long receivedByteCount) {
mResponseInfo.setReceivedByteCount(receivedByteCount);
if (byteBuffer.position() != initialPosition || byteBuffer.limit() != initialLimit) {
failWithException(
new CronetExceptionImpl("ByteBuffer modified externally during read", null));
return;
}
if (bytesRead < 0 || initialPosition + bytesRead > initialLimit) {
failWithException(new CronetExceptionImpl("Invalid number of bytes read", null));
return;
}
byteBuffer.position(initialPosition + bytesRead);
assert mOnReadCompletedTask.mByteBuffer == null;
mOnReadCompletedTask.mByteBuffer = byteBuffer;
mOnReadCompletedTask.mEndOfStream = (bytesRead == 0);
postTaskToExecutor(mOnReadCompletedTask);
}
@SuppressWarnings("unused")
@CalledByNative
private void onWritevCompleted(
final ByteBuffer[] byteBuffers,
int[] initialPositions,
int[] initialLimits,
boolean endOfStream) {
assert byteBuffers.length == initialPositions.length;
assert byteBuffers.length == initialLimits.length;
synchronized (mNativeStreamLock) {
if (isDoneLocked()) return;
mWriteState = State.WAITING_FOR_FLUSH;
// Flush if there is anything in the flush queue mFlushData.
if (!mFlushData.isEmpty()) {
sendFlushDataLocked();
}
}
for (int i = 0; i < byteBuffers.length; i++) {
ByteBuffer buffer = byteBuffers[i];
if (buffer.position() != initialPositions[i] || buffer.limit() != initialLimits[i]) {
failWithException(
new CronetExceptionImpl(
"ByteBuffer modified externally during write", null));
return;
}
// Current implementation always writes the complete buffer.
buffer.position(buffer.limit());
postTaskToExecutor(
new OnWriteCompletedRunnable(
buffer,
// Only set endOfStream flag if this buffer is the last in byteBuffers.
endOfStream && i == byteBuffers.length - 1));
}
}
@SuppressWarnings("unused")
@CalledByNative
private void onResponseTrailersReceived(String[] trailers) {
final UrlResponseInfo.HeaderBlock trailersBlock =
new UrlResponseInfoImpl.HeaderBlockImpl(headersListFromStrings(trailers));
postTaskToExecutor(
new Runnable() {
@Override
public void run() {
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
}
try {
mCallback.onResponseTrailersReceived(
CronetBidirectionalStream.this, mResponseInfo, trailersBlock);
} catch (Exception e) {
onCallbackException(e);
}
}
});
}
@SuppressWarnings("unused")
@CalledByNative
private void onError(
int errorCode,
int nativeError,
int nativeQuicError,
String errorString,
long receivedByteCount) {
if (mResponseInfo != null) {
mResponseInfo.setReceivedByteCount(receivedByteCount);
}
if (errorCode == NetworkException.ERROR_QUIC_PROTOCOL_FAILED
|| errorCode == NetworkException.ERROR_NETWORK_CHANGED) {
failWithException(
new QuicExceptionImpl(
"Exception in BidirectionalStream: " + errorString,
errorCode,
nativeError,
nativeQuicError));
} else {
failWithException(
new BidirectionalStreamNetworkException(
"Exception in BidirectionalStream: " + errorString,
errorCode,
nativeError));
}
}
/** Called when request is canceled, no callbacks will be called afterwards. */
@SuppressWarnings("unused")
@CalledByNative
private void onCanceled() {
postTaskToExecutor(
new Runnable() {
@Override
public void run() {
try {
mCallback.onCanceled(CronetBidirectionalStream.this, mResponseInfo);
} catch (Exception e) {
Log.e(
CronetUrlRequestContext.LOG_TAG,
"Exception in onCanceled method",
e);
}
mInflightDoneCallbackCount.decrement();
}
});
}
/** Called by the native code to report metrics just before the native adapter is destroyed. */
@SuppressWarnings("unused")
@CalledByNative
private void onMetricsCollected(
long requestStartMs,
long dnsStartMs,
long dnsEndMs,
long connectStartMs,
long connectEndMs,
long sslStartMs,
long sslEndMs,
long sendingStartMs,
long sendingEndMs,
long pushStartMs,
long pushEndMs,
long responseStartMs,
long requestEndMs,
boolean socketReused,
long sentByteCount,
long receivedByteCount) {
synchronized (mNativeStreamLock) {
try {
if (mMetrics != null) {
throw new IllegalStateException("Metrics collection should only happen once.");
}
mMetrics =
new CronetMetrics(
requestStartMs,
dnsStartMs,
dnsEndMs,
connectStartMs,
connectEndMs,
sslStartMs,
sslEndMs,
sendingStartMs,
sendingEndMs,
pushStartMs,
pushEndMs,
responseStartMs,
requestEndMs,
socketReused,
sentByteCount,
receivedByteCount);
assert mReadState == mWriteState;
assert (mReadState == State.SUCCESS)
|| (mReadState == State.ERROR)
|| (mReadState == State.CANCELED);
int finishedReason;
if (mReadState == State.SUCCESS) {
finishedReason = RequestFinishedInfo.SUCCEEDED;
} else if (mReadState == State.CANCELED) {
finishedReason = RequestFinishedInfo.CANCELED;
} else {
finishedReason = RequestFinishedInfo.FAILED;
}
final RequestFinishedInfo requestFinishedInfo =
new RequestFinishedInfoImpl(
mInitialUrl,
mRequestAnnotations,
mMetrics,
finishedReason,
mResponseInfo,
mException);
mRequestContext.reportRequestFinished(
requestFinishedInfo, mInflightDoneCallbackCount);
} finally {
mInflightDoneCallbackCount.decrement();
}
}
}
public void setOnDestroyedCallbackForTesting(Runnable onDestroyedCallbackForTesting) {
mOnDestroyedCallbackForTesting = onDestroyedCallbackForTesting;
}
private static boolean doesMethodAllowWriteData(String methodName) {
return !methodName.equals("GET") && !methodName.equals("HEAD");
}
private static ArrayList<Map.Entry<String, String>> headersListFromStrings(String[] headers) {
ArrayList<Map.Entry<String, String>> headersList = new ArrayList<>(headers.length / 2);
for (int i = 0; i < headers.length; i += 2) {
headersList.add(new AbstractMap.SimpleImmutableEntry<>(headers[i], headers[i + 1]));
}
return headersList;
}
private static String[] stringsFromHeaderList(List<Map.Entry<String, String>> headersList) {
String headersArray[] = new String[headersList.size() * 2];
int i = 0;
for (Map.Entry<String, String> requestHeader : headersList) {
headersArray[i++] = requestHeader.getKey();
headersArray[i++] = requestHeader.getValue();
}
return headersArray;
}
private static int convertStreamPriority(@CronetEngineBase.StreamPriority int priority) {
switch (priority) {
case Builder.STREAM_PRIORITY_IDLE:
return RequestPriority.IDLE;
case Builder.STREAM_PRIORITY_LOWEST:
return RequestPriority.LOWEST;
case Builder.STREAM_PRIORITY_LOW:
return RequestPriority.LOW;
case Builder.STREAM_PRIORITY_MEDIUM:
return RequestPriority.MEDIUM;
case Builder.STREAM_PRIORITY_HIGHEST:
return RequestPriority.HIGHEST;
default:
throw new IllegalArgumentException("Invalid stream priority.");
}
}
/**
* Posts task to application Executor. Used for callbacks
* and other tasks that should not be executed on network thread.
*/
private void postTaskToExecutor(Runnable task) {
try {
mExecutor.execute(task);
} catch (RejectedExecutionException failException) {
Log.e(
CronetUrlRequestContext.LOG_TAG,
"Exception posting task to executor",
failException);
// If posting a task throws an exception, then there is no choice
// but to destroy the stream without invoking the callback.
synchronized (mNativeStreamLock) {
mReadState = mWriteState = State.ERROR;
destroyNativeStreamLocked(false);
}
}
}
private UrlResponseInfoImpl prepareResponseInfoOnNetworkThread(
int httpStatusCode,
String negotiatedProtocol,
String[] headers,
long receivedByteCount) {
UrlResponseInfoImpl responseInfo =
new UrlResponseInfoImpl(
Arrays.asList(mInitialUrl),
httpStatusCode,
"",
headersListFromStrings(headers),
false,
negotiatedProtocol,
null,
receivedByteCount);
return responseInfo;
}
@GuardedBy("mNativeStreamLock")
private void destroyNativeStreamLocked(boolean sendOnCanceled) {
Log.i(CronetUrlRequestContext.LOG_TAG, "destroyNativeStreamLocked " + this.toString());
if (mNativeStream == 0) {
return;
}
CronetBidirectionalStreamJni.get()
.destroy(mNativeStream, CronetBidirectionalStream.this, sendOnCanceled);
mRequestContext.onRequestDestroyed();
mNativeStream = 0;
if (mOnDestroyedCallbackForTesting != null) {
mOnDestroyedCallbackForTesting.run();
}
}
/** Fails the stream with an exception. Only called on the Executor. */
private void failWithExceptionOnExecutor(CronetException e) {
mException = e;
// Do not call into mCallback if request is complete.
synchronized (mNativeStreamLock) {
if (isDoneLocked()) {
return;
}
mReadState = mWriteState = State.ERROR;
destroyNativeStreamLocked(false);
}
try {
mCallback.onFailed(this, mResponseInfo, e);
} catch (Exception failException) {
Log.e(
CronetUrlRequestContext.LOG_TAG,
"Exception notifying of failed request",
failException);
}
mInflightDoneCallbackCount.decrement();
}
/**
* If callback method throws an exception, stream gets canceled
* and exception is reported via onFailed callback.
* Only called on the Executor.
*/
private void onCallbackException(Exception e) {
CallbackException streamError =
new CallbackExceptionImpl("CalledByNative method has thrown an exception", e);
Log.e(CronetUrlRequestContext.LOG_TAG, "Exception in CalledByNative method", e);
failWithExceptionOnExecutor(streamError);
}
/** Fails the stream with an exception. Can be called on any thread. */
private void failWithException(final CronetException exception) {
postTaskToExecutor(
new Runnable() {
@Override
public void run() {
failWithExceptionOnExecutor(exception);
}
});
}
@NativeMethods
interface Natives {
// Native methods are implemented in cronet_bidirectional_stream_adapter.cc.
long createBidirectionalStream(
CronetBidirectionalStream caller,
long urlRequestContextAdapter,
boolean sendRequestHeadersAutomatically,
boolean trafficStatsTagSet,
int trafficStatsTag,
boolean trafficStatsUidSet,
int trafficStatsUid,
long networkHandle);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
int start(
long nativePtr,
CronetBidirectionalStream caller,
String url,
int priority,
String method,
String[] headers,
boolean endOfStream);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
void sendRequestHeaders(long nativePtr, CronetBidirectionalStream caller);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
boolean readData(
long nativePtr,
CronetBidirectionalStream caller,
ByteBuffer byteBuffer,
int position,
int limit);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
boolean writevData(
long nativePtr,
CronetBidirectionalStream caller,
ByteBuffer[] buffers,
int[] positions,
int[] limits,
boolean endOfStream);
@NativeClassQualifiedName("CronetBidirectionalStreamAdapter")
void destroy(long nativePtr, CronetBidirectionalStream caller, boolean sendOnCanceled);
}
}