blob: 0eecd1d885893e7b1a3d6626880ec6de004aadb0 [file] [log] [blame]
// Copyright (c) 2012 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "chrome/browser/chromeos/web_socket_proxy.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <algorithm>
#include <limits>
#include <list>
#include <map>
#include <vector>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <signal.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/wait.h>
#include "base/base64.h"
#include "base/basictypes.h"
#include "base/bind.h"
#include "base/bind_helpers.h"
#include "base/lazy_instance.h"
#include "base/logging.h"
#include "base/memory/ref_counted.h"
#include "base/memory/scoped_ptr.h"
#include "base/memory/weak_ptr.h"
#include "base/message_loop.h"
#include "base/sequenced_task_runner_helpers.h"
#include "base/sha1.h"
#include "base/stl_util.h"
#include "base/string_number_conversions.h"
#include "base/string_util.h"
#include "base/sys_byteorder.h"
#include "chrome/browser/chromeos/web_socket_proxy_helper.h"
#include "chrome/browser/internal_auth.h"
#include "chrome/common/chrome_notification_types.h"
#include "chrome/common/url_constants.h"
#include "content/public/browser/browser_thread.h"
#include "content/public/browser/notification_details.h"
#include "content/public/browser/notification_service.h"
#include "content/public/browser/notification_types.h"
#include "content/public/common/url_constants.h"
#include "googleurl/src/gurl.h"
#include "googleurl/src/url_parse.h"
#include "net/base/address_list.h"
#include "net/base/cert_verifier.h"
#include "net/base/host_port_pair.h"
#include "net/base/io_buffer.h"
#include "net/base/net_errors.h"
#include "net/base/ssl_config_service.h"
#include "net/socket/client_socket_factory.h"
#include "net/socket/client_socket_handle.h"
#include "net/socket/ssl_client_socket.h"
#include "net/socket/stream_socket.h"
#include "third_party/libevent/evdns.h"
#include "third_party/libevent/event.h"
using content::BrowserThread;
namespace chromeos {
namespace {
const uint8 kCRLF[] = "\r\n";
const uint8 kCRLFCRLF[] = "\r\n\r\n";
// Not a constant but preprocessor definition for easy concatenation.
#define kProxyPath "/tcpproxy"
enum WebSocketStatusCode {
WS_CLOSE_NORMAL = 1000,
WS_CLOSE_GOING_AWAY = 1001,
WS_CLOSE_PROTOCOL_ERROR = 1002,
WS_CLOSE_UNACCEPTABLE_DATA = 1003,
WS_CLOSE_DESTINATION_ERROR = 4000,
WS_CLOSE_LIMIT_VIOLATION = 4001,
WS_CLOSE_RESOLUTION_FAILED = 4002,
WS_CLOSE_UNEXPECTED = 4003
};
enum WebSocketFrameOpcode {
WS_OPCODE_TEXT = 1,
WS_OPCODE_CLOSE = 8
};
// Fixed-size (one-byte) messages communicated via control pipe.
const char kControlMessageShutdown[] = { '.' };
const char kControlMessageNetworkChange[] = { ':' };
// Returns true on success.
bool SetNonBlock(int fd) {
int flags = fcntl(fd, F_GETFL, 0);
return flags >= 0 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) == 0;
}
// Returns true on success.
bool IgnoreSigPipe() {
struct sigaction sa;
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
if (sigemptyset(&sa.sa_mask) || sigaction(SIGPIPE, &sa, 0)) {
LOG(ERROR) << "WebSocketProxy: Failed to disable sigpipe";
return false;
}
return true;
}
uint64 ReadNetworkInteger(uint8* buf, int num_bytes) {
uint64 rv = 0;
DCHECK_GE(num_bytes, 0);
DCHECK_LE(num_bytes, 8);
while (num_bytes--) {
rv <<= 8;
rv += *buf++;
}
return rv;
}
void WriteNetworkInteger(int64 n, uint8* buf, int num_bytes) {
DCHECK_GE(num_bytes, 0);
DCHECK_LE(num_bytes, 8);
while (num_bytes--) {
buf[num_bytes] = n % (1 << 8);
n >>= 8;
}
}
typedef uint8 (*AsciiFilter)(uint8);
uint8 AsciiFilterVerbatim(uint8 c) {
return c;
}
uint8 AsciiFilterLower(uint8 c) {
return base::ToLowerASCII(c);
}
std::string FetchAsciiSnippet(uint8* begin, uint8* end, AsciiFilter filter) {
std::string rv;
for (; begin < end; ++begin) {
if (!isascii(*begin))
return rv;
rv += filter(*begin);
}
return rv;
}
std::string FetchExtensionIdFromOrigin(const std::string &origin) {
GURL url(origin);
if (url.SchemeIs(chrome::kExtensionScheme))
return url.host();
else
return std::string();
}
inline size_t strlen(const void* s) {
return ::strlen(static_cast<const char*>(s));
}
void SendNotification(int port) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::UI));
content::NotificationService::current()->Notify(
chrome::NOTIFICATION_WEB_SOCKET_PROXY_STARTED,
content::NotificationService::AllSources(), content::Details<int>(&port));
}
class Conn;
// Websocket to TCP proxy server.
class Serv {
public:
Serv();
~Serv();
// Do not call it twice.
void Run();
// Terminates running server (should be called on a different thread).
void Shutdown();
// Called on network change. Reinitializes DNS resolution service.
void OnNetworkChange();
void ZapConn(Conn*);
void MarkConnImportance(Conn*, bool important);
Conn* GetFreshConn();
bool IsConnSane(Conn*);
void CloseAll();
static void OnConnect(int listening_sock, short event, void*);
static void OnControlRequest(int fd, short event, void*);
struct event_base* evbase() { return evbase_; }
// Libevent base.
struct event_base* evbase_;
// Socket to listen incoming websocket connections.
int listening_sock_;
// TODO(dilmah): remove this extra socket as soon as obsolete
// getPassportForTCP function is removed from webSocketProxyPrivate API.
// Additional socket to listen incoming connections on fixed port 10101.
int extra_listening_sock_;
// Used to communicate control requests: either shutdown request or network
// change notification.
int control_descriptor_[2];
// Flag whether shutdown has been requested.
bool shutdown_requested_;
// List of pending connections; We are trying to keep size of this list
// below kConnPoolLimit in LRU fashion.
typedef std::list<Conn*> ConnPool;
ConnPool conn_pool_;
// Reverse map to look up a connection in a conn_pool.
typedef std::map<Conn*, ConnPool::iterator> RevMap;
RevMap rev_map_;
scoped_ptr<struct event> connection_event_;
scoped_ptr<struct event> control_event_;
// TODO(dilmah): remove this extra event as soon as obsolete
// getPassportForTCP function is removed from webSocketProxyPrivate API.
scoped_ptr<struct event> extra_connection_event_;
DISALLOW_COPY_AND_ASSIGN(Serv);
};
// Connection (amalgamates both channels between proxy and javascript and
// between proxy and destination).
class Conn {
public:
enum Phase {
// Initial stage of connection.
PHASE_WAIT_HANDSHAKE,
PHASE_WAIT_DESTFRAME,
PHASE_WAIT_DESTCONNECT,
// Operational stage of connection.
PHASE_OUTSIDE_FRAME,
PHASE_INSIDE_FRAME_BASE64,
PHASE_INSIDE_FRAME_SKIP,
// Terminal stage of connection.
PHASE_SHUT, // Closing handshake was emitted, buffers may be pending.
PHASE_DEFUNCT // Connection was nuked.
};
// Channel structure (either proxy<->browser or proxy<->destination).
class Chan {
public:
explicit Chan(Conn* master)
: master_(master),
write_pending_(false),
read_bev_(NULL),
write_bev_(NULL),
read_fd_(-1),
write_fd_(-1) {
}
~Chan() {
Zap();
}
// Returns true on success.
bool Write(const void* data, size_t size) {
if (write_bev_ == NULL)
return false;
write_pending_ = true;
return (0 == bufferevent_write(write_bev_, data, size));
}
void Zap() {
if (read_bev_) {
bufferevent_disable(read_bev_, EV_READ);
bufferevent_free(read_bev_);
}
if (write_bev_ && write_bev_ != read_bev_) {
bufferevent_disable(write_bev_, EV_READ);
bufferevent_free(write_bev_);
}
read_bev_ = NULL;
write_bev_ = NULL;
if (write_fd_ && read_fd_ == write_fd_)
shutdown(write_fd_, SHUT_RDWR);
if (write_fd_ >= 0) {
close(write_fd_);
DCHECK_GE(read_fd_, 0);
}
if (read_fd_ && read_fd_ != write_fd_)
close(read_fd_);
read_fd_ = -1;
write_fd_ = -1;
write_pending_ = false;
master_->ConsiderSuicide();
}
void Shut() {
if (!write_pending_)
Zap();
}
int read_fd() const { return read_fd_; }
void set_read_fd(int fd) { read_fd_ = fd; }
int write_fd() const { return write_fd_; }
void set_write_fd(int fd) { write_fd_ = fd; }
bool write_pending() const { return write_pending_; }
void set_write_pending(bool pending) { write_pending_ = pending; }
struct bufferevent* read_bev() const { return read_bev_; }
void set_read_bev(struct bufferevent* bev) { read_bev_ = bev; }
struct bufferevent* write_bev() const { return write_bev_; }
void set_write_bev(struct bufferevent* bev) { write_bev_ = bev; }
private:
Conn* master_;
bool write_pending_; // Whether write buffer is not flushed yet.
struct bufferevent* read_bev_;
struct bufferevent* write_bev_;
// UNIX descriptors.
int read_fd_;
int write_fd_;
DISALLOW_COPY_AND_ASSIGN(Chan);
};
// Status of processing incoming data.
enum Status {
STATUS_OK,
STATUS_INCOMPLETE, // Not all required data is present in buffer yet.
STATUS_SKIP,
STATUS_ABORT // Data is invalid. We must shut connection.
};
// Unfortunately evdns callbacks are uncancellable,
// so potentially we can receive callback for a deleted Conn.
// Even worse, storage of deleted Conn may be reused
// for a new connection and new connection can receive callback
// destined for deleted Conn.
// EventKey is introduced in order to prevent that.
typedef void* EventKey;
typedef std::map<EventKey, Conn*> EventKeyMap;
explicit Conn(Serv* master);
~Conn();
static Conn* Get(EventKey evkey);
void Shut(int status, const void* reason);
void ConsiderSuicide();
Status ConsumeHeader(struct evbuffer*);
Status ConsumeDestframe(struct evbuffer*);
Status ConsumeFrameHeader(struct evbuffer*);
Status ProcessFrameData(struct evbuffer*);
// Return true on success.
bool EmitHandshake(Chan*);
bool EmitFrame(
Chan*, WebSocketFrameOpcode opcode, const void* buf, size_t size);
// Attempts to establish second connection (to remote TCP service).
// Returns true on success.
bool TryConnectDest(const struct sockaddr*, socklen_t);
// Return security origin associated with this connection.
const std::string& GetOrigin();
// Used as libevent callbacks.
static void OnDestConnectTimeout(int, short, EventKey);
static void OnPrimchanRead(struct bufferevent*, EventKey);
static void OnPrimchanWrite(struct bufferevent*, EventKey);
static void OnPrimchanError(struct bufferevent*, short what, EventKey);
static void OnDestResolutionIPv4(int result, char type, int count,
int ttl, void* addr_list, EventKey);
static void OnDestResolutionIPv6(int result, char type, int count,
int ttl, void* addr_list, EventKey);
static void OnDestchanRead(struct bufferevent*, EventKey);
static void OnDestchanWrite(struct bufferevent*, EventKey);
static void OnDestchanError(struct bufferevent*, short what, EventKey);
Chan& primchan() { return primchan_; }
EventKey evkey() const { return evkey_; }
private:
Serv* master_;
Phase phase_;
uint64 frame_bytes_remaining_;
uint8 frame_mask_[4];
int frame_mask_index_;
// We maintain two channels per Conn:
// primary channel is websocket connection.
Chan primchan_;
// Destination channel is a proxied connection.
Chan destchan_;
EventKey evkey_;
// Header fields supplied by client at initial websocket handshake.
std::map<std::string, std::string> header_fields_;
// Parameters requested via query component of GET resource.
std::map<std::string, std::string> requested_parameters_;
// Hostname and port of destination socket.
// Websocket client supplies them in first data frame (destframe).
std::string destname_;
int destport_;
// Preresolved |destname_| (empty if not pre-resolved).
std::string destaddr_;
// Whether TLS over TCP requested.
bool do_tls_;
// We try to DNS resolve hostname in both IPv4 and IPv6 domains.
// Track resolution failures here.
bool destresolution_ipv4_failed_;
bool destresolution_ipv6_failed_;
// Used to schedule a timeout for initial phase of connection.
scoped_ptr<struct event> destconnect_timeout_event_;
static base::LazyInstance<EventKeyMap>::Leaky evkey_map_;
static EventKey last_evkey_;
DISALLOW_COPY_AND_ASSIGN(Conn);
};
class SSLChan : public MessageLoopForIO::Watcher {
public:
static void Start(const net::AddressList& address_list,
const net::HostPortPair& host_port_pair,
int read_pipe,
int write_pipe) {
DCHECK(BrowserThread::CurrentlyOn(BrowserThread::IO));
SSLChan* ALLOW_UNUSED chan = new SSLChan(
address_list, host_port_pair, read_pipe, write_pipe);
}
private:
enum Phase {
PHASE_CONNECTING,
PHASE_RUNNING,
PHASE_CLOSING,
PHASE_CLOSED
};
class DerivedIOBufferWithSize : public net::IOBufferWithSize {
public:
DerivedIOBufferWithSize(net::IOBuffer* host, int size)
: IOBufferWithSize(host->data(), size), host_(host) {
DCHECK(host_);
DCHECK(host_->data());
}
protected:
virtual ~DerivedIOBufferWithSize() {
data_ = NULL; // We do not own memory, bypass base class destructor.
}
scoped_refptr<net::IOBuffer> host_;
};
// Provides queue of data represented as IOBuffers.
class IOBufferQueue {
public:
// We do not allocate all capacity at once but lazily in |buf_size_| chunks.
explicit IOBufferQueue(int capacity)
: buf_size_(1 + capacity / kNumBuffersLimit) {
}
// Obtains IOBuffer to add new data to back.
net::IOBufferWithSize* GetIOBufferToFill() {
if (back_ == NULL) {
if (storage_.size() >= kNumBuffersLimit)
return NULL;
storage_.push_back(new net::IOBufferWithSize(buf_size_));
back_ = new net::DrainableIOBuffer(storage_.back(), buf_size_);
}
return new DerivedIOBufferWithSize(
back_.get(), back_->BytesRemaining());
}
// Obtains IOBuffer with some data from front.
net::IOBufferWithSize* GetIOBufferToProcess() {
if (front_ == NULL) {
if (storage_.empty())
return NULL;
front_ = new net::DrainableIOBuffer(storage_.front(), buf_size_);
}
int front_capacity = (storage_.size() == 1 && back_) ?
back_->BytesConsumed() : buf_size_;
return new DerivedIOBufferWithSize(
front_.get(), front_capacity - front_->BytesConsumed());
}
// Records number of bytes as added to back.
void DidFill(int bytes) {
DCHECK(back_);
back_->DidConsume(bytes);
if (back_->BytesRemaining() == 0)
back_ = NULL;
}
// Pops number of bytes from front.
void DidProcess(int bytes) {
DCHECK(front_);
front_->DidConsume(bytes);
if (front_->BytesRemaining() == 0) {
storage_.pop_front();
front_ = NULL;
}
}
void Clear() {
front_ = NULL;
back_ = NULL;
storage_.clear();
}
private:
static const unsigned kNumBuffersLimit = 12;
const int buf_size_;
std::list< scoped_refptr<net::IOBufferWithSize> > storage_;
scoped_refptr<net::DrainableIOBuffer> front_;
scoped_refptr<net::DrainableIOBuffer> back_;
DISALLOW_COPY_AND_ASSIGN(IOBufferQueue);
};
SSLChan(const net::AddressList address_list,
const net::HostPortPair host_port_pair,
int read_pipe,
int write_pipe)
: phase_(PHASE_CONNECTING),
host_port_pair_(host_port_pair),
inbound_stream_(WebSocketProxy::kBufferLimit),
outbound_stream_(WebSocketProxy::kBufferLimit),
read_pipe_(read_pipe),
write_pipe_(write_pipe),
ALLOW_THIS_IN_INITIALIZER_LIST(weak_factory_(this)) {
if (!SetNonBlock(read_pipe_) || !SetNonBlock(write_pipe_)) {
Shut(net::ERR_UNEXPECTED);
return;
}
net::ClientSocketFactory* factory =
net::ClientSocketFactory::GetDefaultFactory();
socket_.reset(factory->CreateTransportClientSocket(
address_list, NULL, net::NetLog::Source()));
if (socket_ == NULL) {
Shut(net::ERR_FAILED);
return;
}
int result = socket_->Connect(base::Bind(&SSLChan::OnSocketConnect,
base::Unretained(this)));
if (result != net::ERR_IO_PENDING)
OnSocketConnect(result);
}
~SSLChan() {
phase_ = PHASE_CLOSED;
write_pipe_controller_.StopWatchingFileDescriptor();
read_pipe_controller_.StopWatchingFileDescriptor();
close(write_pipe_);
close(read_pipe_);
}
void Shut(int ALLOW_UNUSED net_error_code) {
if (phase_ != PHASE_CLOSED) {
phase_ = PHASE_CLOSING;
scoped_refptr<net::IOBufferWithSize> buf[] = {
outbound_stream_.GetIOBufferToProcess(),
inbound_stream_.GetIOBufferToProcess()
};
for (int i = arraysize(buf); i--;) {
if (buf[i] && buf[i]->size() > 0) {
MessageLoop::current()->PostTask(
FROM_HERE,
base::Bind(&SSLChan::Proceed, weak_factory_.GetWeakPtr()));
return;
}
}
phase_ = PHASE_CLOSED;
if (socket_ != NULL) {
socket_->Disconnect();
socket_.reset();
}
MessageLoop::current()->DeleteSoon(FROM_HERE, this);
}
}
void OnSocketConnect(int result) {
if (phase_ != PHASE_CONNECTING) {
NOTREACHED();
return;
}
if (result) {
Shut(result);
return;
}
net::ClientSocketHandle* handle = new net::ClientSocketHandle();
handle->set_socket(socket_.release());
net::ClientSocketFactory* factory =
net::ClientSocketFactory::GetDefaultFactory();
net::SSLClientSocketContext ssl_context;
if (!cert_verifier_.get())
cert_verifier_.reset(net::CertVerifier::CreateDefault());
ssl_context.cert_verifier = cert_verifier_.get();
socket_.reset(factory->CreateSSLClientSocket(
handle, host_port_pair_, ssl_config_, ssl_context));
if (!socket_.get()) {
LOG(WARNING) << "Failed to create an SSL client socket.";
OnSSLHandshakeCompleted(net::ERR_UNEXPECTED);
return;
}
result = socket_->Connect(base::Bind(&SSLChan::OnSSLHandshakeCompleted,
base::Unretained(this)));
if (result != net::ERR_IO_PENDING)
OnSSLHandshakeCompleted(result);
}
void OnSSLHandshakeCompleted(int result) {
if (result) {
Shut(result);
return;
}
is_socket_read_pending_ = false;
is_socket_write_pending_ = false;
is_read_pipe_blocked_ = false;
is_write_pipe_blocked_ = false;
MessageLoopForIO::current()->WatchFileDescriptor(
read_pipe_, false, MessageLoopForIO::WATCH_READ,
&read_pipe_controller_, this);
MessageLoopForIO::current()->WatchFileDescriptor(
write_pipe_, false, MessageLoopForIO::WATCH_WRITE,
&write_pipe_controller_, this);
phase_ = PHASE_RUNNING;
Proceed();
}
void OnSocketRead(int result) {
DCHECK(is_socket_read_pending_);
is_socket_read_pending_ = false;
if (result <= 0) {
Shut(result);
return;
}
inbound_stream_.DidFill(result);
Proceed();
}
void OnSocketWrite(int result) {
DCHECK(is_socket_write_pending_);
is_socket_write_pending_ = false;
if (result < 0) {
outbound_stream_.Clear();
Shut(result);
return;
}
outbound_stream_.DidProcess(result);
Proceed();
}
// MessageLoopForIO::Watcher overrides.
virtual void OnFileCanReadWithoutBlocking(int fd) OVERRIDE {
if (fd != read_pipe_) {
NOTREACHED();
return;
}
is_read_pipe_blocked_ = false;
Proceed();
}
virtual void OnFileCanWriteWithoutBlocking(int fd) OVERRIDE {
if (fd != write_pipe_) {
NOTREACHED();
return;
}
is_write_pipe_blocked_ = false;
Proceed();
}
private:
void Proceed() {
if (phase_ != PHASE_RUNNING && phase_ != PHASE_CLOSING)
return;
for (bool proceed = true; proceed;) {
proceed = false;
if (!is_read_pipe_blocked_ && phase_ == PHASE_RUNNING) {
scoped_refptr<net::IOBufferWithSize> buf =
outbound_stream_.GetIOBufferToFill();
if (buf && buf->size() > 0) {
int rv = read(read_pipe_, buf->data(), buf->size());
if (rv > 0) {
outbound_stream_.DidFill(rv);
proceed = true;
} else if (rv == -1 && errno == EAGAIN) {
is_read_pipe_blocked_ = true;
MessageLoopForIO::current()->WatchFileDescriptor(
read_pipe_, false, MessageLoopForIO::WATCH_READ,
&read_pipe_controller_, this);
} else if (rv == 0) {
Shut(0);
} else {
DCHECK_LT(rv, 0);
Shut(net::ERR_UNEXPECTED);
return;
}
}
}
if (!is_socket_read_pending_ && phase_ == PHASE_RUNNING) {
scoped_refptr<net::IOBufferWithSize> buf =
inbound_stream_.GetIOBufferToFill();
if (buf && buf->size() > 0) {
int rv = socket_->Read(
buf, buf->size(),
base::Bind(&SSLChan::OnSocketRead, base::Unretained(this)));
is_socket_read_pending_ = true;
if (rv != net::ERR_IO_PENDING) {
MessageLoop::current()->PostTask(
FROM_HERE, base::Bind(&SSLChan::OnSocketRead,
weak_factory_.GetWeakPtr(), rv));
}
}
}
if (!is_socket_write_pending_) {
scoped_refptr<net::IOBufferWithSize> buf =
outbound_stream_.GetIOBufferToProcess();
if (buf && buf->size() > 0) {
int rv = socket_->Write(
buf, buf->size(),
base::Bind(&SSLChan::OnSocketWrite, base::Unretained(this)));
is_socket_write_pending_ = true;
if (rv != net::ERR_IO_PENDING) {
MessageLoop::current()->PostTask(
FROM_HERE, base::Bind(&SSLChan::OnSocketWrite,
weak_factory_.GetWeakPtr(), rv));
}
} else if (phase_ == PHASE_CLOSING) {
Shut(0);
}
}
if (!is_write_pipe_blocked_) {
scoped_refptr<net::IOBufferWithSize> buf =
inbound_stream_.GetIOBufferToProcess();
if (buf && buf->size() > 0) {
int rv = write(write_pipe_, buf->data(), buf->size());
if (rv > 0) {
inbound_stream_.DidProcess(rv);
proceed = true;
} else if (rv == -1 && errno == EAGAIN) {
is_write_pipe_blocked_ = true;
MessageLoopForIO::current()->WatchFileDescriptor(
write_pipe_, false, MessageLoopForIO::WATCH_WRITE,
&write_pipe_controller_, this);
} else {
DCHECK_LE(rv, 0);
inbound_stream_.Clear();
Shut(net::ERR_UNEXPECTED);
return;
}
} else if (phase_ == PHASE_CLOSING) {
Shut(0);
}
}
}
}
Phase phase_;
scoped_ptr<net::StreamSocket> socket_;
net::HostPortPair host_port_pair_;
scoped_ptr<net::CertVerifier> cert_verifier_;
net::SSLConfig ssl_config_;
IOBufferQueue inbound_stream_;
IOBufferQueue outbound_stream_;
int read_pipe_;
int write_pipe_;
bool is_socket_read_pending_;
bool is_socket_write_pending_;
bool is_read_pipe_blocked_;
bool is_write_pipe_blocked_;
base::WeakPtrFactory<SSLChan> weak_factory_;
MessageLoopForIO::FileDescriptorWatcher read_pipe_controller_;
MessageLoopForIO::FileDescriptorWatcher write_pipe_controller_;
friend class base::DeleteHelper<SSLChan>;
DISALLOW_COPY_AND_ASSIGN(SSLChan);
};
Serv::Serv()
: evbase_(NULL),
listening_sock_(-1),
extra_listening_sock_(-1),
shutdown_requested_(false) {
control_descriptor_[0] = -1;
control_descriptor_[1] = -1;
}
Serv::~Serv() {
CloseAll();
}
void Serv::Run() {
if (evbase_ || shutdown_requested_)
return;
evbase_ = event_init();
if (!evbase_) {
LOG(ERROR) << "WebSocketProxy: Couldn't create libevent base";
return;
}
if (pipe(control_descriptor_) ||
!SetNonBlock(control_descriptor_[0]) ||
!SetNonBlock(control_descriptor_[1])) {
LOG(ERROR) << "WebSocketProxy: Failed to create control pipe";
return;
}
listening_sock_ = socket(AF_INET, SOCK_STREAM, 0);
if (listening_sock_ < 0) {
LOG(ERROR) << "WebSocketProxy: Failed to create socket";
return;
}
{
int on = 1;
setsockopt(listening_sock_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
}
struct sockaddr_in addr;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
// Let the OS allocate a port number.
addr.sin_port = base::HostToNet16(0);
addr.sin_addr.s_addr = base::HostToNet32(INADDR_LOOPBACK);
if (bind(listening_sock_,
reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr))) {
LOG(ERROR) << "WebSocketProxy: Failed to bind server socket";
return;
}
if (listen(listening_sock_, 12)) {
LOG(ERROR) << "WebSocketProxy: Failed to listen server socket";
return;
}
if (!SetNonBlock(listening_sock_)) {
LOG(ERROR) << "WebSocketProxy: Failed to go non block";
return;
}
connection_event_.reset(new struct event);
event_set(connection_event_.get(), listening_sock_, EV_READ | EV_PERSIST,
&OnConnect, this);
event_base_set(evbase_, connection_event_.get());
if (event_add(connection_event_.get(), NULL)) {
LOG(ERROR) << "WebSocketProxy: Failed to add listening event";
return;
}
{
// TODO(dilmah): remove this control block as soon as obsolete
// getPassportForTCP function is removed from webSocketProxyPrivate API.
// Following block adds extra listening socket on fixed port 10101.
extra_listening_sock_ = socket(AF_INET, SOCK_STREAM, 0);
if (extra_listening_sock_ < 0) {
LOG(ERROR) << "WebSocketProxy: Failed to create socket";
return;
}
{
int on = 1;
setsockopt(listening_sock_, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
}
const int kPort = 10101;
memset(&addr, 0, sizeof(addr));
addr.sin_family = AF_INET;
addr.sin_port = base::HostToNet16(kPort);
addr.sin_addr.s_addr = base::HostToNet32(INADDR_LOOPBACK);
if (bind(extra_listening_sock_,
reinterpret_cast<struct sockaddr*>(&addr),
sizeof(addr))) {
LOG(ERROR) << "WebSocketProxy: Failed to bind server socket";
return;
}
if (listen(extra_listening_sock_, 12)) {
LOG(ERROR) << "WebSocketProxy: Failed to listen server socket";
return;
}
if (!SetNonBlock(extra_listening_sock_)) {
LOG(ERROR) << "WebSocketProxy: Failed to go non block";
return;
}
extra_connection_event_.reset(new struct event);
event_set(extra_connection_event_.get(), extra_listening_sock_,
EV_READ | EV_PERSIST, &OnConnect, this);
event_base_set(evbase_, extra_connection_event_.get());
if (event_add(extra_connection_event_.get(), NULL)) {
LOG(ERROR) << "WebSocketProxy: Failed to add listening event";
return;
}
}
control_event_.reset(new struct event);
event_set(control_event_.get(), control_descriptor_[0], EV_READ | EV_PERSIST,
&OnControlRequest, this);
event_base_set(evbase_, control_event_.get());
if (event_add(control_event_.get(), NULL)) {
LOG(ERROR) << "WebSocketProxy: Failed to add control event";
return;
}
if (evdns_init())
LOG(WARNING) << "WebSocketProxy: Failed to initialize evDNS";
if (!IgnoreSigPipe()) {
LOG(ERROR) << "WebSocketProxy: Failed to ignore SIGPIPE";
return;
}
memset(&addr, 0, sizeof(addr));
socklen_t addr_len = sizeof(addr);
if (getsockname(
listening_sock_, reinterpret_cast<struct sockaddr*>(&addr), &addr_len)) {
LOG(ERROR) << "Failed to determine listening port";
return;
}
BrowserThread::PostTask(
BrowserThread::UI, FROM_HERE,
base::Bind(&SendNotification, base::NetToHost16(addr.sin_port)));
LOG(INFO) << "WebSocketProxy: Starting event dispatch loop.";
event_base_dispatch(evbase_);
if (shutdown_requested_)
LOG(INFO) << "WebSocketProxy: Event dispatch loop terminated upon request";
else
LOG(ERROR) << "WebSocketProxy: Event dispatch loop terminated unexpectedly";
CloseAll();
}
void Serv::Shutdown() {
int r ALLOW_UNUSED =
write(control_descriptor_[1], kControlMessageShutdown, 1);
}
void Serv::OnNetworkChange() {
int r ALLOW_UNUSED =
write(control_descriptor_[1], kControlMessageNetworkChange, 1);
}
void Serv::CloseAll() {
while (!conn_pool_.empty())
ZapConn(conn_pool_.back());
if (listening_sock_ >= 0) {
shutdown(listening_sock_, SHUT_RDWR);
close(listening_sock_);
}
for (int i = 0; i < 2; ++i) {
if (control_descriptor_[i] >= 0) {
control_descriptor_[i] = -1;
close(control_descriptor_[i]);
}
}
if (control_event_.get()) {
event_del(control_event_.get());
control_event_.reset();
}
if (extra_connection_event_.get()) {
event_del(extra_connection_event_.get());
extra_connection_event_.reset();
}
if (connection_event_.get()) {
event_del(connection_event_.get());
connection_event_.reset();
}
if (evbase_) {
event_base_free(evbase_);
evbase_ = NULL;
}
}
void Serv::ZapConn(Conn* cs) {
RevMap::iterator rit = rev_map_.find(cs);
if (rit != rev_map_.end()) {
conn_pool_.erase(rit->second);
rev_map_.erase(rit);
delete cs;
}
}
void Serv::MarkConnImportance(Conn* cs, bool important) {
if (conn_pool_.size() < WebSocketProxy::kConnPoolLimit / 4) {
// Fast common path.
return;
}
RevMap::iterator rit = rev_map_.find(cs);
if (rit != rev_map_.end()) {
ConnPool::iterator it = rit->second;
CHECK(*it == cs);
if (important && it == conn_pool_.begin()) {
// Already at the top. Shortcut.
return;
}
conn_pool_.erase(it);
}
if (important) {
conn_pool_.push_front(cs);
rev_map_[cs] = conn_pool_.begin();
} else {
conn_pool_.push_back(cs);
rev_map_[cs] = conn_pool_.end();
--rev_map_[cs];
}
}
Conn* Serv::GetFreshConn() {
if (conn_pool_.size() > WebSocketProxy::kConnPoolLimit) {
// Connections overflow. Shut those oldest not active.
ConnPool::iterator it = conn_pool_.end();
--it;
for (int i = conn_pool_.size() - WebSocketProxy::kConnPoolLimit; i-- > 0;) {
// Shut may invalidate an iterator; hence postdecrement.
(*it--)->Shut(WS_CLOSE_GOING_AWAY,
"Flood of new connections, getting rid of old ones");
}
if (conn_pool_.size() > WebSocketProxy::kConnPoolLimit + 12) {
// Connections overflow. Zap the oldest not active.
ZapConn(conn_pool_.back());
}
}
Conn* cs = new Conn(this);
conn_pool_.push_front(cs);
rev_map_[cs] = conn_pool_.begin();
return cs;
}
bool Serv::IsConnSane(Conn* cs) {
return rev_map_.find(cs) != rev_map_.end();
}
// static
void Serv::OnConnect(int listening_sock, short event, void* ctx) {
Serv* self = static_cast<Serv*>(ctx);
Conn* cs = self->GetFreshConn();
int sock = accept(listening_sock, NULL, NULL);
if (sock < 0 || !SetNonBlock(sock)) {
// Read readiness was triggered on listening socket
// yet we failed to accept a connection; definitely weird.
NOTREACHED();
self->ZapConn(cs);
return;
}
cs->primchan().set_read_fd(sock);
cs->primchan().set_write_fd(sock);
struct bufferevent* bev = bufferevent_new(
sock,
&Conn::OnPrimchanRead, &Conn::OnPrimchanWrite, &Conn::OnPrimchanError,
cs->evkey());
if (bev == NULL) {
self->ZapConn(cs);
return;
}
cs->primchan().set_read_bev(bev);
cs->primchan().set_write_bev(bev);
bufferevent_base_set(self->evbase_, bev);
bufferevent_setwatermark(bev, EV_READ, 0, WebSocketProxy::kBufferLimit);
if (bufferevent_enable(bev, EV_READ | EV_WRITE)) {
self->ZapConn(cs);
return;
}
}
// static
void Serv::OnControlRequest(int fd, short event, void* ctx) {
Serv* self = static_cast<Serv*>(ctx);
char c;
if (1 == read(fd, &c, 1) && c == *kControlMessageNetworkChange) {
// OnNetworkChange request.
evdns_clear_nameservers_and_suspend();
evdns_init();
evdns_resume();
} else if (c == *kControlMessageShutdown) {
self->shutdown_requested_ = true;
event_base_loopbreak(self->evbase_);
}
}
Conn::Conn(Serv* master)
: master_(master),
phase_(PHASE_WAIT_HANDSHAKE),
frame_bytes_remaining_(0),
frame_mask_index_(0),
primchan_(this),
destchan_(this),
do_tls_(false),
destresolution_ipv4_failed_(false),
destresolution_ipv6_failed_(false) {
while (evkey_map_.Get().find(last_evkey_) != evkey_map_.Get().end()) {
last_evkey_ = reinterpret_cast<EventKey>(reinterpret_cast<size_t>(
last_evkey_) + 1);
}
evkey_ = last_evkey_;
evkey_map_.Get()[evkey_] = this;
// Schedule timeout for initial phase of connection.
destconnect_timeout_event_.reset(new struct event);
evtimer_set(destconnect_timeout_event_.get(),
&OnDestConnectTimeout, evkey_);
event_base_set(master_->evbase(),
destconnect_timeout_event_.get());
struct timeval tv;
tv.tv_sec = 20;
tv.tv_usec = 0;
evtimer_add(destconnect_timeout_event_.get(), &tv);
}
Conn::~Conn() {
phase_ = PHASE_DEFUNCT;
event_del(destconnect_timeout_event_.get());
if (evkey_map_.Get()[evkey_] == this)
evkey_map_.Get().erase(evkey_);
else
NOTREACHED();
}
Conn* Conn::Get(EventKey evkey) {
EventKeyMap::iterator it = evkey_map_.Get().find(evkey);
if (it == evkey_map_.Get().end())
return NULL;
Conn* cs = it->second;
if (cs == NULL ||
cs->evkey_ != evkey ||
cs->master_ == NULL ||
cs->phase_ < 0 ||
cs->phase_ > PHASE_SHUT ||
!cs->master_->IsConnSane(cs)) {
return NULL;
}
return cs;
}
void Conn::Shut(int status, const void* reason) {
if (phase_ >= PHASE_SHUT)
return;
master_->MarkConnImportance(this, false);
std::vector<uint8> payload(2 + strlen(reason));
WriteNetworkInteger(status, vector_as_array(&payload), 2);
memcpy(vector_as_array(&payload) + 2, reason, strlen(reason));
EmitFrame(
&primchan_, WS_OPCODE_CLOSE, vector_as_array(&payload), payload.size());
primchan_.Shut();
destchan_.Shut();
phase_ = PHASE_SHUT;
}
void Conn::ConsiderSuicide() {
if (!primchan_.write_pending() && !destchan_.write_pending())
master_->ZapConn(this);
}
Conn::Status Conn::ConsumeHeader(struct evbuffer* evb) {
uint8* buf = EVBUFFER_DATA(evb);
size_t buf_size = EVBUFFER_LENGTH(evb);
static const uint8 kGetPrefix[] = "GET " kProxyPath;
static const uint8 kKeyValueDelimiter[] = ": ";
if (buf_size <= 0)
return STATUS_INCOMPLETE;
if (!buf)
return STATUS_ABORT;
if (!std::equal(buf, buf + std::min(buf_size, strlen(kGetPrefix)),
kGetPrefix)) {
// Data head does not match what is expected.
return STATUS_ABORT;
}
if (buf_size >= WebSocketProxy::kHeaderLimit)
return STATUS_ABORT;
uint8* buf_end = buf + buf_size;
// Handshake request must end with double CRLF.
uint8* term_pos = std::search(buf, buf_end, kCRLFCRLF,
kCRLFCRLF + strlen(kCRLFCRLF));
if (term_pos == buf_end)
return STATUS_INCOMPLETE;
term_pos += strlen(kCRLFCRLF);
// First line is "GET path?query protocol" line. If query is empty then we
// fall back to (obsolete) way of obtaining parameters from first websocket
// frame. Otherwise query contains all required parameters (host, port etc).
uint8* get_request_end = std::search(
buf, term_pos, kCRLF, kCRLF + strlen(kCRLF));
DCHECK(get_request_end != term_pos);
uint8* resource_end = std::find(
buf + strlen(kGetPrefix), get_request_end, ' ');
if (*resource_end != ' ')
return STATUS_ABORT;
if (resource_end != buf + strlen(kGetPrefix)) {
char* piece = reinterpret_cast<char*>(buf) + strlen(kGetPrefix) + 1;
url_parse::Component query(
0, resource_end - reinterpret_cast<uint8*>(piece));
for (url_parse::Component key, value;
url_parse::ExtractQueryKeyValue(piece, &query, &key, &value);) {
if (key.len > 0) {
requested_parameters_[std::string(piece + key.begin, key.len)] =
net::UnescapeURLComponent(std::string(piece + value.begin,
value.len), net::UnescapeRule::URL_SPECIAL_CHARS);
}
}
}
for (uint8* pos = get_request_end;;) {
pos += strlen(kCRLF);
if (term_pos - pos < static_cast<ptrdiff_t>(strlen(kCRLF)))
return STATUS_ABORT;
if (term_pos - pos == static_cast<ptrdiff_t>(strlen(kCRLF)))
break;
uint8* npos = std::search(pos, term_pos, kKeyValueDelimiter,
kKeyValueDelimiter + strlen(kKeyValueDelimiter));
if (npos == term_pos)
return STATUS_ABORT;
std::string key = FetchAsciiSnippet(pos, npos, AsciiFilterLower);
pos = std::search(npos += strlen(kKeyValueDelimiter), term_pos,
kCRLF, kCRLF + strlen(kCRLF));
if (pos == term_pos)
return STATUS_ABORT;
if (!key.empty()) {
header_fields_[key] = FetchAsciiSnippet(npos, pos,
key == "sec-websocket-key" ? AsciiFilterVerbatim : AsciiFilterLower);
}
}
// Values of Upgrade and Connection fields are hardcoded in the protocol.
if (header_fields_["upgrade"] != "websocket" ||
header_fields_["connection"] != "upgrade" ||
header_fields_["sec-websocket-key"].size() != 24) {
return STATUS_ABORT;
}
if (header_fields_["sec-websocket-version"] != "8" &&
header_fields_["sec-websocket-version"] != "13") {
return STATUS_ABORT;
}
// Normalize origin (e.g. leading slash).
GURL origin = GURL(GetOrigin()).GetOrigin();
if (!origin.is_valid())
return STATUS_ABORT;
if (!requested_parameters_.empty()) {
destname_ = requested_parameters_["hostname"];
int port;
if (!base::StringToInt(requested_parameters_["port"], &port) ||
port < 0 || port >= 1 << 16) {
return STATUS_ABORT;
}
destport_ = port;
destaddr_ = requested_parameters_["addr"];
do_tls_ = (requested_parameters_["tls"] == "true");
requested_parameters_["extension_id"] =
FetchExtensionIdFromOrigin(GetOrigin());
std::string passport(requested_parameters_["passport"]);
requested_parameters_.erase("passport");
if (!chrome::InternalAuthVerification::VerifyPassport(
passport, "web_socket_proxy", requested_parameters_)) {
return STATUS_ABORT;
}
}
evbuffer_drain(evb, term_pos - buf);
return STATUS_OK;
}
bool Conn::EmitHandshake(Chan* chan) {
std::vector<std::string> boilerplate;
boilerplate.push_back("HTTP/1.1 101 WebSocket Protocol Handshake");
boilerplate.push_back("Upgrade: websocket");
boilerplate.push_back("Connection: Upgrade");
{
// Take care of Accept field.
std::string word = header_fields_["sec-websocket-key"];
word += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
std::string accept_token;
base::Base64Encode(base::SHA1HashString(word), &accept_token);
boilerplate.push_back("Sec-WebSocket-Accept: " + accept_token);
}
boilerplate.push_back("");
for (size_t i = 0; i < boilerplate.size(); ++i) {
if (!chan->Write(boilerplate[i].c_str(), boilerplate[i].size()) ||
!chan->Write(kCRLF, strlen(kCRLF))) {
return false;
}
}
return true;
}
bool Conn::EmitFrame(
Chan* chan, WebSocketFrameOpcode opcode, const void* buf, size_t size) {
uint8 header[10];
int header_size = 2;
DCHECK(chan);
DCHECK(opcode >= 0 && opcode < 16);
header[0] = opcode | 0x80; // FIN bit set.
if (size < 126) {
header[1] = size;
} else if (size < (1 << 16)) {
header[1] = 126;
WriteNetworkInteger(size, header + 2, 2);
header_size += 2;
} else {
header[1] = 127;
WriteNetworkInteger(size, header + 2, 8);
header_size += 8;
}
return chan->Write(header, header_size) && chan->Write(buf, size);
}
Conn::Status Conn::ConsumeDestframe(struct evbuffer* evb) {
if (!requested_parameters_.empty()) {
// Parameters were already provided (and verified) in query component of
// websocket URL.
return STATUS_OK;
}
if (frame_bytes_remaining_ == 0) {
Conn::Status rv = ConsumeFrameHeader(evb);
if (rv != STATUS_OK)
return rv;
if (frame_bytes_remaining_ == 0 ||
frame_bytes_remaining_ >= WebSocketProxy::kHeaderLimit) {
return STATUS_ABORT;
}
}
uint8* buf = EVBUFFER_DATA(evb);
size_t buf_size = EVBUFFER_LENGTH(evb);
if (buf_size < frame_bytes_remaining_)
return STATUS_INCOMPLETE;
for (size_t i = 0; i < buf_size; ++i) {
buf[i] ^= frame_mask_[frame_mask_index_];
frame_mask_index_ = (frame_mask_index_ + 1) % 4;
}
std::string passport;
if (!WebSocketProxyHelper::FetchPassportAddrNamePort(
buf, buf + frame_bytes_remaining_,
&passport, &destaddr_, &destname_, &destport_)) {
return STATUS_ABORT;
}
std::map<std::string, std::string> map;
map["hostname"] = destname_;
map["port"] = base::IntToString(destport_);
map["extension_id"] = FetchExtensionIdFromOrigin(GetOrigin());
if (!destaddr_.empty())
map["addr"] = destaddr_;
if (!chrome::InternalAuthVerification::VerifyPassport(
passport, "web_socket_proxy", map)) {
return STATUS_ABORT;
}
evbuffer_drain(evb, frame_bytes_remaining_);
frame_bytes_remaining_ = 0;
return STATUS_OK;
}
Conn::Status Conn::ConsumeFrameHeader(struct evbuffer* evb) {
uint8* buf = EVBUFFER_DATA(evb);
size_t buf_size = EVBUFFER_LENGTH(evb);
size_t header_size = 2;
if (buf_size < header_size)
return STATUS_INCOMPLETE;
if (buf[0] & 0x70) {
// We are not able to handle non-zero reserved bits.
NOTIMPLEMENTED();
return STATUS_ABORT;
}
bool fin_flag = buf[0] & 0x80;
if (!fin_flag) {
NOTIMPLEMENTED();
return STATUS_ABORT;
}
int opcode = buf[0] & 0x0f;
switch (opcode) {
case WS_OPCODE_TEXT:
break;
case WS_OPCODE_CLOSE:
return STATUS_ABORT;
default:
NOTIMPLEMENTED();
return STATUS_ABORT;
}
bool mask_flag = buf[1] & 0x80;
if (!mask_flag) {
// Client-to-server frames must be masked.
return STATUS_ABORT;
}
frame_bytes_remaining_ = buf[1] & 0x7f;
int extra_size = 0;
if (frame_bytes_remaining_ == 126)
extra_size = 2;
else if (frame_bytes_remaining_ == 127)
extra_size = 8;
if (buf_size < header_size + extra_size + sizeof(frame_mask_))
return STATUS_INCOMPLETE;
if (extra_size)
frame_bytes_remaining_ = ReadNetworkInteger(buf + header_size, extra_size);
header_size += extra_size;
memcpy(frame_mask_, buf + header_size, sizeof(frame_mask_));
header_size += sizeof(frame_mask_);
frame_mask_index_ = 0;
evbuffer_drain(evb, header_size);
return STATUS_OK;
}
Conn::Status Conn::ProcessFrameData(struct evbuffer* evb) {
uint8* buf = EVBUFFER_DATA(evb);
size_t buf_size = EVBUFFER_LENGTH(evb);
DCHECK_GE(frame_bytes_remaining_, 1u);
if (frame_bytes_remaining_ < buf_size)
buf_size = frame_bytes_remaining_;
// base64 is encoded in chunks of 4 bytes.
buf_size = buf_size / 4 * 4;
if (buf_size < 1)
return STATUS_INCOMPLETE;
switch (phase_) {
case PHASE_INSIDE_FRAME_BASE64: {
for (size_t i = 0; i < buf_size; ++i) {
buf[i] ^= frame_mask_[frame_mask_index_];
frame_mask_index_ = (frame_mask_index_ + 1) % 4;
}
std::string out_bytes;
base::Base64Decode(std::string(buf, buf + buf_size), &out_bytes);
evbuffer_drain(evb, buf_size);
DCHECK(destchan_.write_bev());
if (!destchan_.Write(out_bytes.c_str(), out_bytes.size()))
return STATUS_ABORT;
break;
}
case PHASE_INSIDE_FRAME_SKIP: {
evbuffer_drain(evb, buf_size);
break;
}
default: {
return STATUS_ABORT;
}
}
frame_bytes_remaining_ -= buf_size;
return frame_bytes_remaining_ ? STATUS_INCOMPLETE : STATUS_OK;
}
bool Conn::TryConnectDest(const struct sockaddr* addr, socklen_t addrlen) {
if (destchan_.read_fd() >= 0 || destchan_.read_bev() != NULL)
return false;
if (do_tls_) {
int fd[4];
if (pipe(fd) || pipe(fd + 2))
return false;
destchan_.set_read_fd(fd[0]);
destchan_.set_write_fd(fd[3]);
for (int i = arraysize(fd); i--;) {
if (!SetNonBlock(fd[i]))
return false;
}
destchan_.set_read_bev(bufferevent_new(
destchan_.read_fd(),
&OnDestchanRead, NULL, &OnDestchanError,
evkey_));
destchan_.set_write_bev(bufferevent_new(
destchan_.write_fd(),
NULL, &OnDestchanWrite, &OnDestchanError,
evkey_));
net::IPEndPoint endpoint;
if (!endpoint.FromSockAddr(addr, addrlen))
return false;
net::AddressList addrlist(endpoint);
net::HostPortPair host_port_pair(destname_, destport_);
BrowserThread::PostTask(
BrowserThread::IO, FROM_HERE, base::Bind(
&SSLChan::Start, addrlist, host_port_pair, fd[2], fd[1]));
} else {
int sock = socket(addr->sa_family, SOCK_STREAM, 0);
if (sock < 0)
return false;
destchan_.set_read_fd(sock);
destchan_.set_write_fd(sock);
if (!SetNonBlock(sock))
return false;
if (connect(sock, addr, addrlen)) {
if (errno != EINPROGRESS)
return false;
}
destchan_.set_read_bev(bufferevent_new(
sock,
&OnDestchanRead, &OnDestchanWrite, &OnDestchanError,
evkey_));
destchan_.set_write_bev(destchan_.read_bev());
}
if (destchan_.read_bev() == NULL || destchan_.write_bev() == NULL)
return false;
if (bufferevent_base_set(master_->evbase(), destchan_.read_bev()) ||
bufferevent_base_set(master_->evbase(), destchan_.write_bev())) {
return false;
}
bufferevent_setwatermark(
destchan_.read_bev(), EV_READ, 0, WebSocketProxy::kBufferLimit);
if (bufferevent_enable(destchan_.read_bev(), EV_READ) ||
bufferevent_enable(destchan_.write_bev(), EV_WRITE)) {
return false;
}
return true;
}
const std::string& Conn::GetOrigin() {
return header_fields_[header_fields_["sec-websocket-version"] == "8" ?
"sec-websocket-origin" : "origin"];
}
// static
void Conn::OnPrimchanRead(struct bufferevent* bev, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (bev == NULL ||
cs == NULL ||
bev != cs->primchan_.read_bev()) {
NOTREACHED();
return;
}
if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) <= 0)
return;
cs->master_->MarkConnImportance(cs, true);
for (;;) {
switch (cs->phase_) {
case PHASE_WAIT_HANDSHAKE: {
switch (cs->ConsumeHeader(EVBUFFER_INPUT(bev))) {
case STATUS_OK: {
break;
}
case STATUS_INCOMPLETE: {
return;
}
case STATUS_ABORT:
default: {
cs->master_->ZapConn(cs);
return;
}
}
// Header consumed OK. Do respond.
if (!cs->EmitHandshake(&cs->primchan_)) {
cs->master_->ZapConn(cs);
return;
}
cs->phase_ = PHASE_WAIT_DESTFRAME;
}
case PHASE_WAIT_DESTFRAME: {
switch (cs->ConsumeDestframe(EVBUFFER_INPUT(bev))) {
case STATUS_OK: {
{
// Unfortunately libevent as of 1.4 does not look into /etc/hosts.
// There seems to be no easy API to perform only "local" part of
// getaddrinfo resolution. Hence this hack for "localhost".
if (cs->destname_ == "localhost")
cs->destname_ = "127.0.0.1";
}
if (cs->destaddr_.empty())
cs->destaddr_ = cs->destname_;
{
struct sockaddr_in sa;
memset(&sa, 0, sizeof(sa));
sa.sin_port = base::HostToNet16(cs->destport_);
if (inet_pton(sa.sin_family = AF_INET,
cs->destaddr_.c_str(),
&sa.sin_addr) == 1) {
// valid IPv4 address supplied.
if (cs->TryConnectDest((struct sockaddr*)&sa, sizeof(sa))) {
cs->phase_ = PHASE_WAIT_DESTCONNECT;
return;
}
}
}
{
if (cs->destaddr_.size() >= 2 &&
cs->destaddr_[0] == '[' &&
cs->destaddr_[cs->destaddr_.size() - 1] == ']') {
// Literal IPv6 address in brackets.
cs->destaddr_ =
cs->destaddr_.substr(1, cs->destaddr_.size() - 2);
}
struct sockaddr_in6 sa;
memset(&sa, 0, sizeof(sa));
sa.sin6_port = base::HostToNet16(cs->destport_);
if (inet_pton(sa.sin6_family = AF_INET6,
cs->destaddr_.c_str(),
&sa.sin6_addr) == 1) {
// valid IPv6 address supplied.
if (cs->TryConnectDest((struct sockaddr*)&sa, sizeof(sa))) {
cs->phase_ = PHASE_WAIT_DESTCONNECT;
return;
}
}
}
// Asynchronous DNS resolution.
if (evdns_count_nameservers() < 1) {
evdns_clear_nameservers_and_suspend();
evdns_init();
evdns_resume();
}
evdns_resolve_ipv4(cs->destname_.c_str(), 0,
&OnDestResolutionIPv4, evkey);
evdns_resolve_ipv6(cs->destname_.c_str(), 0,
&OnDestResolutionIPv6, evkey);
cs->phase_ = PHASE_WAIT_DESTCONNECT;
return;
}
case STATUS_INCOMPLETE: {
return;
}
case STATUS_ABORT:
default: {
cs->Shut(WS_CLOSE_DESTINATION_ERROR,
"Incorrect destination specification in first frame");
return;
}
}
}
case PHASE_WAIT_DESTCONNECT: {
if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) >=
WebSocketProxy::kBufferLimit) {
cs->Shut(WS_CLOSE_LIMIT_VIOLATION, "Read buffer overflow");
}
return;
}
case PHASE_OUTSIDE_FRAME: {
switch (cs->ConsumeFrameHeader(EVBUFFER_INPUT(bev))) {
case STATUS_OK: {
if (cs->frame_bytes_remaining_ % 4) {
// We expect base64 encoded data (encoded in 4-bytes chunks).
cs->Shut(WS_CLOSE_UNACCEPTABLE_DATA,
"Frame payload size is not multiple of 4");
return;
}
cs->phase_ = PHASE_INSIDE_FRAME_BASE64;
// Process remaining data if any.
break;
}
case STATUS_SKIP: {
cs->phase_ = PHASE_INSIDE_FRAME_SKIP;
// Process remaining data if any.
break;
}
case STATUS_INCOMPLETE: {
return;
}
case STATUS_ABORT:
default: {
cs->Shut(WS_CLOSE_PROTOCOL_ERROR, "Invalid frame header");
return;
}
}
break;
}
case PHASE_INSIDE_FRAME_BASE64:
case PHASE_INSIDE_FRAME_SKIP: {
switch (cs->ProcessFrameData(EVBUFFER_INPUT(bev))) {
case STATUS_OK: {
cs->phase_ = PHASE_OUTSIDE_FRAME;
// Handle remaining data if any.
break;
}
case STATUS_INCOMPLETE: {
return;
}
case STATUS_ABORT:
default: {
cs->Shut(WS_CLOSE_UNACCEPTABLE_DATA, "Invalid frame data");
return;
}
}
break;
}
case PHASE_SHUT: {
evbuffer_drain(EVBUFFER_INPUT(bev),
EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)));
return;
}
case PHASE_DEFUNCT:
default: {
NOTREACHED();
cs->master_->ZapConn(cs);
return;
}
}
}
}
// static
void Conn::OnPrimchanWrite(struct bufferevent* bev, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (bev == NULL ||
cs == NULL ||
bev != cs->primchan_.write_bev()) {
NOTREACHED();
return;
}
// Write callback is called when low watermark is reached, 0 by default.
cs->primchan_.set_write_pending(false);
if (cs->phase_ >= PHASE_SHUT) {
cs->master_->ZapConn(cs);
return;
}
if (cs->phase_ > PHASE_WAIT_DESTCONNECT)
OnDestchanRead(cs->destchan_.read_bev(), evkey);
if (cs->phase_ >= PHASE_SHUT)
cs->primchan_.Zap();
}
// static
void Conn::OnPrimchanError(struct bufferevent* bev,
short what, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (bev == NULL ||
cs == NULL ||
(bev != cs->primchan_.read_bev() && bev != cs->primchan_.write_bev())) {
return;
}
cs->primchan_.set_write_pending(false);
if (cs->phase_ >= PHASE_SHUT)
cs->master_->ZapConn(cs);
else
cs->Shut(WS_CLOSE_NORMAL, "Error reported on websocket channel");
}
// static
void Conn::OnDestResolutionIPv4(int result, char type,
int count, int ttl,
void* addr_list, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (cs == NULL)
return;
if (cs->phase_ != PHASE_WAIT_DESTCONNECT)
return;
if (result == DNS_ERR_NONE &&
count >= 1 &&
addr_list != NULL &&
type == DNS_IPv4_A) {
for (int i = 0; i < count; ++i) {
struct sockaddr_in sa;
memset(&sa, 0, sizeof(sa));
sa.sin_family = AF_INET;
sa.sin_port = base::HostToNet16(cs->destport_);
DCHECK(sizeof(sa.sin_addr) == sizeof(struct in_addr));
memcpy(&sa.sin_addr,
static_cast<struct in_addr*>(addr_list) + i,
sizeof(sa.sin_addr));
if (cs->TryConnectDest((struct sockaddr*)&sa, sizeof(sa)))
return;
}
}
cs->destresolution_ipv4_failed_ = true;
if (cs->destresolution_ipv4_failed_ && cs->destresolution_ipv6_failed_)
cs->Shut(WS_CLOSE_RESOLUTION_FAILED, "DNS resolution failed");
}
// static
void Conn::OnDestResolutionIPv6(int result, char type,
int count, int ttl,
void* addr_list, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (cs == NULL)
return;
if (cs->phase_ != PHASE_WAIT_DESTCONNECT)
return;
if (result == DNS_ERR_NONE &&
count >= 1 &&
addr_list != NULL &&
type == DNS_IPv6_AAAA) {
for (int i = 0; i < count; ++i) {
struct sockaddr_in6 sa;
memset(&sa, 0, sizeof(sa));
sa.sin6_family = AF_INET6;
sa.sin6_port = base::HostToNet16(cs->destport_);
DCHECK(sizeof(sa.sin6_addr) == sizeof(struct in6_addr));
memcpy(&sa.sin6_addr,
static_cast<struct in6_addr*>(addr_list) + i,
sizeof(sa.sin6_addr));
if (cs->TryConnectDest((struct sockaddr*)&sa, sizeof(sa)))
return;
}
}
cs->destresolution_ipv6_failed_ = true;
if (cs->destresolution_ipv4_failed_ && cs->destresolution_ipv6_failed_)
cs->Shut(WS_CLOSE_RESOLUTION_FAILED, "DNS resolution failed");
}
// static
void Conn::OnDestConnectTimeout(int, short, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (cs == NULL)
return;
if (cs->phase_ > PHASE_WAIT_DESTCONNECT)
return;
cs->Shut(WS_CLOSE_RESOLUTION_FAILED, "DNS resolution timeout");
}
// static
void Conn::OnDestchanRead(struct bufferevent* bev, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (bev == NULL ||
cs == NULL ||
bev != cs->destchan_.read_bev()) {
NOTREACHED();
return;
}
if (EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)) <= 0)
return;
if (cs->primchan_.write_bev() == NULL) {
cs->master_->ZapConn(cs);
return;
}
cs->master_->MarkConnImportance(cs, true);
std::string out_bytes;
base::Base64Encode(
std::string(
static_cast<const char*>(static_cast<void*>(
EVBUFFER_DATA(EVBUFFER_INPUT(bev)))),
EVBUFFER_LENGTH(EVBUFFER_INPUT(bev))),
&out_bytes);
evbuffer_drain(EVBUFFER_INPUT(bev), EVBUFFER_LENGTH(EVBUFFER_INPUT(bev)));
if (!cs->EmitFrame(&cs->primchan_, WS_OPCODE_TEXT,
out_bytes.c_str(), out_bytes.size())) {
cs->Shut(WS_CLOSE_UNEXPECTED, "Failed to write websocket frame");
}
}
// static
void Conn::OnDestchanWrite(struct bufferevent* bev, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (bev == NULL ||
cs == NULL ||
bev != cs->destchan_.write_bev()) {
NOTREACHED();
return;
}
// Write callback is called when low watermark is reached, 0 by default.
cs->destchan_.set_write_pending(false);
if (cs->phase_ == PHASE_WAIT_DESTCONNECT)
cs->phase_ = PHASE_OUTSIDE_FRAME;
if (cs->phase_ < PHASE_SHUT)
OnPrimchanRead(cs->primchan_.read_bev(), evkey);
else
cs->destchan_.Zap();
}
// static
void Conn::OnDestchanError(struct bufferevent* bev,
short what, EventKey evkey) {
Conn* cs = Conn::Get(evkey);
if (bev == NULL ||
cs == NULL ||
(bev != cs->destchan_.read_bev() && bev != cs->destchan_.write_bev())) {
return;
}
cs->destchan_.set_write_pending(false);
if (cs->phase_ >= PHASE_SHUT)
cs->master_->ZapConn(cs);
else
cs->Shut(WS_CLOSE_DESTINATION_ERROR,
"Failure reported on destination channel");
}
// static
Conn::EventKey Conn::last_evkey_ = 0;
// static
base::LazyInstance<Conn::EventKeyMap>::Leaky
Conn::evkey_map_ = LAZY_INSTANCE_INITIALIZER;
} // namespace
WebSocketProxy::WebSocketProxy() : impl_(new Serv()) {
}
WebSocketProxy::~WebSocketProxy() {
delete static_cast<Serv*>(impl_);
impl_ = NULL;
}
void WebSocketProxy::Run() {
static_cast<Serv*>(impl_)->Run();
}
void WebSocketProxy::Shutdown() {
static_cast<Serv*>(impl_)->Shutdown();
}
void WebSocketProxy::OnNetworkChange() {
static_cast<Serv*>(impl_)->OnNetworkChange();
}
} // namespace chromeos