Skip to content

Commit

Permalink
Add support for negotiating a subprotocol (#1150)
Browse files Browse the repository at this point in the history
  • Loading branch information
brianquinlan committed Mar 7, 2024
1 parent e71e739 commit 8d3c647
Show file tree
Hide file tree
Showing 11 changed files with 226 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkgs/web_socket/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
## 0.1.0-wip

- Abstract interface definition.
- Basic functionality in place.
19 changes: 18 additions & 1 deletion pkgs/web_socket/lib/src/browser_web_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,23 @@ class BrowserWebSocket implements WebSocket {
final web.WebSocket _webSocket;
final _events = StreamController<WebSocketEvent>();

/// Create a new WebSocket connection using the JavaScript WebSocket API.
///
/// The URL supplied in [url] must use the scheme ws or wss.
///
/// If provided, the [protocols] argument indicates that subprotocols that
/// the peer is able to select. See
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
static Future<BrowserWebSocket> connect(Uri url,
{Iterable<String>? protocols}) async {
final webSocket = web.WebSocket(url.toString())..binaryType = 'arraybuffer';
if (!url.isScheme('ws') && !url.isScheme('wss')) {
throw ArgumentError.value(
url, 'url', 'only ws: and wss: schemes are supported');
}

final webSocket = web.WebSocket(url.toString(),
protocols?.map((e) => e.toJS).toList().toJS ?? JSArray())
..binaryType = 'arraybuffer';
final browserSocket = BrowserWebSocket._(webSocket);
final webSocketConnected = Completer<BrowserWebSocket>();

Expand Down Expand Up @@ -126,6 +140,9 @@ class BrowserWebSocket implements WebSocket {

@override
Stream<WebSocketEvent> get events => _events.stream;

@override
String get protocol => _webSocket.protocol;
}

const connect = BrowserWebSocket.connect;
32 changes: 29 additions & 3 deletions pkgs/web_socket/lib/src/io_web_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import 'dart:async';
import 'dart:io' as io;
import 'dart:typed_data';

import '../web_socket.dart';
import 'utils.dart';
import 'web_socket.dart';

/// A `dart-io`-based [WebSocket] implementation.
///
Expand All @@ -16,14 +16,37 @@ class IOWebSocket implements WebSocket {
final io.WebSocket _webSocket;
final _events = StreamController<WebSocketEvent>();

/// Create a new WebSocket connection using dart:io WebSocket.
///
/// The URL supplied in [url] must use the scheme ws or wss.
///
/// If provided, the [protocols] argument indicates that subprotocols that
/// the peer is able to select. See
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
static Future<IOWebSocket> connect(Uri url,
{Iterable<String>? protocols}) async {
if (!url.isScheme('ws') && !url.isScheme('wss')) {
throw ArgumentError.value(
url, 'url', 'only ws: and wss: schemes are supported');
}

final io.WebSocket webSocket;
try {
final webSocket = await io.WebSocket.connect(url.toString());
return IOWebSocket._(webSocket);
webSocket =
await io.WebSocket.connect(url.toString(), protocols: protocols);
} on io.WebSocketException catch (e) {
throw WebSocketException(e.message);
}

if (webSocket.protocol != null &&
!(protocols ?? []).contains(webSocket.protocol)) {
// dart:io WebSocket does not correctly validate the returned protocol.
// See https://github.com/dart-lang/sdk/issues/55106
await webSocket.close(1002); // protocol error
throw WebSocketException(
'unexpected protocol selected by peer: ${webSocket.protocol}');
}
return IOWebSocket._(webSocket);
}

IOWebSocket._(this._webSocket) {
Expand Down Expand Up @@ -90,6 +113,9 @@ class IOWebSocket implements WebSocket {

@override
Stream<WebSocketEvent> get events => _events.stream;

@override
String get protocol => _webSocket.protocol ?? '';
}

const connect = IOWebSocket.connect;
15 changes: 15 additions & 0 deletions pkgs/web_socket/lib/src/web_socket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,13 @@ class WebSocketConnectionClosed extends WebSocketException {
/// socket.sendText('Hello Dart WebSockets! 🎉');
/// }
abstract interface class WebSocket {
/// Create a new WebSocket connection.
///
/// The URL supplied in [url] must use the scheme ws or wss.
///
/// If provided, the [protocols] argument indicates that subprotocols that
/// the peer is able to select. See
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
static Future<WebSocket> connect(Uri url, {Iterable<String>? protocols}) =>
connector.connect(url, protocols: protocols);
Expand Down Expand Up @@ -169,4 +176,12 @@ abstract interface class WebSocket {
///
/// Errors will never appear in this [Stream].
Stream<WebSocketEvent> get events;
/// The WebSocket subprotocol negotiated with the peer.
///
/// Will be the empty string if no subprotocol was negotiated.
///
/// See
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
String get protocol;
}
3 changes: 3 additions & 0 deletions pkgs/web_socket_conformance_tests/example/client_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ class MyWebSocketImplementation implements WebSocket {

@override
void sendText(String s) => throw UnimplementedError();

@override
String get protocol => throw UnimplementedError();
}

void main() {
Expand Down
18 changes: 18 additions & 0 deletions pkgs/web_socket_conformance_tests/lib/src/connect_uri_tests.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:test/test.dart';
import 'package:web_socket/web_socket.dart';

/// Tests that the [WebSocket] rejects invalid connection URIs.
void testConnectUri(
Future<WebSocket> Function(Uri uri, {Iterable<String>? protocols})
channelFactory) {
group('connect uri', () {
test('no protocol', () async {
await expectLater(() => channelFactory(Uri.https('www.example.com', '/')),
throwsA(isA<ArgumentError>()));
});
});
}
47 changes: 47 additions & 0 deletions pkgs/web_socket_conformance_tests/lib/src/protocol_server.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:crypto/crypto.dart';
import 'package:stream_channel/stream_channel.dart';

const _webSocketGuid = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11';

/// Starts an WebSocket server that responds with a scripted subprotocol.
void hybridMain(StreamChannel<Object?> channel) async {
late final HttpServer server;
server = (await HttpServer.bind('localhost', 0))
..listen((request) async {
final serverProtocol = request.requestedUri.queryParameters['protocol'];
var key = request.headers.value('Sec-WebSocket-Key');
var digest = sha1.convert('$key$_webSocketGuid'.codeUnits);
var accept = base64.encode(digest.bytes);
channel.sink.add(request.headers['Sec-WebSocket-Protocol']);
request.response
..statusCode = HttpStatus.switchingProtocols
..headers.add(HttpHeaders.connectionHeader, 'Upgrade')
..headers.add(HttpHeaders.upgradeHeader, 'websocket')
..headers.add('Sec-WebSocket-Accept', accept);
if (serverProtocol != null) {
request.response.headers.add('Sec-WebSocket-Protocol', serverProtocol);
}
request.response.contentLength = 0;
final socket = await request.response.detachSocket();
final webSocket = WebSocket.fromUpgradedSocket(socket,
protocol: serverProtocol, serverSide: true);
webSocket.listen((e) async {
webSocket.add(e);
await webSocket.close();
});
});

channel.sink.add(server.port);

await channel
.stream.first; // Any writes indicates that the server should exit.
unawaited(server.close());
}
12 changes: 12 additions & 0 deletions pkgs/web_socket_conformance_tests/lib/src/protocol_server_vm.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

70 changes: 70 additions & 0 deletions pkgs/web_socket_conformance_tests/lib/src/protocol_tests.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
// Copyright (c) 2024, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'package:async/async.dart';
import 'package:stream_channel/stream_channel.dart';
import 'package:test/test.dart';
import 'package:web_socket/web_socket.dart';

import 'protocol_server_vm.dart'
if (dart.library.html) 'protocol_server_web.dart';

/// Tests that the [WebSocket] can correctly negotiate a subprotocol with the
/// peer.
///
/// See
/// [RFC-6455 1.9](https://datatracker.ietf.org/doc/html/rfc6455#section-1.9).
void testProtocols(
Future<WebSocket> Function(Uri uri, {Iterable<String>? protocols})
channelFactory) {
group('protocols', () {
late Uri uri;
late StreamChannel<Object?> httpServerChannel;
late StreamQueue<Object?> httpServerQueue;

setUp(() async {
httpServerChannel = await startServer();
httpServerQueue = StreamQueue(httpServerChannel.stream);
uri = Uri.parse('ws://localhost:${await httpServerQueue.next}');
});
tearDown(() => httpServerChannel.sink.add(null));

test('no protocol', () async {
final socket = await channelFactory(uri);

expect(await httpServerQueue.next, null);
expect(socket.protocol, '');
socket.sendText('Hello World!');
});

test('single protocol', () async {
final socket = await channelFactory(
uri.replace(queryParameters: {'protocol': 'chat.example.com'}),
protocols: ['chat.example.com']);

expect(await httpServerQueue.next, ['chat.example.com']);
expect(socket.protocol, 'chat.example.com');
socket.sendText('Hello World!');
});

test('mutiple protocols', () async {
final socket = await channelFactory(
uri.replace(queryParameters: {'protocol': 'text.example.com'}),
protocols: ['chat.example.com', 'text.example.com']);

expect(
await httpServerQueue.next, ['chat.example.com, text.example.com']);
expect(socket.protocol, 'text.example.com');
socket.sendText('Hello World!');
});

test('protocol mismatch', () async {
await expectLater(
() => channelFactory(
uri.replace(queryParameters: {'protocol': 'example.example.com'}),
protocols: ['chat.example.com']),
throwsA(isA<WebSocketException>()));
});
});
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,23 @@
import 'package:web_socket/web_socket.dart';
import 'src/close_local_tests.dart';
import 'src/close_remote_tests.dart';
import 'src/connect_uri_tests.dart';
import 'src/disconnect_after_upgrade_tests.dart';
import 'src/no_upgrade_tests.dart';
import 'src/payload_transfer_tests.dart';
import 'src/peer_protocol_errors_tests.dart';
import 'src/protocol_tests.dart';

/// Runs the entire test suite against the given [WebSocket].
void testAll(
Future<WebSocket> Function(Uri uri, {Iterable<String>? protocols})
webSocketFactory) {
testCloseLocal(webSocketFactory);
testCloseRemote(webSocketFactory);
testConnectUri(webSocketFactory);
testDisconnectAfterUpgrade(webSocketFactory);
testNoUpgrade(webSocketFactory);
testPayloadTransfer(webSocketFactory);
testPeerProtocolErrors(webSocketFactory);
testProtocols(webSocketFactory);
}

0 comments on commit 8d3c647

Please sign in to comment.