Bring in the latest version of the SDK's WebSocket impl.

R=alanknight@google.com

Review URL: https://codereview.chromium.org//1947683006 .
diff --git a/lib/src/copy/bytes_builder.dart b/lib/src/copy/bytes_builder.dart
index 39d44fe..573eed5 100644
--- a/lib/src/copy/bytes_builder.dart
+++ b/lib/src/copy/bytes_builder.dart
@@ -9,7 +9,8 @@
 // Because it's copied directly, there are no modifications from the original.
 //
 // This is up-to-date as of sdk revision
-// 86227840d75d974feb238f8b3c59c038b99c05cf.
+// e41fb4cafd6052157dbc1490d437045240f4773f.
+
 import 'dart:math';
 import 'dart:typed_data';
 
@@ -122,7 +123,7 @@
     _length = required;
   }
 
-  void addByte(int byte) => add([byte]);
+  void addByte(int byte) { add([byte]); }
 
   List<int> takeBytes() {
     if (_buffer == null) return new Uint8List(0);
@@ -162,7 +163,7 @@
 
 class _BytesBuilder implements BytesBuilder {
   int _length = 0;
-  final List _chunks = [];
+  final _chunks = <List<int>>[];
 
   void add(List<int> bytes) {
     if (bytes is! Uint8List) {
@@ -172,7 +173,7 @@
     _length += bytes.length;
   }
 
-  void addByte(int byte) => add([byte]);
+  void addByte(int byte) { add([byte]); }
 
   List<int> takeBytes() {
     if (_chunks.length == 0) return new Uint8List(0);
diff --git a/lib/src/copy/io_sink.dart b/lib/src/copy/io_sink.dart
index 0578bdb..34abcad 100644
--- a/lib/src/copy/io_sink.dart
+++ b/lib/src/copy/io_sink.dart
@@ -9,22 +9,20 @@
 // desired public API and to remove "dart:io" dependencies have been made.
 //
 // This is up-to-date as of sdk revision
-// 86227840d75d974feb238f8b3c59c038b99c05cf.
+// e41fb4cafd6052157dbc1490d437045240f4773f.
+
 import 'dart:async';
 
 class StreamSinkImpl<T> implements StreamSink<T> {
   final StreamConsumer<T> _target;
-  Completer _doneCompleter = new Completer();
-  Future _doneFuture;
+  final Completer _doneCompleter = new Completer();
   StreamController<T> _controllerInstance;
   Completer _controllerCompleter;
   bool _isClosed = false;
   bool _isBound = false;
   bool _hasError = false;
 
-  StreamSinkImpl(this._target) {
-    _doneFuture = _doneCompleter.future;
-  }
+  StreamSinkImpl(this._target);
 
   void add(T data) {
     if (_isClosed) return;
@@ -65,8 +63,8 @@
     var future = _controllerCompleter.future;
     _controllerInstance.close();
     return future.whenComplete(() {
-          _isBound = false;
-        });
+      _isBound = false;
+    });
   }
 
   Future close() {
@@ -88,19 +86,19 @@
     _target.close().then(_completeDoneValue, onError: _completeDoneError);
   }
 
-  Future get done => _doneFuture;
+  Future get done => _doneCompleter.future;
 
   void _completeDoneValue(value) {
-    if (_doneCompleter == null) return;
-    _doneCompleter.complete(value);
-    _doneCompleter = null;
+    if (!_doneCompleter.isCompleted) {
+      _doneCompleter.complete(value);
+    }
   }
 
   void _completeDoneError(error, StackTrace stackTrace) {
-    if (_doneCompleter == null) return;
-    _hasError = true;
-    _doneCompleter.completeError(error, stackTrace);
-    _doneCompleter = null;
+    if (!_doneCompleter.isCompleted) {
+      _hasError = true;
+      _doneCompleter.completeError(error, stackTrace);
+    }
   }
 
   StreamController<T> get _controller {
@@ -113,33 +111,29 @@
     if (_controllerInstance == null) {
       _controllerInstance = new StreamController<T>(sync: true);
       _controllerCompleter = new Completer();
-      _target.addStream(_controller.stream)
-          .then(
-              (_) {
-                if (_isBound) {
-                  // A new stream takes over - forward values to that stream.
-                  _controllerCompleter.complete(this);
-                  _controllerCompleter = null;
-                  _controllerInstance = null;
-                } else {
-                  // No new stream, .close was called. Close _target.
-                  _closeTarget();
-                }
-              },
-              onError: (error, stackTrace) {
-                if (_isBound) {
-                  // A new stream takes over - forward errors to that stream.
-                  _controllerCompleter.completeError(error, stackTrace);
-                  _controllerCompleter = null;
-                  _controllerInstance = null;
-                } else {
-                  // No new stream. No need to close target, as it have already
-                  // failed.
-                  _completeDoneError(error, stackTrace);
-                }
-              });
-    }
+      _target.addStream(_controller.stream).then((_) {
+        if (_isBound) {
+          // A new stream takes over - forward values to that stream.
+          _controllerCompleter.complete(this);
+          _controllerCompleter = null;
+          _controllerInstance = null;
+        } else {
+          // No new stream, .close was called. Close _target.
+          _closeTarget();
+        }
+      }, onError: (error, stackTrace) {
+        if (_isBound) {
+          // A new stream takes over - forward errors to that stream.
+          _controllerCompleter.completeError(error, stackTrace);
+          _controllerCompleter = null;
+          _controllerInstance = null;
+        } else {
+          // No new stream. No need to close target, as it has already
+          // failed.
+          _completeDoneError(error, stackTrace);
+        }
+      });
+   }
     return _controllerInstance;
   }
 }
-
diff --git a/lib/src/copy/web_socket.dart b/lib/src/copy/web_socket.dart
index 53460ba..c77dc2f 100644
--- a/lib/src/copy/web_socket.dart
+++ b/lib/src/copy/web_socket.dart
@@ -9,7 +9,8 @@
 // desired public API and to remove "dart:io" dependencies have been made.
 //
 // This is up-to-date as of sdk revision
-// 86227840d75d974feb238f8b3c59c038b99c05cf.
+// e41fb4cafd6052157dbc1490d437045240f4773f.
+
 /**
  * Web socket status codes used when closing a web socket connection.
  */
@@ -18,7 +19,7 @@
   static const int GOING_AWAY = 1001;
   static const int PROTOCOL_ERROR = 1002;
   static const int UNSUPPORTED_DATA = 1003;
-  static const int RESERVED_1004  = 1004;
+  static const int RESERVED_1004 = 1004;
   static const int NO_STATUS_RECEIVED = 1005;
   static const int ABNORMAL_CLOSURE = 1006;
   static const int INVALID_FRAME_PAYLOAD_DATA = 1007;
diff --git a/lib/src/copy/web_socket_impl.dart b/lib/src/copy/web_socket_impl.dart
index c10d8e8..38db9e3 100644
--- a/lib/src/copy/web_socket_impl.dart
+++ b/lib/src/copy/web_socket_impl.dart
@@ -10,7 +10,8 @@
 // desired public API and to remove "dart:io" dependencies have been made.
 //
 // This is up-to-date as of sdk revision
-// 86227840d75d974feb238f8b3c59c038b99c05cf.
+// e41fb4cafd6052157dbc1490d437045240f4773f.
+
 import 'dart:async';
 import 'dart:convert';
 import 'dart:math';
@@ -22,6 +23,11 @@
 import 'web_socket.dart';
 
 const String webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+const String _webSocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+const String _clientNoContextTakeover = "client_no_context_takeover";
+const String _serverNoContextTakeover = "server_no_context_takeover";
+const String _clientMaxWindowBits = "client_max_window_bits";
+const String _serverMaxWindowBits = "server_max_window_bits";
 
 final _random = new Random();
 
@@ -32,7 +38,6 @@
   static const int BINARY = 2;
 }
 
-
 class _WebSocketOpcode {
   static const int CONTINUATION = 0;
   static const int TEXT = 1;
@@ -53,16 +58,29 @@
 }
 
 /**
+ *  Stores the header and integer value derived from negotiation of
+ *  client_max_window_bits and server_max_window_bits. headerValue will be
+ *  set in the Websocket response headers.
+ */
+class _CompressionMaxWindowBits {
+  String headerValue;
+  int maxWindowBits;
+  _CompressionMaxWindowBits([this.headerValue, this.maxWindowBits]);
+  String toString() => headerValue;
+}
+
+/**
  * The web socket protocol transformer handles the protocol byte stream
  * which is supplied through the [:handleData:]. As the protocol is processed,
  * it'll output frame data as either a List<int> or String.
  *
- * Important infomation about usage: Be sure you use cancelOnError, so the
- * socket will be closed when the processer encounter an error. Not using it
+ * Important information about usage: Be sure you use cancelOnError, so the
+ * socket will be closed when the processor encounter an error. Not using it
  * will lead to undefined behaviour.
  */
 // TODO(ajohnsen): make this transformer reusable?
-class _WebSocketProtocolTransformer implements StreamTransformer, EventSink {
+class _WebSocketProtocolTransformer
+    implements StreamTransformer<List<int>, dynamic>, EventSink<List<int>> {
   static const int START = 0;
   static const int LEN_FIRST = 1;
   static const int LEN_REST = 2;
@@ -70,9 +88,15 @@
   static const int PAYLOAD = 4;
   static const int CLOSED = 5;
   static const int FAILURE = 6;
+  static const int FIN = 0x80;
+  static const int RSV1 = 0x40;
+  static const int RSV2 = 0x20;
+  static const int RSV3 = 0x10;
+  static const int OPCODE = 0xF;
 
   int _state = START;
   bool _fin = false;
+  bool _compressed = false;
   int _opcode = -1;
   int _len = -1;
   bool _masked = false;
@@ -93,29 +117,28 @@
   _WebSocketProtocolTransformer([this._serverSide = false]);
 
   Stream bind(Stream stream) {
-    return new Stream.eventTransformed(
-        stream,
-        (EventSink eventSink) {
-          if (_eventSink != null) {
-            throw new StateError("WebSocket transformer already used.");
-          }
-          _eventSink = eventSink;
-          return this;
-        });
+    return new Stream.eventTransformed(stream, (EventSink eventSink) {
+      if (_eventSink != null) {
+        throw new StateError("WebSocket transformer already used.");
+      }
+      _eventSink = eventSink;
+      return this;
+    });
   }
 
-  void addError(Object error, [StackTrace stackTrace]) =>
-      _eventSink.addError(error, stackTrace);
+  void addError(Object error, [StackTrace stackTrace]) {
+    _eventSink.addError(error, stackTrace);
+  }
 
-  void close() => _eventSink.close();
+  void close() { _eventSink.close(); }
 
   /**
    * Process data received from the underlying communication channel.
    */
-  void add(Uint8List buffer) {
-    int count = buffer.length;
+  void add(List<int> bytes) {
+    var buffer = bytes is Uint8List ? bytes : new Uint8List.fromList(bytes);
     int index = 0;
-    int lastIndex = count;
+    int lastIndex = buffer.length;
     if (_state == CLOSED) {
       throw new WebSocketChannelException("Data on closed connection");
     }
@@ -126,12 +149,23 @@
       int byte = buffer[index];
       if (_state <= LEN_REST) {
         if (_state == START) {
-          _fin = (byte & 0x80) != 0;
-          if ((byte & 0x70) != 0) {
-            // The RSV1, RSV2 bits RSV3 must be all zero.
+          _fin = (byte & FIN) != 0;
+
+          if((byte & (RSV2 | RSV3)) != 0) {
+            // The RSV2, RSV3 bits must both be zero.
             throw new WebSocketChannelException("Protocol error");
           }
-          _opcode = (byte & 0xF);
+
+          _opcode = (byte & OPCODE);
+
+          if (_opcode != _WebSocketOpcode.CONTINUATION) {
+            if ((byte & RSV1) != 0) {
+              _compressed = true;
+            } else {
+              _compressed = false;
+            }
+          }
+
           if (_opcode <= _WebSocketOpcode.BINARY) {
             if (_opcode == _WebSocketOpcode.CONTINUATION) {
               if (_currentMessageType == _WebSocketMessageType.NONE) {
@@ -139,14 +173,14 @@
               }
             } else {
               assert(_opcode == _WebSocketOpcode.TEXT ||
-                     _opcode == _WebSocketOpcode.BINARY);
+                  _opcode == _WebSocketOpcode.BINARY);
               if (_currentMessageType != _WebSocketMessageType.NONE) {
                 throw new WebSocketChannelException("Protocol error");
               }
               _currentMessageType = _opcode;
             }
           } else if (_opcode >= _WebSocketOpcode.CLOSE &&
-                     _opcode <= _WebSocketOpcode.PONG) {
+              _opcode <= _WebSocketOpcode.PONG) {
             // Control frames cannot be fragmented.
             if (!_fin) throw new WebSocketChannelException("Protocol error");
           } else {
@@ -195,15 +229,14 @@
             _unmask(index, payloadLength, buffer);
           }
           // Control frame and data frame share _payloads.
-          _payload.add(
-              new Uint8List.view(buffer.buffer, index, payloadLength));
+          _payload.add(new Uint8List.view(buffer.buffer, index, payloadLength));
           index += payloadLength;
           if (_isControlFrame()) {
             if (_remainingPayloadBytes == 0) _controlFrameEnd();
           } else {
             if (_currentMessageType != _WebSocketMessageType.TEXT &&
                 _currentMessageType != _WebSocketMessageType.BINARY) {
-                throw new WebSocketChannelException("Protocol error");
+              throw new WebSocketChannelException("Protocol error");
             }
             if (_remainingPayloadBytes == 0) _messageFrameEnd();
           }
@@ -238,8 +271,8 @@
           mask = (mask << 8) | _maskingBytes[(_unmaskingIndex + i) & 3];
         }
         Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
-        Int32x4List blockBuffer = new Int32x4List.view(
-            buffer.buffer, index, blockCount);
+        Int32x4List blockBuffer =
+            new Int32x4List.view(buffer.buffer, index, blockCount);
         for (int i = 0; i < blockBuffer.length; i++) {
           blockBuffer[i] ^= blockMask;
         }
@@ -305,12 +338,14 @@
 
   void _messageFrameEnd() {
     if (_fin) {
+      var bytes = _payload.takeBytes();
+
       switch (_currentMessageType) {
         case _WebSocketMessageType.TEXT:
-          _eventSink.add(UTF8.decode(_payload.takeBytes()));
+          _eventSink.add(UTF8.decode(bytes));
           break;
         case _WebSocketMessageType.BINARY:
-          _eventSink.add(_payload.takeBytes());
+          _eventSink.add(bytes);
           break;
       }
       _currentMessageType = _WebSocketMessageType.NONE;
@@ -352,8 +387,8 @@
 
   bool _isControlFrame() {
     return _opcode == _WebSocketOpcode.CLOSE ||
-           _opcode == _WebSocketOpcode.PING ||
-           _opcode == _WebSocketOpcode.PONG;
+        _opcode == _WebSocketOpcode.PING ||
+        _opcode == _WebSocketOpcode.PONG;
   }
 
   void _prepareForNextFrame() {
@@ -368,35 +403,32 @@
   }
 }
 
-
 class _WebSocketPing {
   final List<int> payload;
   _WebSocketPing([this.payload = null]);
 }
 
-
 class _WebSocketPong {
   final List<int> payload;
   _WebSocketPong([this.payload = null]);
 }
 
 // TODO(ajohnsen): Make this transformer reusable.
-class _WebSocketOutgoingTransformer implements StreamTransformer, EventSink {
+class _WebSocketOutgoingTransformer
+    implements StreamTransformer<dynamic, List<int>>, EventSink {
   final WebSocketImpl webSocket;
-  EventSink _eventSink;
+  EventSink<List<int>> _eventSink;
 
   _WebSocketOutgoingTransformer(this.webSocket);
 
-  Stream bind(Stream stream) {
-    return new Stream.eventTransformed(
-        stream,
-        (EventSink eventSink) {
-          if (_eventSink != null) {
-            throw new StateError("WebSocket transformer already used");
-          }
-          _eventSink = eventSink;
-          return this;
-        });
+  Stream<List<int>> bind(Stream stream) {
+    return new Stream.eventTransformed(stream, (eventSink) {
+      if (_eventSink != null) {
+        throw new StateError("WebSocket transformer already used");
+      }
+      _eventSink = eventSink;
+      return this;
+    });
   }
 
   void add(message) {
@@ -415,11 +447,12 @@
         opcode = _WebSocketOpcode.TEXT;
         data = UTF8.encode(message);
       } else {
-        if (message is !List<int>) {
+        if (message is List<int>) {
+          data = message;
+          opcode = _WebSocketOpcode.BINARY;
+        } else {
           throw new ArgumentError(message);
         }
-        opcode = _WebSocketOpcode.BINARY;
-        data = message;
       }
     } else {
       opcode = _WebSocketOpcode.TEXT;
@@ -427,8 +460,9 @@
     addFrame(opcode, data);
   }
 
-  void addError(Object error, [StackTrace stackTrace]) =>
-      _eventSink.addError(error, stackTrace);
+  void addError(Object error, [StackTrace stackTrace]) {
+    _eventSink.addError(error, stackTrace);
+  }
 
   void close() {
     int code = webSocket._outCloseCode;
@@ -446,11 +480,17 @@
     _eventSink.close();
   }
 
-  void addFrame(int opcode, List<int> data) =>
-      createFrame(opcode, data, webSocket._serverSide).forEach(_eventSink.add);
+  void addFrame(int opcode, List<int> data) => createFrame(
+          opcode,
+          data,
+          webSocket._serverSide,
+          false).forEach((e) {
+        _eventSink.add(e);
+      });
 
-  static Iterable createFrame(int opcode, List<int> data, bool serverSide) {
-    bool mask = !serverSide;  // Masking not implemented for server.
+  static Iterable<List<int>> createFrame(
+      int opcode, List<int> data, bool serverSide, bool compressed) {
+    bool mask = !serverSide; // Masking not implemented for server.
     int dataLength = data == null ? 0 : data.length;
     // Determine the header size.
     int headerSize = (mask) ? 6 : 2;
@@ -461,8 +501,13 @@
     }
     Uint8List header = new Uint8List(headerSize);
     int index = 0;
+
     // Set FIN and opcode.
-    header[index++] = 0x80 | opcode;
+    var hoc = _WebSocketProtocolTransformer.FIN
+              | (compressed ? _WebSocketProtocolTransformer.RSV1 : 0)
+              | (opcode & _WebSocketProtocolTransformer.OPCODE);
+
+    header[index++] = hoc;
     // Determine size and position of length field.
     int lengthBytes = 1;
     if (dataLength > 65535) {
@@ -495,8 +540,7 @@
             list = new Uint8List(data.length);
             for (int i = 0; i < data.length; i++) {
               if (data[i] < 0 || 255 < data[i]) {
-                throw new ArgumentError(
-                    "List element is not a byte value "
+                throw new ArgumentError("List element is not a byte value "
                     "(value ${data[i]} at index $i)");
               }
               list[i] = data[i];
@@ -512,8 +556,8 @@
             mask = (mask << 8) | maskBytes[i];
           }
           Int32x4 blockMask = new Int32x4(mask, mask, mask, mask);
-          Int32x4List blockBuffer = new Int32x4List.view(
-              list.buffer, 0, blockCount);
+          Int32x4List blockBuffer =
+              new Int32x4List.view(list.buffer, 0, blockCount);
           for (int i = 0; i < blockBuffer.length; i++) {
             blockBuffer[i] ^= blockMask;
           }
@@ -534,7 +578,6 @@
   }
 }
 
-
 class _WebSocketConsumer implements StreamConsumer {
   final WebSocketImpl webSocket;
   final StreamSink<List<int>> sink;
@@ -579,28 +622,28 @@
 
   _ensureController() {
     if (_controller != null) return;
-    _controller = new StreamController(sync: true,
-                                       onPause: _onPause,
-                                       onResume: _onResume,
-                                       onCancel: _onListen);
-    var stream = _controller.stream.transform(
-        new _WebSocketOutgoingTransformer(webSocket));
-    sink.addStream(stream)
-        .then((_) {
-          _done();
-          _closeCompleter.complete(webSocket);
-        }, onError: (error, StackTrace stackTrace) {
-          _closed = true;
-          _cancel();
-          if (error is ArgumentError) {
-            if (!_done(error, stackTrace)) {
-              _closeCompleter.completeError(error, stackTrace);
-            }
-          } else {
-            _done();
-            _closeCompleter.complete(webSocket);
-          }
-        });
+    _controller = new StreamController(
+        sync: true,
+        onPause: _onPause,
+        onResume: _onResume,
+        onCancel: _onListen);
+    var stream = _controller.stream
+        .transform(new _WebSocketOutgoingTransformer(webSocket));
+    sink.addStream(stream).then((_) {
+      _done();
+      _closeCompleter.complete(webSocket);
+    }, onError: (error, StackTrace stackTrace) {
+      _closed = true;
+      _cancel();
+      if (error is ArgumentError) {
+        if (!_done(error, stackTrace)) {
+          _closeCompleter.completeError(error, stackTrace);
+        }
+      } else {
+        _done();
+        _closeCompleter.complete(webSocket);
+      }
+    });
   }
 
   bool _done([error, StackTrace stackTrace]) {
@@ -621,13 +664,9 @@
     }
     _ensureController();
     _completer = new Completer();
-    _subscription = stream.listen(
-        (data) {
-          _controller.add(data);
-        },
-        onDone: _done,
-        onError: _done,
-        cancelOnError: true);
+    _subscription = stream.listen((data) {
+      _controller.add(data);
+    }, onDone: _done, onError: _done, cancelOnError: true);
     if (_issuedPause) {
       _subscription.pause();
       _issuedPause = false;
@@ -657,10 +696,11 @@
   }
 }
 
-
 class WebSocketImpl extends Stream with _ServiceObject implements StreamSink {
   // Use default Map so we keep order.
   static Map<int, WebSocketImpl> _webSockets = new Map<int, WebSocketImpl>();
+  static const int DEFAULT_WINDOW_BITS = 15;
+  static const String PER_MESSAGE_DEFLATE = "permessage-deflate";
 
   final String protocol;
 
@@ -681,74 +721,64 @@
   String _outCloseReason;
   Timer _closeTimer;
 
-  WebSocketImpl.fromSocket(Stream<List<int>> stream,
-      StreamSink<List<int>> sink, this.protocol, [this._serverSide = false]) {
+  WebSocketImpl.fromSocket(
+      Stream<List<int>> stream, StreamSink<List<int>> sink, this.protocol,
+      [this._serverSide = false]) {
     _consumer = new _WebSocketConsumer(this, sink);
     _sink = new StreamSinkImpl(_consumer);
     _readyState = WebSocket.OPEN;
 
     var transformer = new _WebSocketProtocolTransformer(_serverSide);
-    _subscription = stream.transform(transformer).listen(
-        (data) {
-          if (data is _WebSocketPing) {
-            if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
-          } else if (data is _WebSocketPong) {
-            // Simply set pingInterval, as it'll cancel any timers.
-            pingInterval = _pingInterval;
-          } else {
-            _controller.add(data);
-          }
-        },
-        onError: (error) {
-          if (_closeTimer != null) _closeTimer.cancel();
-          if (error is FormatException) {
-            _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
-          } else {
-            _close(WebSocketStatus.PROTOCOL_ERROR);
-          }
-          // An error happened, set the close code set above.
-          _closeCode = _outCloseCode;
-          _closeReason = _outCloseReason;
-          _controller.close();
-        },
-        onDone: () {
-          if (_closeTimer != null) _closeTimer.cancel();
-          if (_readyState == WebSocket.OPEN) {
-            _readyState = WebSocket.CLOSING;
-            if (!_isReservedStatusCode(transformer.closeCode)) {
-              _close(transformer.closeCode);
-            } else {
-              _close();
-            }
-            _readyState = WebSocket.CLOSED;
-          }
-          // Protocol close, use close code from transformer.
-          _closeCode = transformer.closeCode;
-          _closeReason = transformer.closeReason;
-          _controller.close();
-        },
-        cancelOnError: true);
+    _subscription = stream.transform(transformer).listen((data) {
+      if (data is _WebSocketPing) {
+        if (!_writeClosed) _consumer.add(new _WebSocketPong(data.payload));
+      } else if (data is _WebSocketPong) {
+        // Simply set pingInterval, as it'll cancel any timers.
+        pingInterval = _pingInterval;
+      } else {
+        _controller.add(data);
+      }
+    }, onError: (error, stackTrace) {
+      if (_closeTimer != null) _closeTimer.cancel();
+      if (error is FormatException) {
+        _close(WebSocketStatus.INVALID_FRAME_PAYLOAD_DATA);
+      } else {
+        _close(WebSocketStatus.PROTOCOL_ERROR);
+      }
+      // An error happened, set the close code set above.
+      _closeCode = _outCloseCode;
+      _closeReason = _outCloseReason;
+      _controller.close();
+    }, onDone: () {
+      if (_closeTimer != null) _closeTimer.cancel();
+      if (_readyState == WebSocket.OPEN) {
+        _readyState = WebSocket.CLOSING;
+        if (!_isReservedStatusCode(transformer.closeCode)) {
+          _close(transformer.closeCode, transformer.closeReason);
+        } else {
+          _close();
+        }
+        _readyState = WebSocket.CLOSED;
+      }
+      // Protocol close, use close code from transformer.
+      _closeCode = transformer.closeCode;
+      _closeReason = transformer.closeReason;
+      _controller.close();
+    }, cancelOnError: true);
     _subscription.pause();
-    _controller = new StreamController(sync: true,
-                                       onListen: () => _subscription.resume(),
-                                       onCancel: () {
-                                         _subscription.cancel();
-                                         _subscription = null;
-                                       },
-                                       onPause: _subscription.pause,
-                                       onResume: _subscription.resume);
+    _controller = new StreamController(
+        sync: true, onListen: () => _subscription.resume(), onCancel: () {
+      _subscription.cancel();
+      _subscription = null;
+    }, onPause: _subscription.pause, onResume: _subscription.resume);
 
     _webSockets[_serviceId] = this;
   }
 
   StreamSubscription listen(void onData(message),
-                            {Function onError,
-                             void onDone(),
-                             bool cancelOnError}) {
+      {Function onError, void onDone(), bool cancelOnError}) {
     return _controller.stream.listen(onData,
-                                     onError: onError,
-                                     onDone: onDone,
-                                     cancelOnError: cancelOnError);
+        onError: onError, onDone: onDone, cancelOnError: cancelOnError);
   }
 
   Duration get pingInterval => _pingInterval;
@@ -776,9 +806,10 @@
   int get closeCode => _closeCode;
   String get closeReason => _closeReason;
 
-  void add(data) => _sink.add(data);
-  void addError(error, [StackTrace stackTrace]) =>
-      _sink.addError(error, stackTrace);
+  void add(data) { _sink.add(data); }
+  void addError(error, [StackTrace stackTrace]) {
+    _sink.addError(error, stackTrace);
+  }
   Future addStream(Stream stream) => _sink.addStream(stream);
   Future get done => _sink.done;
 
@@ -825,20 +856,19 @@
     _webSockets.remove(_serviceId);
   }
 
-  // The _toJSON, _serviceTypePath, and _serviceTypeName methods
-  // have been deleted for http_parser. The methods were unused in WebSocket
-  // code and produced warnings.
+  // The _toJSON, _serviceTypePath, and _serviceTypeName methods have been
+  // deleted for web_socket_channel. The methods were unused in WebSocket code
+  // and produced warnings.
 
   static bool _isReservedStatusCode(int code) {
     return code != null &&
-           (code < WebSocketStatus.NORMAL_CLOSURE ||
+        (code < WebSocketStatus.NORMAL_CLOSURE ||
             code == WebSocketStatus.RESERVED_1004 ||
             code == WebSocketStatus.NO_STATUS_RECEIVED ||
             code == WebSocketStatus.ABNORMAL_CLOSURE ||
             (code > WebSocketStatus.INTERNAL_SERVER_ERROR &&
-             code < WebSocketStatus.RESERVED_1015) ||
-            (code >= WebSocketStatus.RESERVED_1015 &&
-             code < 3000));
+                code < WebSocketStatus.RESERVED_1015) ||
+            (code >= WebSocketStatus.RESERVED_1015 && code < 3000));
   }
 }