Handle mutex patch from gsamfira.
diff --git a/npipe_windows.go b/npipe_windows.go
index 48287e6..be08a78 100755
--- a/npipe_windows.go
+++ b/npipe_windows.go
@@ -64,6 +64,8 @@
error_invalid_name syscall.Errno = 0x7B
error_io_incomplete syscall.Errno = 0x3e4
+
+ error_broken_pipe syscall.Errno = 0x6D
)
var _ net.Conn = (*PipeConn)(nil)
@@ -225,8 +227,13 @@
// address. The address must be of the form \\.\pipe\<name>
//
// Listen will return a PipeError for an incorrectly formatted pipe name.
+var mtx = sync.Mutex{}
+
func Listen(address string) (*PipeListener, error) {
handle, err := createPipe(address, true)
+ mtx.Lock()
+ handles[int(handle)] = handle
+ mtx.Unlock()
if err == error_invalid_name {
return nil, badAddr(address)
}
@@ -234,17 +241,19 @@
return nil, err
}
return &PipeListener{
- addr: PipeAddr(address),
- handle: handle,
+ addr: PipeAddr(address),
+ handle: handle,
+ myHandle: handle,
}, nil
}
// PipeListener is a named pipe listener. Clients should typically
// use variables of type net.Listener instead of assuming named pipe.
type PipeListener struct {
- addr PipeAddr
- handle syscall.Handle
- closed bool
+ addr PipeAddr
+ handle syscall.Handle
+ myHandle syscall.Handle
+ closed bool
// acceptHandle contains the current handle waiting for
// an incoming connection or nil.
@@ -259,17 +268,25 @@
// Accept implements the Accept method in the net.Listener interface; it
// waits for the next call and returns a generic net.Conn.
func (l *PipeListener) Accept() (net.Conn, error) {
+ if l.closed == true {
+ return nil, ErrClosed
+ }
c, err := l.AcceptPipe()
for err == error_no_data {
// Ignore clients that connect and immediately disconnect.
c, err = l.AcceptPipe()
}
+ if err == error_broken_pipe {
+ return nil, ErrClosed
+ }
if err != nil {
return nil, err
}
return c, nil
}
+var cli = map[int]*clients{}
+
// AcceptPipe accepts the next incoming call and returns the new connection.
// It might return an error if a client connected and immediately cancelled
// the connection.
@@ -286,6 +303,9 @@
if handle == 0 {
var err error
handle, err = createPipe(string(l.addr), false)
+ mtx.Lock()
+ handles[int(handle)] = handle
+ mtx.Unlock()
if err != nil {
return nil, err
}
@@ -297,7 +317,21 @@
if err != nil {
return nil, err
}
- defer syscall.CloseHandle(overlapped.HEvent)
+
+ mtx.Lock()
+ cli[int(handle)] = &clients{
+ handle: handle,
+ overlapped: overlapped,
+ }
+ mtx.Unlock()
+ defer func() {
+ mtx.Lock()
+ if v, ok := cli[int(handle)]; ok {
+ v.overlapped = nil
+ }
+ syscall.CloseHandle(overlapped.HEvent)
+ mtx.Unlock()
+ }()
if err := connectNamedPipe(handle, overlapped); err != nil && err != error_pipe_connected {
if err == error_io_incomplete || err == syscall.ERROR_IO_PENDING {
l.acceptMutex.Lock()
@@ -310,7 +344,6 @@
l.acceptHandle = 0
l.acceptMutex.Unlock()
}()
-
_, err = waitForCompletion(handle, overlapped)
}
if err == syscall.ERROR_OPERATION_ABORTED {
@@ -322,46 +355,105 @@
return nil, err
}
}
- return &PipeConn{handle: handle, addr: l.addr}, nil
+ return &PipeConn{handle: handle, overlapped: overlapped, addr: l.addr}, nil
+}
+
+type clients struct {
+ handle syscall.Handle
+ overlapped *syscall.Overlapped
}
// Close stops listening on the address.
// Already Accepted connections are not closed.
func (l *PipeListener) Close() error {
+ mtx.Lock()
+ defer mtx.Unlock()
if l.closed {
return nil
}
+ for k, v := range cli {
+ if k == int(l.myHandle) {
+ continue
+ }
+ if v.overlapped != nil && v.handle != 0 {
+ // Cancel the pending IO. This call does not block, so it is safe
+ // to hold onto the mutex above.
+ if err := cancelIoEx(v.handle, v.overlapped); err != nil {
+ return err
+ }
+ if v.overlapped != nil {
+ err := syscall.CloseHandle(v.overlapped.HEvent)
+ if err != nil {
+ return err
+ }
+ v.overlapped.HEvent = 0
+ }
+ err := syscall.CloseHandle(v.handle)
+ if err != nil {
+ return err
+ }
+ v.handle = 0
+ }
+ }
+ /*
+ for k, v := range handles {
+ if k == int(l.handle) || k == int(l.acceptHandle) || k == int(l.myHandle) {
+ continue
+ }
+ if v != 0 {
+ err := syscall.CloseHandle(v)
+ if err != nil {
+ return err
+ }
+ }
+ delete(handles, k)
+ }
+ */
l.closed = true
- if l.handle != 0 {
- err := disconnectNamedPipe(l.handle)
+ if l.myHandle != 0 {
+ err := disconnectNamedPipe(l.myHandle)
if err != nil {
return err
}
- err = syscall.CloseHandle(l.handle)
+ err = syscall.CloseHandle(l.myHandle)
if err != nil {
return err
}
- l.handle = 0
+ l.myHandle = 0
}
- l.acceptMutex.Lock()
- defer l.acceptMutex.Unlock()
- if l.acceptOverlapped != nil && l.acceptHandle != 0 {
- // Cancel the pending IO. This call does not block, so it is safe
- // to hold onto the mutex above.
- if err := cancelIoEx(l.acceptHandle, l.acceptOverlapped); err != nil {
- return err
+ /*
+ if l.handle != 0 {
+ err := disconnectNamedPipe(l.handle)
+ if err != nil {
+ return err
+ }
+ err = syscall.CloseHandle(l.handle)
+ if err != nil {
+ return err
+ }
+ l.handle = 0
}
- err := syscall.CloseHandle(l.acceptOverlapped.HEvent)
- if err != nil {
- return err
+ l.acceptMutex.Lock()
+ defer l.acceptMutex.Unlock()
+ if l.acceptOverlapped != nil && l.acceptHandle != 0 {
+ // Cancel the pending IO. This call does not block, so it is safe
+ // to hold onto the mutex above.
+ if err := cancelIoEx(l.acceptHandle, l.acceptOverlapped); err != nil {
+ return err
+ }
+ err := syscall.CloseHandle(l.acceptOverlapped.HEvent)
+ if err != nil {
+ return err
+ }
+ l.acceptOverlapped.HEvent = 0
+ err = syscall.CloseHandle(l.acceptHandle)
+ delete(handles, int(l.acceptHandle))
+ if err != nil {
+ return err
+ }
+ l.acceptHandle = 0
}
- l.acceptOverlapped.HEvent = 0
- err = syscall.CloseHandle(l.acceptHandle)
- if err != nil {
- return err
- }
- l.acceptHandle = 0
- }
+ */
return nil
}
@@ -370,8 +462,9 @@
// PipeConn is the implementation of the net.Conn interface for named pipe connections.
type PipeConn struct {
- handle syscall.Handle
- addr PipeAddr
+ handle syscall.Handle
+ overlapped *syscall.Overlapped
+ addr PipeAddr
// these aren't actually used yet
readDeadline *time.Time
@@ -443,7 +536,17 @@
// Close closes the connection.
func (c *PipeConn) Close() error {
- return syscall.CloseHandle(c.handle)
+ /*
+ if c.overlapped != nil && c.overlapped.HEvent != 0 {
+ err := syscall.CloseHandle(c.overlapped.HEvent)
+ if err != nil {
+ return err
+ }
+ }
+ */
+ err := syscall.CloseHandle(c.handle)
+ //delete(handles, int(c.handle))
+ return err
}
// LocalAddr returns the local network address.
@@ -490,6 +593,8 @@
return string(a)
}
+var handles = map[int]syscall.Handle{}
+
// createPipe is a helper function to make sure we always create pipes
// with the same arguments, since subsequent calls to create pipe need
// to use the same arguments as the first one. If first is set, fail
diff --git a/npipe_windows_test.go b/npipe_windows_test.go
index 8b3b25a..65c05b3 100755
--- a/npipe_windows_test.go
+++ b/npipe_windows_test.go
@@ -434,11 +434,13 @@
defer func() {
ln.Close()
<-waitExit
+
}()
server := rpc.NewServer()
service := &RPCService{}
server.Register(service)
go server.Accept(ln)
+
go func() {
for {
conn, err := ln.Accept()
@@ -453,6 +455,7 @@
}
waitExit <- true
}()
+
var conn *PipeConn
conn, err = Dial(address)
if err != nil {