| package dbus |
| |
| import ( |
| "sync" |
| ) |
| |
| // NewSequentialSignalHandler returns an instance of a new |
| // signal handler that guarantees sequential processing of signals. It is a |
| // guarantee of this signal handler that signals will be written to |
| // channels in the order they are received on the DBus connection. |
| func NewSequentialSignalHandler() SignalHandler { |
| return &sequentialSignalHandler{} |
| } |
| |
| type sequentialSignalHandler struct { |
| mu sync.RWMutex |
| closed bool |
| signals []*sequentialSignalChannelData |
| } |
| |
| func (sh *sequentialSignalHandler) DeliverSignal(intf, name string, signal *Signal) { |
| sh.mu.RLock() |
| defer sh.mu.RUnlock() |
| if sh.closed { |
| return |
| } |
| for _, scd := range sh.signals { |
| scd.deliver(signal) |
| } |
| } |
| |
| func (sh *sequentialSignalHandler) Terminate() { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.closed { |
| return |
| } |
| |
| for _, scd := range sh.signals { |
| scd.close() |
| close(scd.ch) |
| } |
| sh.closed = true |
| sh.signals = nil |
| } |
| |
| func (sh *sequentialSignalHandler) AddSignal(ch chan<- *Signal) { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.closed { |
| return |
| } |
| sh.signals = append(sh.signals, newSequentialSignalChannelData(ch)) |
| } |
| |
| func (sh *sequentialSignalHandler) RemoveSignal(ch chan<- *Signal) { |
| sh.mu.Lock() |
| defer sh.mu.Unlock() |
| if sh.closed { |
| return |
| } |
| for i := len(sh.signals) - 1; i >= 0; i-- { |
| if ch == sh.signals[i].ch { |
| sh.signals[i].close() |
| copy(sh.signals[i:], sh.signals[i+1:]) |
| sh.signals[len(sh.signals)-1] = nil |
| sh.signals = sh.signals[:len(sh.signals)-1] |
| } |
| } |
| } |
| |
| type sequentialSignalChannelData struct { |
| ch chan<- *Signal |
| in chan *Signal |
| done chan struct{} |
| } |
| |
| func newSequentialSignalChannelData(ch chan<- *Signal) *sequentialSignalChannelData { |
| scd := &sequentialSignalChannelData{ |
| ch: ch, |
| in: make(chan *Signal), |
| done: make(chan struct{}), |
| } |
| go scd.bufferSignals() |
| return scd |
| } |
| |
| func (scd *sequentialSignalChannelData) bufferSignals() { |
| defer close(scd.done) |
| |
| // Ensure that signals are delivered to scd.ch in the same |
| // order they are received from scd.in. |
| var queue []*Signal |
| for { |
| if len(queue) == 0 { |
| signal, ok := <- scd.in |
| if !ok { |
| return |
| } |
| queue = append(queue, signal) |
| } |
| select { |
| case scd.ch <- queue[0]: |
| copy(queue, queue[1:]) |
| queue[len(queue)-1] = nil |
| queue = queue[:len(queue)-1] |
| case signal, ok := <-scd.in: |
| if !ok { |
| return |
| } |
| queue = append(queue, signal) |
| } |
| } |
| } |
| |
| func (scd *sequentialSignalChannelData) deliver(signal *Signal) { |
| scd.in <- signal |
| } |
| |
| func (scd *sequentialSignalChannelData) close() { |
| close(scd.in) |
| // Ensure that bufferSignals() has exited and won't attempt |
| // any future sends on scd.ch |
| <-scd.done |
| } |