Skip to content

Commit

Permalink
Separate the cronet callbacks from the send method (#1017)
Browse files Browse the repository at this point in the history
  • Loading branch information
brianquinlan committed Sep 12, 2023
1 parent 2cbb703 commit eafbbb0
Showing 1 changed file with 101 additions and 100 deletions.
201 changes: 101 additions & 100 deletions pkgs/cronet_http/lib/src/cronet_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,103 @@ Map<String, String> _cronetToClientHeaders(
key.toDartString(releaseOriginal: true).toLowerCase(),
value.join(',')));

jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface _urlRequestCallbacks(
BaseRequest request, Completer<StreamedResponse> responseCompleter) {
StreamController<List<int>>? responseStream;
jb.ByteBuffer? byteBuffer;
var numRedirects = 0;

// The order of callbacks generated by Cronet is documented here:
// https://developer.android.com/guide/topics/connectivity/cronet/lifecycle
return jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface.implement(
jb.$UrlRequestCallbackProxy_UrlRequestCallbackInterfaceImpl(
onResponseStarted: (urlRequest, responseInfo) {
responseStream = StreamController();
final responseHeaders =
_cronetToClientHeaders(responseInfo.getAllHeaders());
int? contentLength;

switch (responseHeaders['content-length']) {
case final contentLengthHeader?
when !_digitRegex.hasMatch(contentLengthHeader):
responseCompleter.completeError(ClientException(
'Invalid content-length header [$contentLengthHeader].',
request.url,
));
urlRequest.cancel();
return;
case final contentLengthHeader?:
contentLength = int.parse(contentLengthHeader);
}
responseCompleter.complete(StreamedResponse(
responseStream!.stream,
responseInfo.getHttpStatusCode(),
contentLength: contentLength,
reasonPhrase: responseInfo
.getHttpStatusText()
.toDartString(releaseOriginal: true),
request: request,
isRedirect: false,
headers: responseHeaders,
));

byteBuffer = jb.ByteBuffer.allocateDirect(1024 * 1024);
urlRequest.read(byteBuffer!);
},
onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) {
if (!request.followRedirects) {
urlRequest.cancel();
responseCompleter.complete(StreamedResponse(
const Stream.empty(), // Cronet provides no body for redirects.
responseInfo.getHttpStatusCode(),
contentLength: 0,
reasonPhrase: responseInfo
.getHttpStatusText()
.toDartString(releaseOriginal: true),
request: request,
isRedirect: true,
headers: _cronetToClientHeaders(responseInfo.getAllHeaders())));
return;
}
++numRedirects;
if (numRedirects <= request.maxRedirects) {
urlRequest.followRedirect();
} else {
urlRequest.cancel();
responseCompleter.completeError(
ClientException('Redirect limit exceeded', request.url));
}
},
onReadCompleted: (urlRequest, responseInfo, byteBuffer) {
byteBuffer.flip();

final remaining = byteBuffer.remaining();
final data = Uint8List(remaining);
// TODO: Use a more efficient approach when
// https://github.com/dart-lang/jnigen/issues/387 is fixed.
for (var i = 0; i < remaining; ++i) {
data[i] = byteBuffer.get1(i);
}
responseStream!.add(data);
byteBuffer.clear();
urlRequest.read(byteBuffer);
},
onSucceeded: (urlRequest, responseInfo) {
responseStream!.sink.close();
},
onFailed: (urlRequest, responseInfo, cronetException) {
final error = ClientException(
'Cronet exception: ${cronetException.toString()}', request.url);
if (responseStream == null) {
responseCompleter.completeError(error);
} else {
responseStream!.addError(error);
responseStream!.close();
}
},
));
}

/// A HTTP [Client] based on the
/// [Cronet](https://developer.android.com/guide/topics/connectivity/cronet)
/// network stack.
Expand Down Expand Up @@ -219,106 +316,10 @@ class CronetClient extends BaseClient {
final responseCompleter = Completer<StreamedResponse>();
final engine = _engine!._engine;

late jb.UrlRequest cronetRequest;
var numRedirects = 0;
StreamController<List<int>>? responseStream;
jb.ByteBuffer? byteBuffer;

// The order of callbacks generated by Cronet is documented here:
// https://developer.android.com/guide/topics/connectivity/cronet/lifecycle

final cronetCallbacks =
jb.UrlRequestCallbackProxy_UrlRequestCallbackInterface.implement(
jb.$UrlRequestCallbackProxy_UrlRequestCallbackInterfaceImpl(
onResponseStarted: (urlRequest, responseInfo) {
responseStream = StreamController();
final responseHeaders =
_cronetToClientHeaders(responseInfo.getAllHeaders());
int? contentLength;

switch (responseHeaders['content-length']) {
case final contentLengthHeader?
when !_digitRegex.hasMatch(contentLengthHeader):
responseCompleter.completeError(ClientException(
'Invalid content-length header [$contentLengthHeader].',
request.url,
));
urlRequest.cancel();
return;
case final contentLengthHeader?:
contentLength = int.parse(contentLengthHeader);
}
responseCompleter.complete(StreamedResponse(
responseStream!.stream,
responseInfo.getHttpStatusCode(),
contentLength: contentLength,
reasonPhrase: responseInfo
.getHttpStatusText()
.toDartString(releaseOriginal: true),
request: request,
isRedirect: false,
headers: responseHeaders,
));

byteBuffer = jb.ByteBuffer.allocateDirect(1024 * 1024);
urlRequest.read(byteBuffer!);
},
onRedirectReceived: (urlRequest, responseInfo, newLocationUrl) {
if (!request.followRedirects) {
cronetRequest.cancel();
responseCompleter.complete(StreamedResponse(
const Stream.empty(), // Cronet provides no body for redirects.
responseInfo.getHttpStatusCode(),
contentLength: 0,
reasonPhrase: responseInfo
.getHttpStatusText()
.toDartString(releaseOriginal: true),
request: request,
isRedirect: true,
headers: _cronetToClientHeaders(responseInfo.getAllHeaders())));
return;
}
++numRedirects;
if (numRedirects <= request.maxRedirects) {
cronetRequest.followRedirect();
} else {
cronetRequest.cancel();
responseCompleter.completeError(
ClientException('Redirect limit exceeded', request.url));
}
},
onReadCompleted: (urlRequest, responseInfo, byteBuffer) {
byteBuffer.flip();

final remaining = byteBuffer.remaining();
final data = Uint8List(remaining);
// TODO: Use a more efficient approach when
// https://github.com/dart-lang/jnigen/issues/387 is fixed.
for (var i = 0; i < remaining; ++i) {
data[i] = byteBuffer.get1(i);
}
responseStream!.add(data);
byteBuffer.clear();
cronetRequest.read(byteBuffer);
},
onSucceeded: (urlRequest, responseInfo) {
responseStream!.sink.close();
},
onFailed: (urlRequest, responseInfo, cronetException) {
final error = ClientException(
'Cronet exception: ${cronetException.toString()}', request.url);
if (responseStream == null) {
responseCompleter.completeError(error);
} else {
responseStream!.addError(error);
responseStream!.close();
}
},
));

final builder = engine.newUrlRequestBuilder(
request.url.toString().toJString(),
jb.UrlRequestCallbackProxy.new1(cronetCallbacks),
jb.UrlRequestCallbackProxy.new1(
_urlRequestCallbacks(request, responseCompleter)),
_executor,
);

Expand All @@ -341,7 +342,7 @@ class CronetClient extends BaseClient {
builder.setUploadDataProvider(
jb.UploadDataProviders.create4(bodyBytes), _executor);
}
cronetRequest = builder.build()..start();
return responseCompleter.future.whenComplete(() => byteBuffer?.release());
builder.build().start();
return responseCompleter.future;
}
}

0 comments on commit eafbbb0

Please sign in to comment.