| // Copyright 2019 The gVisor Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package pipe |
| |
| import ( |
| "io" |
| "math" |
| "syscall" |
| |
| "gvisor.dev/gvisor/pkg/abi/linux" |
| "gvisor.dev/gvisor/pkg/amutex" |
| "gvisor.dev/gvisor/pkg/buffer" |
| "gvisor.dev/gvisor/pkg/context" |
| "gvisor.dev/gvisor/pkg/marshal/primitive" |
| "gvisor.dev/gvisor/pkg/sentry/arch" |
| "gvisor.dev/gvisor/pkg/sync" |
| "gvisor.dev/gvisor/pkg/usermem" |
| "gvisor.dev/gvisor/pkg/waiter" |
| ) |
| |
| // This file contains Pipe file functionality that is tied to neither VFS nor |
| // the old fs architecture. |
| |
| // Release cleans up the pipe's state. |
| func (p *Pipe) Release(context.Context) { |
| p.rClose() |
| p.wClose() |
| |
| // Wake up readers and writers. |
| p.Notify(waiter.EventIn | waiter.EventOut) |
| } |
| |
| // Read reads from the Pipe into dst. |
| func (p *Pipe) Read(ctx context.Context, dst usermem.IOSequence) (int64, error) { |
| n, err := p.read(ctx, readOps{ |
| left: func() int64 { |
| return dst.NumBytes() |
| }, |
| limit: func(l int64) { |
| dst = dst.TakeFirst64(l) |
| }, |
| read: func(view *buffer.View) (int64, error) { |
| n, err := dst.CopyOutFrom(ctx, view) |
| dst = dst.DropFirst64(n) |
| view.TrimFront(n) |
| return n, err |
| }, |
| }) |
| if n > 0 { |
| p.Notify(waiter.EventOut) |
| } |
| return n, err |
| } |
| |
| // WriteTo writes to w from the Pipe. |
| func (p *Pipe) WriteTo(ctx context.Context, w io.Writer, count int64, dup bool) (int64, error) { |
| ops := readOps{ |
| left: func() int64 { |
| return count |
| }, |
| limit: func(l int64) { |
| count = l |
| }, |
| read: func(view *buffer.View) (int64, error) { |
| n, err := view.ReadToWriter(w, count) |
| if !dup { |
| view.TrimFront(n) |
| } |
| count -= n |
| return n, err |
| }, |
| } |
| n, err := p.read(ctx, ops) |
| if n > 0 { |
| p.Notify(waiter.EventOut) |
| } |
| return n, err |
| } |
| |
| // Write writes to the Pipe from src. |
| func (p *Pipe) Write(ctx context.Context, src usermem.IOSequence) (int64, error) { |
| n, err := p.write(ctx, writeOps{ |
| left: func() int64 { |
| return src.NumBytes() |
| }, |
| limit: func(l int64) { |
| src = src.TakeFirst64(l) |
| }, |
| write: func(view *buffer.View) (int64, error) { |
| n, err := src.CopyInTo(ctx, view) |
| src = src.DropFirst64(n) |
| return n, err |
| }, |
| }) |
| if n > 0 { |
| p.Notify(waiter.EventIn) |
| } |
| return n, err |
| } |
| |
| // ReadFrom reads from r to the Pipe. |
| func (p *Pipe) ReadFrom(ctx context.Context, r io.Reader, count int64) (int64, error) { |
| n, err := p.write(ctx, writeOps{ |
| left: func() int64 { |
| return count |
| }, |
| limit: func(l int64) { |
| count = l |
| }, |
| write: func(view *buffer.View) (int64, error) { |
| n, err := view.WriteFromReader(r, count) |
| count -= n |
| return n, err |
| }, |
| }) |
| if n > 0 { |
| p.Notify(waiter.EventIn) |
| } |
| return n, err |
| } |
| |
| // Readiness returns the ready events in the underlying pipe. |
| func (p *Pipe) Readiness(mask waiter.EventMask) waiter.EventMask { |
| return p.rwReadiness() & mask |
| } |
| |
| // Ioctl implements ioctls on the Pipe. |
| func (p *Pipe) Ioctl(ctx context.Context, io usermem.IO, args arch.SyscallArguments) (uintptr, error) { |
| // Switch on ioctl request. |
| switch int(args[1].Int()) { |
| case linux.FIONREAD: |
| v := p.queued() |
| if v > math.MaxInt32 { |
| v = math.MaxInt32 // Silently truncate. |
| } |
| // Copy result to userspace. |
| iocc := primitive.IOCopyContext{ |
| IO: io, |
| Ctx: ctx, |
| Opts: usermem.IOOpts{ |
| AddressSpaceActive: true, |
| }, |
| } |
| _, err := primitive.CopyInt32Out(&iocc, args[2].Pointer(), int32(v)) |
| return 0, err |
| default: |
| return 0, syscall.ENOTTY |
| } |
| } |
| |
| // waitFor blocks until the underlying pipe has at least one reader/writer is |
| // announced via 'wakeupChan', or until 'sleeper' is cancelled. Any call to this |
| // function will block for either readers or writers, depending on where |
| // 'wakeupChan' points. |
| // |
| // mu must be held by the caller. waitFor returns with mu held, but it will |
| // drop mu before blocking for any reader/writers. |
| func waitFor(mu *sync.Mutex, wakeupChan *chan struct{}, sleeper amutex.Sleeper) bool { |
| // Ideally this function would simply use a condition variable. However, the |
| // wait needs to be interruptible via 'sleeper', so we must sychronize via a |
| // channel. The synchronization below relies on the fact that closing a |
| // channel unblocks all receives on the channel. |
| |
| // Does an appropriate wakeup channel already exist? If not, create a new |
| // one. This is all done under f.mu to avoid races. |
| if *wakeupChan == nil { |
| *wakeupChan = make(chan struct{}) |
| } |
| |
| // Grab a local reference to the wakeup channel since it may disappear as |
| // soon as we drop f.mu. |
| wakeup := *wakeupChan |
| |
| // Drop the lock and prepare to sleep. |
| mu.Unlock() |
| cancel := sleeper.SleepStart() |
| |
| // Wait for either a new reader/write to be signalled via 'wakeup', or |
| // for the sleep to be cancelled. |
| select { |
| case <-wakeup: |
| sleeper.SleepFinish(true) |
| case <-cancel: |
| sleeper.SleepFinish(false) |
| } |
| |
| // Take the lock and check if we were woken. If we were woken and |
| // interrupted, the former takes priority. |
| mu.Lock() |
| select { |
| case <-wakeup: |
| return true |
| default: |
| return false |
| } |
| } |
| |
// newHandleLocked announces a new pipe reader or writer, depending on which
// channel 'wakeupChan' points to. If a wakeup channel exists it is closed,
// which unblocks everything receiving on it (see waitFor), and the pointer is
// reset to nil. If no channel exists, this is a no-op.
//
// Precondition: the mutex protecting wakeupChan must be held.
func newHandleLocked(wakeupChan *chan struct{}) {
	ch := *wakeupChan
	if ch == nil {
		// Nobody has created a wakeup channel yet; nothing to signal.
		return
	}
	close(ch)
	*wakeupChan = nil
}