| // Copyright 2014 Gary Burd |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"): you may |
| // not use this file except in compliance with the License. You may obtain |
| // a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, WITHOUT |
| // WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the |
| // License for the specific language governing permissions and limitations |
| // under the License. |
| |
| package redisx |
| |
| import ( |
| "errors" |
| "sync" |
| |
| "github.com/gomodule/redigo/redis" |
| ) |
| |
| // ConnMux multiplexes one or more connections to a single underlying |
| // connection. The ConnMux connections do not support concurrency, commands |
| // that associate server side state with the connection or commands that put |
| // the connection in a special mode. |
| type ConnMux struct { |
| c redis.Conn |
| |
| sendMu sync.Mutex |
| sendID uint |
| |
| recvMu sync.Mutex |
| recvID uint |
| recvWait map[uint]chan struct{} |
| } |
| |
| func NewConnMux(c redis.Conn) *ConnMux { |
| return &ConnMux{c: c, recvWait: make(map[uint]chan struct{})} |
| } |
| |
| // Get gets a connection. The application must close the returned connection. |
| func (p *ConnMux) Get() redis.Conn { |
| c := &muxConn{p: p} |
| c.ids = c.buf[:0] |
| return c |
| } |
| |
| // Close closes the underlying connection. |
| func (p *ConnMux) Close() error { |
| return p.c.Close() |
| } |
| |
| type muxConn struct { |
| p *ConnMux |
| ids []uint |
| buf [8]uint |
| } |
| |
| func (c *muxConn) send(flush bool, cmd string, args ...interface{}) error { |
| if lookupCommandInfo(cmd).notMuxable { |
| return errors.New("command not supported by mux pool") |
| } |
| p := c.p |
| p.sendMu.Lock() |
| id := p.sendID |
| c.ids = append(c.ids, id) |
| p.sendID++ |
| err := p.c.Send(cmd, args...) |
| if flush { |
| err = p.c.Flush() |
| } |
| p.sendMu.Unlock() |
| return err |
| } |
| |
| func (c *muxConn) Send(cmd string, args ...interface{}) error { |
| return c.send(false, cmd, args...) |
| } |
| |
| func (c *muxConn) Flush() error { |
| p := c.p |
| p.sendMu.Lock() |
| err := p.c.Flush() |
| p.sendMu.Unlock() |
| return err |
| } |
| |
| func (c *muxConn) Receive() (interface{}, error) { |
| if len(c.ids) == 0 { |
| return nil, errors.New("mux pool underflow") |
| } |
| |
| id := c.ids[0] |
| c.ids = c.ids[1:] |
| if len(c.ids) == 0 { |
| c.ids = c.buf[:0] |
| } |
| |
| p := c.p |
| p.recvMu.Lock() |
| if p.recvID != id { |
| ch := make(chan struct{}) |
| p.recvWait[id] = ch |
| p.recvMu.Unlock() |
| <-ch |
| p.recvMu.Lock() |
| if p.recvID != id { |
| panic("out of sync") |
| } |
| } |
| |
| v, err := p.c.Receive() |
| |
| id++ |
| p.recvID = id |
| ch, ok := p.recvWait[id] |
| if ok { |
| delete(p.recvWait, id) |
| } |
| p.recvMu.Unlock() |
| if ok { |
| ch <- struct{}{} |
| } |
| |
| return v, err |
| } |
| |
| func (c *muxConn) Close() error { |
| var err error |
| if len(c.ids) == 0 { |
| return nil |
| } |
| c.Flush() |
| for _ = range c.ids { |
| _, err = c.Receive() |
| } |
| return err |
| } |
| |
| func (c *muxConn) Do(cmd string, args ...interface{}) (interface{}, error) { |
| if err := c.send(true, cmd, args...); err != nil { |
| return nil, err |
| } |
| return c.Receive() |
| } |
| |
| func (c *muxConn) Err() error { |
| return c.p.c.Err() |
| } |