| package execution |
| |
| import ( |
| "bytes" |
| "fmt" |
| "io" |
| "io/ioutil" |
| "os" |
| "path/filepath" |
| "sync" |
| |
| "github.com/boltdb/bolt" |
| api "github.com/containerd/containerd/api/services/execution" |
| "github.com/containerd/containerd/api/types/descriptor" |
| "github.com/containerd/containerd/api/types/task" |
| "github.com/containerd/containerd/archive" |
| "github.com/containerd/containerd/containers" |
| "github.com/containerd/containerd/content" |
| "github.com/containerd/containerd/images" |
| "github.com/containerd/containerd/log" |
| "github.com/containerd/containerd/mount" |
| "github.com/containerd/containerd/plugin" |
| protobuf "github.com/gogo/protobuf/types" |
| google_protobuf "github.com/golang/protobuf/ptypes/empty" |
| specs "github.com/opencontainers/image-spec/specs-go" |
| "golang.org/x/net/context" |
| "google.golang.org/grpc" |
| "google.golang.org/grpc/codes" |
| ) |
| |
| var ( |
| _ = (api.TasksServer)(&Service{}) |
| empty = &google_protobuf.Empty{} |
| ) |
| |
| func init() { |
| plugin.Register("tasks-grpc", &plugin.Registration{ |
| Type: plugin.GRPCPlugin, |
| Init: New, |
| }) |
| } |
| |
| func New(ic *plugin.InitContext) (interface{}, error) { |
| c, err := newCollector(ic.Context, ic.Runtimes) |
| if err != nil { |
| return nil, err |
| } |
| return &Service{ |
| runtimes: ic.Runtimes, |
| tasks: make(map[string]plugin.Task), |
| db: ic.Meta, |
| collector: c, |
| store: ic.Content, |
| }, nil |
| } |
| |
| type Service struct { |
| mu sync.Mutex |
| |
| runtimes map[string]plugin.Runtime |
| tasks map[string]plugin.Task |
| db *bolt.DB |
| collector *collector |
| store content.Store |
| } |
| |
| func (s *Service) Register(server *grpc.Server) error { |
| api.RegisterTasksServer(server, s) |
| // load all tasks |
| for _, r := range s.runtimes { |
| tasks, err := r.Tasks(context.Background()) |
| if err != nil { |
| return err |
| } |
| for _, c := range tasks { |
| s.tasks[c.Info().ContainerID] = c |
| } |
| } |
| return nil |
| } |
| |
| func (s *Service) Create(ctx context.Context, r *api.CreateRequest) (*api.CreateResponse, error) { |
| var ( |
| checkpointPath string |
| err error |
| ) |
| if r.Checkpoint != nil { |
| checkpointPath, err = ioutil.TempDir("", "ctrd-checkpoint") |
| if err != nil { |
| return nil, err |
| } |
| if r.Checkpoint.MediaType != images.MediaTypeContainerd1Checkpoint { |
| return nil, fmt.Errorf("unsupported checkpoint type %q", r.Checkpoint.MediaType) |
| } |
| reader, err := s.store.Reader(ctx, r.Checkpoint.Digest) |
| if err != nil { |
| return nil, err |
| } |
| _, err = archive.Apply(ctx, checkpointPath, reader) |
| reader.Close() |
| if err != nil { |
| return nil, err |
| } |
| } |
| |
| var container containers.Container |
| if err := s.db.View(func(tx *bolt.Tx) error { |
| store := containers.NewStore(tx) |
| var err error |
| container, err = store.Get(ctx, r.ContainerID) |
| return err |
| }); err != nil { |
| switch { |
| case containers.IsNotFound(err): |
| return nil, grpc.Errorf(codes.NotFound, "container %v not found", r.ContainerID) |
| case containers.IsExists(err): |
| return nil, grpc.Errorf(codes.AlreadyExists, "container %v already exists", r.ContainerID) |
| } |
| |
| return nil, err |
| } |
| |
| opts := plugin.CreateOpts{ |
| Spec: container.Spec, |
| IO: plugin.IO{ |
| Stdin: r.Stdin, |
| Stdout: r.Stdout, |
| Stderr: r.Stderr, |
| Terminal: r.Terminal, |
| }, |
| Checkpoint: checkpointPath, |
| } |
| for _, m := range r.Rootfs { |
| opts.Rootfs = append(opts.Rootfs, mount.Mount{ |
| Type: m.Type, |
| Source: m.Source, |
| Options: m.Options, |
| }) |
| } |
| runtime, err := s.getRuntime(container.Runtime) |
| if err != nil { |
| return nil, err |
| } |
| s.mu.Lock() |
| if _, ok := s.tasks[r.ContainerID]; ok { |
| s.mu.Unlock() |
| return nil, grpc.Errorf(codes.AlreadyExists, "task %v already exists", r.ContainerID) |
| } |
| c, err := runtime.Create(ctx, r.ContainerID, opts) |
| if err != nil { |
| s.mu.Unlock() |
| return nil, err |
| } |
| s.tasks[r.ContainerID] = c |
| s.mu.Unlock() |
| state, err := c.State(ctx) |
| if err != nil { |
| s.mu.Lock() |
| delete(s.tasks, r.ContainerID) |
| runtime.Delete(ctx, c) |
| s.mu.Unlock() |
| return nil, err |
| } |
| return &api.CreateResponse{ |
| ContainerID: r.ContainerID, |
| Pid: state.Pid(), |
| }, nil |
| } |
| |
| func (s *Service) Start(ctx context.Context, r *api.StartRequest) (*google_protobuf.Empty, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| if err := c.Start(ctx); err != nil { |
| return nil, err |
| } |
| return empty, nil |
| } |
| |
| func (s *Service) Delete(ctx context.Context, r *api.DeleteRequest) (*api.DeleteResponse, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| runtime, err := s.getRuntime(c.Info().Runtime) |
| if err != nil { |
| return nil, err |
| } |
| exit, err := runtime.Delete(ctx, c) |
| if err != nil { |
| return nil, err |
| } |
| |
| delete(s.tasks, r.ContainerID) |
| |
| return &api.DeleteResponse{ |
| ExitStatus: exit.Status, |
| ExitedAt: exit.Timestamp, |
| }, nil |
| } |
| |
| func taskFromContainerd(ctx context.Context, c plugin.Task) (*task.Task, error) { |
| state, err := c.State(ctx) |
| if err != nil { |
| return nil, err |
| } |
| |
| var status task.Status |
| switch state.Status() { |
| case plugin.CreatedStatus: |
| status = task.StatusCreated |
| case plugin.RunningStatus: |
| status = task.StatusRunning |
| case plugin.StoppedStatus: |
| status = task.StatusStopped |
| case plugin.PausedStatus: |
| status = task.StatusPaused |
| default: |
| log.G(ctx).WithField("status", state.Status()).Warn("unknown status") |
| } |
| return &task.Task{ |
| ID: c.Info().ID, |
| ContainerID: c.Info().ContainerID, |
| Pid: state.Pid(), |
| Status: status, |
| Spec: &protobuf.Any{ |
| TypeUrl: specs.Version, |
| Value: c.Info().Spec, |
| }, |
| }, nil |
| } |
| |
| func (s *Service) Info(ctx context.Context, r *api.InfoRequest) (*api.InfoResponse, error) { |
| task, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| t, err := taskFromContainerd(ctx, task) |
| if err != nil { |
| return nil, err |
| } |
| return &api.InfoResponse{ |
| Task: t, |
| }, nil |
| } |
| |
| func (s *Service) List(ctx context.Context, r *api.ListRequest) (*api.ListResponse, error) { |
| resp := &api.ListResponse{} |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| for _, cd := range s.tasks { |
| c, err := taskFromContainerd(ctx, cd) |
| if err != nil { |
| return nil, err |
| } |
| resp.Tasks = append(resp.Tasks, c) |
| } |
| return resp, nil |
| } |
| |
| func (s *Service) Pause(ctx context.Context, r *api.PauseRequest) (*google_protobuf.Empty, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| err = c.Pause(ctx) |
| if err != nil { |
| return nil, err |
| } |
| return empty, nil |
| } |
| |
| func (s *Service) Resume(ctx context.Context, r *api.ResumeRequest) (*google_protobuf.Empty, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| err = c.Resume(ctx) |
| if err != nil { |
| return nil, err |
| } |
| return empty, nil |
| } |
| |
| func (s *Service) Kill(ctx context.Context, r *api.KillRequest) (*google_protobuf.Empty, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| |
| switch v := r.PidOrAll.(type) { |
| case *api.KillRequest_All: |
| if err := c.Kill(ctx, r.Signal, 0, true); err != nil { |
| return nil, err |
| } |
| case *api.KillRequest_Pid: |
| if err := c.Kill(ctx, r.Signal, v.Pid, false); err != nil { |
| return nil, err |
| } |
| default: |
| return nil, fmt.Errorf("invalid option specified; expected pid or all") |
| } |
| return empty, nil |
| } |
| |
| func (s *Service) Processes(ctx context.Context, r *api.ProcessesRequest) (*api.ProcessesResponse, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| |
| pids, err := c.Processes(ctx) |
| if err != nil { |
| return nil, err |
| } |
| |
| ps := []*task.Process{} |
| for _, pid := range pids { |
| ps = append(ps, &task.Process{ |
| Pid: pid, |
| }) |
| } |
| |
| resp := &api.ProcessesResponse{ |
| Processes: ps, |
| } |
| |
| return resp, nil |
| } |
| |
| func (s *Service) Events(r *api.EventsRequest, server api.Tasks_EventsServer) error { |
| w := &grpcEventWriter{ |
| server: server, |
| } |
| return s.collector.forward(w) |
| } |
| |
| func (s *Service) Exec(ctx context.Context, r *api.ExecRequest) (*api.ExecResponse, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| process, err := c.Exec(ctx, plugin.ExecOpts{ |
| Spec: r.Spec.Value, |
| IO: plugin.IO{ |
| Stdin: r.Stdin, |
| Stdout: r.Stdout, |
| Stderr: r.Stderr, |
| Terminal: r.Terminal, |
| }, |
| }) |
| if err != nil { |
| return nil, err |
| } |
| state, err := process.State(ctx) |
| if err != nil { |
| return nil, err |
| } |
| return &api.ExecResponse{ |
| Pid: state.Pid(), |
| }, nil |
| } |
| |
| func (s *Service) Pty(ctx context.Context, r *api.PtyRequest) (*google_protobuf.Empty, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| if err := c.Pty(ctx, r.Pid, plugin.ConsoleSize{ |
| Width: r.Width, |
| Height: r.Height, |
| }); err != nil { |
| return nil, err |
| } |
| return empty, nil |
| } |
| |
| func (s *Service) CloseStdin(ctx context.Context, r *api.CloseStdinRequest) (*google_protobuf.Empty, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| if err := c.CloseStdin(ctx, r.Pid); err != nil { |
| return nil, err |
| } |
| return empty, nil |
| } |
| |
| func (s *Service) Checkpoint(ctx context.Context, r *api.CheckpointRequest) (*api.CheckpointResponse, error) { |
| c, err := s.getTask(r.ContainerID) |
| if err != nil { |
| return nil, err |
| } |
| image, err := ioutil.TempDir("", "ctd-checkpoint") |
| if err != nil { |
| return nil, err |
| } |
| defer os.RemoveAll(image) |
| if err := c.Checkpoint(ctx, plugin.CheckpointOpts{ |
| Exit: r.Exit, |
| AllowTCP: r.AllowTcp, |
| AllowTerminal: r.AllowTerminal, |
| AllowUnixSockets: r.AllowUnixSockets, |
| FileLocks: r.FileLocks, |
| // ParentImage: r.ParentImage, |
| EmptyNamespaces: r.EmptyNamespaces, |
| Path: image, |
| }); err != nil { |
| return nil, err |
| } |
| // write checkpoint to the content store |
| tar := archive.Diff(ctx, "", image) |
| cp, err := s.writeContent(ctx, images.MediaTypeContainerd1Checkpoint, image, tar) |
| // close tar first after write |
| if err := tar.Close(); err != nil { |
| return nil, err |
| } |
| if err != nil { |
| return nil, err |
| } |
| // write the config to the content store |
| spec := bytes.NewReader(c.Info().Spec) |
| specD, err := s.writeContent(ctx, images.MediaTypeContainerd1CheckpointConfig, filepath.Join(image, "spec"), spec) |
| if err != nil { |
| return nil, err |
| } |
| return &api.CheckpointResponse{ |
| Descriptors: []*descriptor.Descriptor{ |
| cp, |
| specD, |
| }, |
| }, nil |
| } |
| |
| func (s *Service) writeContent(ctx context.Context, mediaType, ref string, r io.Reader) (*descriptor.Descriptor, error) { |
| writer, err := s.store.Writer(ctx, ref, 0, "") |
| if err != nil { |
| return nil, err |
| } |
| defer writer.Close() |
| size, err := io.Copy(writer, r) |
| if err != nil { |
| return nil, err |
| } |
| if err := writer.Commit(0, ""); err != nil { |
| return nil, err |
| } |
| return &descriptor.Descriptor{ |
| MediaType: mediaType, |
| Digest: writer.Digest(), |
| Size_: size, |
| }, nil |
| } |
| |
| func (s *Service) getTask(id string) (plugin.Task, error) { |
| s.mu.Lock() |
| c, ok := s.tasks[id] |
| s.mu.Unlock() |
| if !ok { |
| return nil, grpc.Errorf(codes.NotFound, "task %v not found", id) |
| } |
| return c, nil |
| } |
| |
| func (s *Service) getRuntime(name string) (plugin.Runtime, error) { |
| runtime, ok := s.runtimes[name] |
| if !ok { |
| return nil, plugin.ErrUnknownRuntime |
| } |
| return runtime, nil |
| } |
| |
| type grpcEventWriter struct { |
| server api.Tasks_EventsServer |
| } |
| |
| func (g *grpcEventWriter) Write(e *plugin.Event) error { |
| var t task.Event_EventType |
| switch e.Type { |
| case plugin.ExitEvent: |
| t = task.Event_EXIT |
| case plugin.ExecAddEvent: |
| t = task.Event_EXEC_ADDED |
| case plugin.PausedEvent: |
| t = task.Event_PAUSED |
| case plugin.CreateEvent: |
| t = task.Event_CREATE |
| case plugin.StartEvent: |
| t = task.Event_START |
| case plugin.OOMEvent: |
| t = task.Event_OOM |
| } |
| return g.server.Send(&task.Event{ |
| Type: t, |
| ID: e.ID, |
| Pid: e.Pid, |
| ExitStatus: e.ExitStatus, |
| }) |
| } |