diff --git a/pkgs/java_http/lib/src/java_client.dart b/pkgs/java_http/lib/src/java_client.dart index 51847a50f6..1076deae5c 100644 --- a/pkgs/java_http/lib/src/java_client.dart +++ b/pkgs/java_http/lib/src/java_client.dart @@ -2,10 +2,12 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:io'; import 'dart:isolate'; import 'dart:typed_data'; +import 'package:async/async.dart'; import 'package:http/http.dart'; import 'package:jni/jni.dart'; import 'package:path/path.dart'; @@ -39,51 +41,118 @@ class JavaClient extends BaseClient { // See https://github.com/dart-lang/http/pull/980#discussion_r1253700470. _initJVM(); + final receivePort = ReceivePort(); + final events = StreamQueue(receivePort); + // We can't send a StreamedRequest to another Isolate. // But we can send Map, String, UInt8List, Uri. - final requestBody = await request.finalize().toBytes(); - final requestHeaders = request.headers; - final requestMethod = request.method; - final requestUrl = request.url; - - final (statusCode, reasonPhrase, responseHeaders, responseBody) = - await Isolate.run(() async { - final httpUrlConnection = URL - .ctor3(requestUrl.toString().toJString()) - .openConnection() - .castTo(HttpURLConnection.type, deleteOriginal: true); - - requestHeaders.forEach((headerName, headerValue) { - httpUrlConnection.setRequestProperty( - headerName.toJString(), headerValue.toJString()); - }); - - httpUrlConnection.setRequestMethod(requestMethod.toJString()); - _setRequestBody(httpUrlConnection, requestBody); - - final statusCode = _statusCode(requestUrl, httpUrlConnection); - final reasonPhrase = _reasonPhrase(httpUrlConnection); - final responseHeaders = _responseHeaders(httpUrlConnection); - final responseBody = _responseBody(httpUrlConnection); - - httpUrlConnection.disconnect(); + final isolateRequest = ( + sendPort: receivePort.sendPort, + url: request.url, + method: request.method, + headers: request.headers, + body: await request.finalize().toBytes(), + ); + + // Could create a new class to hold the data for the isolate instead + // of using a record. + await Isolate.spawn(_isolateMethod, isolateRequest); + + final statusCodeEvent = await events.next; + late int statusCode; + if (statusCodeEvent is ClientException) { + // Do we need to close the ReceivePort here as well? + receivePort.close(); + throw statusCodeEvent; + } else { + statusCode = statusCodeEvent as int; + } - return ( - statusCode, - reasonPhrase, - responseHeaders, - responseBody, - ); - }); + final reasonPhrase = await events.next as String?; + final responseHeaders = await events.next as Map; + + Stream> responseBodyStream(Stream events) async* { + try { + await for (final event in events) { + if (event is List) { + yield event; + } else if (event is ClientException) { + throw event; + } else if (event == null) { + return; + } + } + } finally { + // TODO: Should we kill the isolate here? + receivePort.close(); + } + } - return StreamedResponse(Stream.value(responseBody), statusCode, - contentLength: - _contentLengthHeader(request, responseHeaders, responseBody.length), + return StreamedResponse(responseBodyStream(events.rest), statusCode, + contentLength: _parseContentLengthHeader(request.url, responseHeaders), request: request, headers: responseHeaders, reasonPhrase: reasonPhrase); } + // TODO: Rename _isolateMethod to something more descriptive. + void _isolateMethod( + ({ + SendPort sendPort, + Uint8List body, + Map headers, + String method, + Uri url, + }) request, + ) { + final httpUrlConnection = URL + .ctor3(request.url.toString().toJString()) + .openConnection() + .castTo(HttpURLConnection.type, deleteOriginal: true); + + request.headers.forEach((headerName, headerValue) { + httpUrlConnection.setRequestProperty( + headerName.toJString(), headerValue.toJString()); + }); + + httpUrlConnection.setRequestMethod(request.method.toJString()); + _setRequestBody(httpUrlConnection, request.body); + + try { + final statusCode = _statusCode(request.url, httpUrlConnection); + request.sendPort.send(statusCode); + } on ClientException catch (e) { + request.sendPort.send(e); + httpUrlConnection.disconnect(); + return; + } + + final reasonPhrase = _reasonPhrase(httpUrlConnection); + request.sendPort.send(reasonPhrase); + + final responseHeaders = _responseHeaders(httpUrlConnection); + request.sendPort.send(responseHeaders); + + // TODO: Throws a ClientException if the content length header is invalid. + // I think we need to send the ClientException over the SendPort. + final contentLengthHeader = _parseContentLengthHeader( + request.url, + responseHeaders, + ); + + _responseBody( + request.url, + httpUrlConnection, + request.sendPort, + contentLengthHeader, + ); + + httpUrlConnection.disconnect(); + + // Signals to the receiving isolate that we are done sending events. + request.sendPort.send(null); + } + void _setRequestBody( HttpURLConnection httpUrlConnection, Uint8List requestBody, @@ -142,42 +211,50 @@ class JavaClient extends BaseClient { return headers.map((key, value) => MapEntry(key, value.join(','))); } - int? _contentLengthHeader( - BaseRequest request, Map headers, int bodyLength) { + int? _parseContentLengthHeader( + Uri requestUrl, + Map headers, + ) { int? contentLength; switch (headers['content-length']) { case final contentLengthHeader? when !_digitRegex.hasMatch(contentLengthHeader): throw ClientException( 'Invalid content-length header [$contentLengthHeader].', - request.url, + requestUrl, ); case final contentLengthHeader?: contentLength = int.parse(contentLengthHeader); - if (bodyLength < contentLength) { - throw ClientException('Unexpected end of body', request.url); - } } return contentLength; } - Uint8List _responseBody(HttpURLConnection httpUrlConnection) { + void _responseBody( + Uri requestUrl, + HttpURLConnection httpUrlConnection, + SendPort sendPort, + int? expectedBodyLength, + ) { final responseCode = httpUrlConnection.getResponseCode(); final inputStream = (responseCode >= 200 && responseCode <= 299) ? httpUrlConnection.getInputStream() : httpUrlConnection.getErrorStream(); - final bytes = []; int byte; + var actualBodyLength = 0; + // TODO: inputStream.read() could throw IOException. while ((byte = inputStream.read()) != -1) { - bytes.add(byte); + sendPort.send([byte]); + actualBodyLength++; } - inputStream.close(); + if (expectedBodyLength != null && actualBodyLength < expectedBodyLength) { + sendPort.send(ClientException('Unexpected end of body', requestUrl)); + } - return Uint8List.fromList(bytes); + inputStream.close(); } } diff --git a/pkgs/java_http/pubspec.yaml b/pkgs/java_http/pubspec.yaml index a11cdf912a..ecba4f13e6 100644 --- a/pkgs/java_http/pubspec.yaml +++ b/pkgs/java_http/pubspec.yaml @@ -8,6 +8,7 @@ environment: sdk: ^3.0.0 dependencies: + async: ^2.11.0 http: '>=0.13.4 <2.0.0' jni: ^0.5.0 path: ^1.8.0 diff --git a/pkgs/java_http/test/java_client_test.dart b/pkgs/java_http/test/java_client_test.dart index 5ad0120099..011b6a6633 100644 --- a/pkgs/java_http/test/java_client_test.dart +++ b/pkgs/java_http/test/java_client_test.dart @@ -9,7 +9,8 @@ import 'package:test/test.dart'; void main() { group('java_http client conformance tests', () { testIsolate(JavaClient.new); - testResponseBody(JavaClient(), canStreamResponseBody: false); + testResponseBody(JavaClient()); + testResponseBodyStreamed(JavaClient()); testResponseHeaders(JavaClient()); testRequestBody(JavaClient()); testRequestHeaders(JavaClient());