| // Copyright 2017 The Gorilla WebSocket Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package websocket |
| |
| import ( |
| "io" |
| "io/ioutil" |
| "sync/atomic" |
| "testing" |
| ) |
| |
| // broadcastBench allows to run broadcast benchmarks. |
| // In every broadcast benchmark we create many connections, then send the same |
| // message into every connection and wait for all writes complete. This emulates |
| // an application where many connections listen to the same data - i.e. PUB/SUB |
| // scenarios with many subscribers in one channel. |
| type broadcastBench struct { |
| w io.Writer |
| message *broadcastMessage |
| closeCh chan struct{} |
| doneCh chan struct{} |
| count int32 |
| conns []*broadcastConn |
| compression bool |
| usePrepared bool |
| } |
| |
| type broadcastMessage struct { |
| payload []byte |
| prepared *PreparedMessage |
| } |
| |
| type broadcastConn struct { |
| conn *Conn |
| msgCh chan *broadcastMessage |
| } |
| |
| func newBroadcastConn(c *Conn) *broadcastConn { |
| return &broadcastConn{ |
| conn: c, |
| msgCh: make(chan *broadcastMessage, 1), |
| } |
| } |
| |
| func newBroadcastBench(usePrepared, compression bool) *broadcastBench { |
| bench := &broadcastBench{ |
| w: ioutil.Discard, |
| doneCh: make(chan struct{}), |
| closeCh: make(chan struct{}), |
| usePrepared: usePrepared, |
| compression: compression, |
| } |
| msg := &broadcastMessage{ |
| payload: textMessages(1)[0], |
| } |
| if usePrepared { |
| pm, _ := NewPreparedMessage(TextMessage, msg.payload) |
| msg.prepared = pm |
| } |
| bench.message = msg |
| bench.makeConns(10000) |
| return bench |
| } |
| |
| func (b *broadcastBench) makeConns(numConns int) { |
| conns := make([]*broadcastConn, numConns) |
| |
| for i := 0; i < numConns; i++ { |
| c := newTestConn(nil, b.w, true) |
| if b.compression { |
| c.enableWriteCompression = true |
| c.newCompressionWriter = compressNoContextTakeover |
| } |
| conns[i] = newBroadcastConn(c) |
| go func(c *broadcastConn) { |
| for { |
| select { |
| case msg := <-c.msgCh: |
| if b.usePrepared { |
| c.conn.WritePreparedMessage(msg.prepared) |
| } else { |
| c.conn.WriteMessage(TextMessage, msg.payload) |
| } |
| val := atomic.AddInt32(&b.count, 1) |
| if val%int32(numConns) == 0 { |
| b.doneCh <- struct{}{} |
| } |
| case <-b.closeCh: |
| return |
| } |
| } |
| }(conns[i]) |
| } |
| b.conns = conns |
| } |
| |
| func (b *broadcastBench) close() { |
| close(b.closeCh) |
| } |
| |
| func (b *broadcastBench) runOnce() { |
| for _, c := range b.conns { |
| c.msgCh <- b.message |
| } |
| <-b.doneCh |
| } |
| |
| func BenchmarkBroadcast(b *testing.B) { |
| benchmarks := []struct { |
| name string |
| usePrepared bool |
| compression bool |
| }{ |
| {"NoCompression", false, false}, |
| {"WithCompression", false, true}, |
| {"NoCompressionPrepared", true, false}, |
| {"WithCompressionPrepared", true, true}, |
| } |
| for _, bm := range benchmarks { |
| b.Run(bm.name, func(b *testing.B) { |
| bench := newBroadcastBench(bm.usePrepared, bm.compression) |
| defer bench.close() |
| b.ResetTimer() |
| for i := 0; i < b.N; i++ { |
| bench.runOnce() |
| } |
| b.ReportAllocs() |
| }) |
| } |
| } |