Skip to content

Commit

Permalink
JavaClient can stream the HTTP response body (dart-lang#1005)
Browse files Browse the repository at this point in the history
  • Loading branch information
alex-james-dev committed Aug 17, 2023
1 parent df1f625 commit 58a5462
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 48 deletions.
171 changes: 124 additions & 47 deletions pkgs/java_http/lib/src/java_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:io';
import 'dart:isolate';
import 'dart:typed_data';

import 'package:async/async.dart';
import 'package:http/http.dart';
import 'package:jni/jni.dart';
import 'package:path/path.dart';
Expand Down Expand Up @@ -39,51 +41,118 @@ class JavaClient extends BaseClient {
// See https://github.com/dart-lang/http/pull/980#discussion_r1253700470.
_initJVM();

final receivePort = ReceivePort();
final events = StreamQueue<dynamic>(receivePort);

// We can't send a StreamedRequest to another Isolate.
// But we can send Map<String, String>, String, UInt8List, Uri.
final requestBody = await request.finalize().toBytes();
final requestHeaders = request.headers;
final requestMethod = request.method;
final requestUrl = request.url;

final (statusCode, reasonPhrase, responseHeaders, responseBody) =
await Isolate.run(() async {
final httpUrlConnection = URL
.ctor3(requestUrl.toString().toJString())
.openConnection()
.castTo(HttpURLConnection.type, deleteOriginal: true);

requestHeaders.forEach((headerName, headerValue) {
httpUrlConnection.setRequestProperty(
headerName.toJString(), headerValue.toJString());
});

httpUrlConnection.setRequestMethod(requestMethod.toJString());
_setRequestBody(httpUrlConnection, requestBody);

final statusCode = _statusCode(requestUrl, httpUrlConnection);
final reasonPhrase = _reasonPhrase(httpUrlConnection);
final responseHeaders = _responseHeaders(httpUrlConnection);
final responseBody = _responseBody(httpUrlConnection);

httpUrlConnection.disconnect();
final isolateRequest = (
sendPort: receivePort.sendPort,
url: request.url,
method: request.method,
headers: request.headers,
body: await request.finalize().toBytes(),
);

// Could create a new class to hold the data for the isolate instead
// of using a record.
await Isolate.spawn(_isolateMethod, isolateRequest);

final statusCodeEvent = await events.next;
late int statusCode;
if (statusCodeEvent is ClientException) {
// Do we need to close the ReceivePort here as well?
receivePort.close();
throw statusCodeEvent;
} else {
statusCode = statusCodeEvent as int;
}

return (
statusCode,
reasonPhrase,
responseHeaders,
responseBody,
);
});
final reasonPhrase = await events.next as String?;
final responseHeaders = await events.next as Map<String, String>;

Stream<List<int>> responseBodyStream(Stream<dynamic> events) async* {
try {
await for (final event in events) {
if (event is List<int>) {
yield event;
} else if (event is ClientException) {
throw event;
} else if (event == null) {
return;
}
}
} finally {
// TODO: Should we kill the isolate here?
receivePort.close();
}
}

return StreamedResponse(Stream.value(responseBody), statusCode,
contentLength:
_contentLengthHeader(request, responseHeaders, responseBody.length),
return StreamedResponse(responseBodyStream(events.rest), statusCode,
contentLength: _parseContentLengthHeader(request.url, responseHeaders),
request: request,
headers: responseHeaders,
reasonPhrase: reasonPhrase);
}

// TODO: Rename _isolateMethod to something more descriptive.
void _isolateMethod(
({
SendPort sendPort,
Uint8List body,
Map<String, String> headers,
String method,
Uri url,
}) request,
) {
final httpUrlConnection = URL
.ctor3(request.url.toString().toJString())
.openConnection()
.castTo(HttpURLConnection.type, deleteOriginal: true);

request.headers.forEach((headerName, headerValue) {
httpUrlConnection.setRequestProperty(
headerName.toJString(), headerValue.toJString());
});

httpUrlConnection.setRequestMethod(request.method.toJString());
_setRequestBody(httpUrlConnection, request.body);

try {
final statusCode = _statusCode(request.url, httpUrlConnection);
request.sendPort.send(statusCode);
} on ClientException catch (e) {
request.sendPort.send(e);
httpUrlConnection.disconnect();
return;
}

final reasonPhrase = _reasonPhrase(httpUrlConnection);
request.sendPort.send(reasonPhrase);

final responseHeaders = _responseHeaders(httpUrlConnection);
request.sendPort.send(responseHeaders);

// TODO: Throws a ClientException if the content length header is invalid.
// I think we need to send the ClientException over the SendPort.
final contentLengthHeader = _parseContentLengthHeader(
request.url,
responseHeaders,
);

_responseBody(
request.url,
httpUrlConnection,
request.sendPort,
contentLengthHeader,
);

httpUrlConnection.disconnect();

// Signals to the receiving isolate that we are done sending events.
request.sendPort.send(null);
}

void _setRequestBody(
HttpURLConnection httpUrlConnection,
Uint8List requestBody,
Expand Down Expand Up @@ -142,42 +211,50 @@ class JavaClient extends BaseClient {
return headers.map((key, value) => MapEntry(key, value.join(',')));
}

int? _contentLengthHeader(
BaseRequest request, Map<String, String> headers, int bodyLength) {
int? _parseContentLengthHeader(
Uri requestUrl,
Map<String, String> headers,
) {
int? contentLength;
switch (headers['content-length']) {
case final contentLengthHeader?
when !_digitRegex.hasMatch(contentLengthHeader):
throw ClientException(
'Invalid content-length header [$contentLengthHeader].',
request.url,
requestUrl,
);
case final contentLengthHeader?:
contentLength = int.parse(contentLengthHeader);
if (bodyLength < contentLength) {
throw ClientException('Unexpected end of body', request.url);
}
}

return contentLength;
}

Uint8List _responseBody(HttpURLConnection httpUrlConnection) {
void _responseBody(
Uri requestUrl,
HttpURLConnection httpUrlConnection,
SendPort sendPort,
int? expectedBodyLength,
) {
final responseCode = httpUrlConnection.getResponseCode();

final inputStream = (responseCode >= 200 && responseCode <= 299)
? httpUrlConnection.getInputStream()
: httpUrlConnection.getErrorStream();

final bytes = <int>[];
int byte;
var actualBodyLength = 0;
// TODO: inputStream.read() could throw IOException.
while ((byte = inputStream.read()) != -1) {
bytes.add(byte);
sendPort.send([byte]);
actualBodyLength++;
}

inputStream.close();
if (expectedBodyLength != null && actualBodyLength < expectedBodyLength) {
sendPort.send(ClientException('Unexpected end of body', requestUrl));
}

return Uint8List.fromList(bytes);
inputStream.close();
}
}

Expand Down
1 change: 1 addition & 0 deletions pkgs/java_http/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ environment:
sdk: ^3.0.0

dependencies:
async: ^2.11.0
http: '>=0.13.4 <2.0.0'
jni: ^0.5.0
path: ^1.8.0
Expand Down
3 changes: 2 additions & 1 deletion pkgs/java_http/test/java_client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ import 'package:test/test.dart';
void main() {
group('java_http client conformance tests', () {
testIsolate(JavaClient.new);
testResponseBody(JavaClient(), canStreamResponseBody: false);
testResponseBody(JavaClient());
testResponseBodyStreamed(JavaClient());
testResponseHeaders(JavaClient());
testRequestBody(JavaClient());
testRequestHeaders(JavaClient());
Expand Down

0 comments on commit 58a5462

Please sign in to comment.