| // Copyright 2020 The Chromium OS Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style license that can be |
| // found in the LICENSE file. |
| |
| package driver |
| |
| import ( |
| "fmt" |
| "io" |
| "io/ioutil" |
| "os" |
| "sync" |
| |
| "github.com/golang/protobuf/jsonpb" |
| "github.com/golang/protobuf/proto" |
| tnProto "go.chromium.org/chromiumos/config/go/api/test/harness/tnull/v1" |
| results "go.chromium.org/chromiumos/config/go/api/test/results/v2" |
| rtd "go.chromium.org/chromiumos/config/go/api/test/rtd/v1" |
| |
| "go.chromium.org/luci/common/errors" |
| "go.chromium.org/luci/common/sync/parallel" |
| ) |
| |
| type progressSinkInternalClient struct { |
| config *rtd.ProgressSinkClientConfig |
| driver *Driver |
| failureReason error |
| } |
| |
| func (c *progressSinkInternalClient) archiveArtifact(req *rtd.ArchiveArtifactRequest) *rtd.ArchiveArtifactResponse { |
| c.failureReason = errors.Reason("'archiveArtifact' Not yet implemented").Err() |
| c.printErr(c.failureReason) |
| return nil |
| } |
| |
| func (c *progressSinkInternalClient) reportLog(req *rtd.ReportLogRequest) *rtd.ReportLogResponse { |
| c.failureReason = errors.Reason("'reportLog' Not yet implemented").Err() |
| c.printErr(c.failureReason) |
| return nil |
| } |
| |
| func (c *progressSinkInternalClient) reportResult(req *rtd.ReportResultRequest) *rtd.ReportResultResponse { |
| c.failureReason = errors.Reason("'reportResult' Not yet implemented").Err() |
| c.printErr(c.failureReason) |
| return nil |
| } |
| |
| func (c *progressSinkInternalClient) debugLog(req proto.Message, resp proto.Message) *rtd.ReportLogResponse { |
| // This form of the method is temporary and serves as a stand-in for an actual RTS connection |
| localDebug := c.driver.debugErr |
| m := jsonpb.Marshaler{EmitDefaults: true, Indent: " "} |
| fmt.Fprintln(localDebug, "Request:") |
| if err := m.Marshal(localDebug, req); err != nil { |
| err = errors.Annotate(err, "failed to marshal request:").Err() |
| fmt.Fprintln(localDebug, err) |
| } |
| fmt.Fprintln(localDebug, "Response:") |
| if err := m.Marshal(localDebug, resp); err != nil { |
| err = errors.Annotate(err, "failed to marshal response:").Err() |
| fmt.Fprintln(localDebug, err) |
| } |
| return nil |
| } |
| |
| func (c *progressSinkInternalClient) printErr(err error) { |
| // This method is temporary and serves as a stand-in for an actual RTS connection |
| c.failureReason = err |
| fmt.Fprintln(c.driver.debugErr, err) |
| } |
| |
| // Driver is a single-instance object which runs all the operations for a test |
| type Driver struct { |
| artifacts []*tnProto.MockArtifact |
| emitResult sync.Once |
| client *progressSinkInternalClient |
| config *rtd.ProgressSinkClientConfig |
| logs []*tnProto.MockLog |
| result *results.Result |
| debugErr io.Writer |
| } |
| |
| func (d *Driver) SetDebugErrDestination(dest io.Writer) { |
| d.debugErr = dest |
| } |
| |
| // Setup sets config and prepares a result/logs/artifacts. It is called only once. |
| func (d *Driver) Setup(testName string, setup tnProto.SetupStep) error { |
| d.config = setup.Config |
| d.client = &progressSinkInternalClient{config: setup.Config, driver: d} |
| d.result = setup.Result |
| d.logs = setup.Logs |
| d.artifacts = setup.Artifacts |
| return d.client.failureReason |
| } |
| |
| // Archive sends all the stored mock artifacts to the RPC service, in parallel |
| func (d *Driver) Archive(requestName string) error { |
| return parallel.WorkPool(0, func(fchan chan<- func() error) { |
| for _, a := range d.artifacts { |
| a := a |
| fchan <- func() error { |
| tmp, err := ioutil.TempFile("", "") |
| if err != nil { |
| return errors.Annotate(err, "archiving %s", a.Name).Err() |
| } |
| ioutil.WriteFile(tmp.Name(), a.FileBytes, os.ModePerm) |
| req := &rtd.ArchiveArtifactRequest{ |
| Name: a.Name, |
| Request: requestName, |
| LocalPath: tmp.Name(), |
| } |
| resp := d.client.archiveArtifact(req) |
| d.client.debugLog(req, resp) |
| return d.client.failureReason |
| } |
| } |
| }) |
| } |
| |
| // Log reports all the stored mock logs to the RPC service, in parallel |
| func (d *Driver) Log(requestName string) error { |
| return parallel.WorkPool(0, func(fchan chan<- func() error) { |
| for _, l := range d.logs { |
| l := l |
| for _, m := range l.Messages { |
| m := m |
| fchan <- func() error { |
| req := &rtd.ReportLogRequest{ |
| Name: l.Name, |
| Request: requestName, |
| Data: []byte(m), |
| } |
| resp := d.client.reportLog(req) |
| d.client.debugLog(req, resp) |
| return d.client.failureReason |
| } |
| } |
| } |
| }) |
| } |
| |
| // Result reports the already-stored result to the RPC service |
| func (d *Driver) Result(requestName string) error { |
| var wasFirstTime bool |
| d.emitResult.Do(func() { |
| req := &rtd.ReportResultRequest{ |
| Request: requestName, |
| Result: d.result, |
| } |
| resp := d.client.reportResult(req) |
| d.client.debugLog(req, resp) |
| wasFirstTime = true |
| }) |
| if !wasFirstTime { |
| return errors.Reason("Result cannot be called twice.").Err() |
| } |
| return d.client.failureReason |
| } |