blob: 39f5b44b5370cdf4b56c1330a0cbdec26f270571 [file] [log] [blame]
// Copyright 2017 by Dan Jacques. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package fslock
import (
"bytes"
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strconv"
"strings"
"testing"
"time"
)
// withTempDir creates a fresh temporary directory underneath the current
// working directory, with a name beginning with prefix, and invokes fn with
// the directory's path.
//
// The directory and its contents are removed when fn finishes; the removal is
// deferred, so it also runs if fn aborts the test. A failed removal is only
// logged. Failing to resolve the working directory or to create the temporary
// directory aborts the test via t.Fatalf.
func withTempDir(t *testing.T, prefix string, fn func(string)) {
	cwd, cwdErr := os.Getwd()
	if cwdErr != nil {
		t.Fatalf("failed to get working directory: %v", cwdErr)
	}

	scratch, mkErr := ioutil.TempDir(cwd, prefix)
	if mkErr != nil {
		t.Fatalf("failed to create temporary directory in [%s]: %v", cwd, mkErr)
	}

	// Best-effort cleanup: a leftover directory should not fail the test.
	cleanup := func() {
		rmErr := os.RemoveAll(scratch)
		if rmErr == nil {
			return
		}
		t.Logf("failed to clean up temporary directory [%s]: %v", scratch, rmErr)
	}
	defer cleanup()

	fn(scratch)
}
// TestConcurrent exercises the lock from many goroutines inside one process.
//
// Every goroutine increments a shared, otherwise unsynchronized counter while
// holding the lock, and the final counter must equal the number of goroutines.
//
// Run this with "-race" to get real value out of it: without the race
// detector the goroutines could happen to cooperate even if locking is
// broken.
func TestConcurrent(t *testing.T) {
	t.Parallel()

	withTempDir(t, "concurrent", func(tdir string) {
		const workers = 1024

		lockPath := filepath.Join(tdir, "lock")
		counter := 0 // Only touched from inside the locked callback.

		// Passed to WithBlocking as its blocking callback; presumably invoked
		// while the lock is unavailable (confirm against WithBlocking).
		pause := func() error {
			time.Sleep(time.Millisecond)
			return nil
		}
		increment := func() error {
			counter++
			return nil
		}

		gate := make(chan struct{})
		// Buffered to hold one result per worker, so no send ever blocks.
		results := make(chan error, workers)

		worker := func() {
			// Hold here until every worker has been launched, so that they all
			// contend for the lock at once.
			<-gate
			results <- WithBlocking(lockPath, pause, increment)
		}
		for n := workers; n > 0; n-- {
			go worker()
		}

		// Release all of the workers simultaneously.
		close(gate)

		// Collect exactly one result per worker, remembering failures.
		var messages []string
		for n := 0; n < workers; n++ {
			if err := <-results; err != nil {
				messages = append(messages, err.Error())
			}
		}

		if failed := len(messages); failed > 0 {
			t.Fatalf("encountered %d error(s):\n%s", failed, strings.Join(messages, "\n"))
		}
		if counter != workers {
			t.Fatalf("value doesn't match expected (%d != %d)", counter, workers)
		}
	})
}
// TestMultiProcessing tests access from multiple separate processes.
//
// The main process creates an output file, seeded with the value "0". It then
// spawns a number of subprocesses (re-executions of this test program with
// a "you're a subprocess" environment variable set). Each subprocess acquires
// the lock, reads the output file, increments its value by 1, and writes the
// result.
//
// To maximize contention, we spawn all of our subprocesses first, having each
// block on a signal. When each spawns, it will signal that it's ready. Then,
// the main process will signal that it should start.
//
// Success is if all of the subprocesses succeeded and the output file has the
// correct value.
func TestMultiProcessing(t *testing.T) {
	t.Parallel()

	// getFiles derives the lock and output file paths from a work directory.
	getFiles := func(tdir string) (lock, out string) {
		lock = filepath.Join(tdir, "lock")
		out = filepath.Join(tdir, "out")
		return
	}

	// Are we a testing process instance, or the main process?
	const envSentinel = "_FSLOCK_TEST_WORKDIR"
	if path := os.Getenv(envSentinel); path != "" {
		// Resolve our signal files. The parent wires the signal pipe to our
		// stdin and the response pipe to our stdout.
		signalR := os.Stdin
		respW := os.Stdout
		lock, out := getFiles(path)

		rv := testMultiProcessingSubprocess(lock, out, respW, signalR)
		if _, err := respW.Write([]byte{rv}); err != nil {
			// Raise an error in the parent process on Wait(). The message goes
			// to stderr because stdout is the response pipe that just failed.
			fmt.Fprintf(os.Stderr, "failed to write result (%d): %v\n", rv, err)
			os.Exit(1)
		}
		os.Exit(0)
	}

	// This pipe will be used to signal that the processes should start the test.
	signalR, signalW, err := os.Pipe()
	if err != nil {
		t.Fatalf("failed to create signal IPC pipe: %v", err)
	}
	defer signalR.Close()
	defer signalW.Close()

	// This pipe carries the "ready" and "finished" bytes back from subprocesses.
	respR, respW, err := os.Pipe()
	if err != nil {
		t.Fatalf("failed to create response IPC pipe: %v", err)
	}
	defer respR.Close()
	defer respW.Close()

	withTempDir(t, "multiprocessing", func(tdir string) {
		// Seed our initial file.
		_, out := getFiles(tdir)
		if err := ioutil.WriteFile(out, []byte("0"), 0664); err != nil {
			t.Fatalf("failed to write initial output file value: %v", err)
		}
		t.Logf("wrote initial output file to [%s]", out)

		// TODO: Replace with os.Executable for Go 1.8.
		executable := os.Args[0]

		const count = 256
		cmds := make([]*exec.Cmd, count)

		// Kill all of our processes on cleanup, regardless of success/failure.
		defer func() {
			for _, cmd := range cmds {
				// Slots remain nil if we aborted before starting every
				// subprocess; there is nothing to kill or reap for those.
				if cmd == nil || cmd.Process == nil {
					continue
				}
				// Errors are deliberately ignored: on the success path the
				// process has already exited and been waited on below.
				_ = cmd.Process.Kill()
				_ = cmd.Wait()
			}
		}()

		for i := range cmds {
			cmd := exec.Command(executable, "-test.run", "^TestMultiProcessing$")
			cmd.Env = append(os.Environ(), fmt.Sprintf("%s=%s", envSentinel, tdir))
			cmd.Stdin = signalR
			cmd.Stdout = respW
			cmd.Stderr = os.Stderr
			if err := cmd.Start(); err != nil {
				t.Fatalf("failed to start subprocess: %v", err)
			}
			// Only record successfully started commands.
			cmds[i] = cmd
		}

		// Close our child-side pipe ends.
		signalR.Close()
		respW.Close()

		// Wait for all of the processes to signal that they're ready.
		for i := 0; i < count; i++ {
			buf := []byte{0}
			switch n, err := respR.Read(buf[:]); {
			case err != nil:
				t.Fatalf("failed to read ready signal: %v", err)
			case n != 1:
				t.Fatal("failed to read ready signal byte")
			}
		}

		// Signal our subprocesses to start!
		if err := signalW.Close(); err != nil {
			t.Fatalf("failed to signal processes to start: %v", err)
		}

		// Consume our responses. Each subprocess will write one byte to "respW"
		// when they finish. That byte will be zero for success, non-zero for
		// failure.
		failures := 0
		for i := 0; i < count; i++ {
			buf := []byte{0}
			switch n, err := respR.Read(buf[:]); {
			case err != nil:
				t.Fatalf("failed to read response: %v", err)
			case n != 1:
				t.Fatal("failed to read response byte")
			default:
				if buf[0] != 0 {
					failures++
				}
			}
		}

		// Wait for our processes to actually exit.
		for _, cmd := range cmds {
			if err := cmd.Wait(); err != nil {
				t.Fatalf("failed to wait for process: %v", err)
			}
		}

		// Report the failure.
		if failures > 0 {
			t.Fatalf("subprocesses reported %d failure(s)", failures)
		}

		// Our "out" file should be "count".
		buf, err := ioutil.ReadFile(out)
		if err != nil {
			t.Fatalf("failed to read output file: %v", err)
		}
		if exp := strconv.Itoa(count); string(buf) != exp {
			t.Fatalf("output file doesn't match expected (%s != %s)", buf, exp)
		}
	})
}
// testMultiProcessingSubprocess is the body of a TestMultiProcessing
// subprocess.
//
// It writes a single zero byte to respW to announce that it is ready, blocks
// until signalR reaches EOF, and then, while holding the lock at path lock,
// increments the decimal integer stored in the file at path out.
//
// The return value is a status code, with diagnostics written to stderr:
//
//	0   success
//	1   the ready signal could not be written
//	2   waiting on signalR failed
//	4   the output file could not be read
//	5   the output file did not contain a valid integer
//	6   the updated value could not be written
//	255 WithBlocking returned an error without the callback setting a code
func testMultiProcessingSubprocess(lock, out string, respW io.Writer, signalR io.Reader) byte {
	// Signal that we're ready to start.
	if _, err := respW.Write([]byte{0}); err != nil {
		fmt.Fprintf(os.Stderr, "failed to send ready signal: %v\n", err)
		return 1
	}

	// Wait for our signal (signalR closing).
	if _, err := ioutil.ReadAll(signalR); err != nil {
		fmt.Fprintf(os.Stderr, "failed to wait for signal: %v\n", err)
		return 2
	}

	// Passed to WithBlocking as its blocking callback; presumably invoked
	// while the lock is unavailable (confirm against WithBlocking).
	blocker := func() error {
		time.Sleep(time.Millisecond)
		return nil
	}

	// Default status, reported if WithBlocking fails outside of our callback.
	var rc byte = 255
	err := WithBlocking(lock, blocker, func() error {
		// We hold the lock. Update our "out" file value by reading/writing a new
		// number.
		d, err := ioutil.ReadFile(out)
		if err != nil {
			rc = 4
			return fmt.Errorf("failed to read output file: %v", err)
		}

		v, err := strconv.Atoi(string(d))
		if err != nil {
			rc = 5
			return fmt.Errorf("invalid number value (%s): %v", d, err)
		}

		if err := ioutil.WriteFile(out, []byte(strconv.Itoa(v+1)), 0664); err != nil {
			rc = 6
			return fmt.Errorf("failed to write updated value: %v", err)
		}
		return nil
	})
	if err != nil {
		// Fprintln supplies the line terminator, so the error strings above
		// carry none of their own.
		fmt.Fprintln(os.Stderr, err.Error())
		return rc
	}
	return 0
}
// TestBlockingAndContent tests L's Block and Content fields.
//
// The test goroutine takes the lock with Content "First". While it still
// holds the lock, a second goroutine contends for the same path with Content
// "Second"; the first time that goroutine's Block callback runs, it notifies
// the holder, which then releases the lock. Once both have finished, the lock
// file must contain "First".
func TestBlockingAndContent(t *testing.T) {
	t.Parallel()

	withTempDir(t, "content", func(tdir string) {
		const expected = "First"
		lockPath := filepath.Join(tdir, "lock")

		acquired := make(chan struct{})  // Closed once the holder has the lock.
		contended := make(chan struct{}) // Closed on the contender's first Block call.
		contenderErr := make(chan error) // Carries the contender's final result.

		// Contender: waits until the lock is held, then tries to take it with
		// different content. Its content should not end up in the file, since
		// the holder has already written its own.
		go func(notify chan<- struct{}) {
			<-acquired

			second := L{
				Path:    lockPath,
				Content: []byte("Second"),
				Block: func() error {
					// Tell the holder, exactly once, that an acquisition
					// attempt has failed; closing twice would panic.
					if notify != nil {
						close(notify)
						notify = nil
					}
					time.Sleep(time.Millisecond)
					return nil
				},
			}
			contenderErr <- second.With(func() error { return nil })
		}(contended)

		// Holder: take the lock, writing our content.
		first := L{
			Path:    lockPath,
			Content: []byte(expected),
		}
		holdErr := first.With(func() error {
			// Let the contender start now that we hold the lock...
			close(acquired)
			// ...and keep holding until it has been turned away at least once.
			<-contended
			// Returning releases the lock.
			return nil
		})
		if holdErr != nil {
			t.Fatalf("failed to create lock: %v", holdErr)
		}

		// The contender must eventually get the lock and finish cleanly.
		if err := <-contenderErr; err != nil {
			t.Fatalf("goroutine error: %v", err)
		}

		// The lock file must hold the holder's content, not the contender's.
		got, readErr := ioutil.ReadFile(lockPath)
		if readErr != nil {
			t.Fatalf("failed to read content: %v", readErr)
		}
		if bytes.Equal(got, []byte(expected)) {
			return
		}
		t.Fatalf("content does not match expected (%s != %s)", got, expected)
	})
}