| // Copyright 2016 The LUCI Authors. |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package collector |
| |
| import ( |
| "bytes" |
| "context" |
| "crypto/sha256" |
| "encoding/hex" |
| "errors" |
| "fmt" |
| "sort" |
| "strings" |
| "sync" |
| |
| "google.golang.org/protobuf/types/known/timestamppb" |
| |
| "go.chromium.org/luci/common/clock" |
| "go.chromium.org/luci/logdog/api/logpb" |
| "go.chromium.org/luci/logdog/client/pubsubprotocol" |
| "go.chromium.org/luci/logdog/common/storage" |
| "go.chromium.org/luci/logdog/common/types" |
| cc "go.chromium.org/luci/logdog/server/collector/coordinator" |
| ) |
| |
// testSecret is the prefix secret attached to generated test bundles: the byte
// 0x55 repeated types.PrefixSecretLength times.
var testSecret = bytes.Repeat([]byte{0x55}, types.PrefixSecretLength)
| |
// streamKey identifies a stream by the pair of its project name and its stream
// ID. It is comparable, so it can be used directly as a map key.
type streamKey struct {
	project, id string
}
| |
| func mkStreamKey(project, id string) streamKey { |
| return streamKey{project, id} |
| } |
| |
// testCoordinator is an implementation of Coordinator that can be used for
// testing.
//
// It tracks registered streams in memory; its methods take the embedded Mutex
// while reading or writing state.
type testCoordinator struct {
	// Mutex guards state.
	sync.Mutex

	// registerCallback, if not nil, is called when stream registration happens.
	// A non-nil error returned from it aborts the registration and is returned
	// to the caller.
	registerCallback func(cc.LogStreamState) error
	// terminateCallback, if not nil, is called when stream termination happens.
	// A non-nil error returned from it aborts the termination and is returned
	// to the caller.
	terminateCallback func(cc.TerminateRequest) error

	// state is the latest tracked stream state, keyed by project and stream ID.
	// It is created lazily on the first registration.
	state map[streamKey]*cc.LogStreamState
}
| |
// Compile-time assertion that *testCoordinator implements cc.Coordinator.
var _ cc.Coordinator = (*testCoordinator)(nil)
| |
| func (c *testCoordinator) register(s cc.LogStreamState) cc.LogStreamState { |
| c.Lock() |
| defer c.Unlock() |
| |
| // Update our state. |
| if c.state == nil { |
| c.state = make(map[streamKey]*cc.LogStreamState) |
| } |
| |
| id := idFromPath(string(s.Path)) |
| key := mkStreamKey(string(s.Project), id) |
| |
| if sp := c.state[key]; sp != nil { |
| return *sp |
| } |
| |
| s.ID = id |
| c.state[key] = &s |
| return s |
| } |
| |
| func (c *testCoordinator) RegisterStream(ctx context.Context, s *cc.LogStreamState, desc []byte) (*cc.LogStreamState, error) { |
| if cb := c.registerCallback; cb != nil { |
| if err := cb(*s); err != nil { |
| return nil, err |
| } |
| } |
| |
| sp := c.register(*s) |
| return &sp, nil |
| } |
| |
| func (c *testCoordinator) TerminateStream(ctx context.Context, tr *cc.TerminateRequest) error { |
| if cb := c.terminateCallback; cb != nil { |
| if err := cb(*tr); err != nil { |
| return err |
| } |
| } |
| |
| if tr.TerminalIndex < 0 { |
| return errors.New("submitted stream is not terminal") |
| } |
| |
| c.Lock() |
| defer c.Unlock() |
| |
| // Update our state. |
| cachedState, ok := c.state[mkStreamKey(string(tr.Project), tr.ID)] |
| if !ok { |
| return fmt.Errorf("no such stream: %s", tr.ID) |
| } |
| if cachedState.TerminalIndex >= 0 && tr.TerminalIndex != cachedState.TerminalIndex { |
| return fmt.Errorf("incompatible terminal indexes: %d != %d", tr.TerminalIndex, cachedState.TerminalIndex) |
| } |
| |
| cachedState.TerminalIndex = tr.TerminalIndex |
| return nil |
| } |
| |
| func (c *testCoordinator) stream(project, id string) (int, bool) { |
| c.Lock() |
| defer c.Unlock() |
| |
| sp, ok := c.state[mkStreamKey(project, id)] |
| if !ok { |
| return 0, false |
| } |
| return int(sp.TerminalIndex), true |
| } |
| |
| func (c *testCoordinator) streamForPath(project, path string) (int, bool) { |
| return c.stream(project, idFromPath(path)) |
| } |
| |
// testStorage is a testing storage instance that returns errors.
//
// It embeds a storage.Storage and can inject failures into Put.
type testStorage struct {
	// Storage is the wrapped instance; Put is forwarded to it when no error is
	// injected.
	storage.Storage
	// err, if not nil, is called on every Put. A non-nil result is returned to
	// the caller instead of forwarding the request.
	err func() error
}
| |
| func (s *testStorage) Put(c context.Context, r storage.PutRequest) error { |
| if s.err != nil { |
| if err := s.err(); err != nil { |
| return err |
| } |
| } |
| return s.Storage.Put(c, r) |
| } |
| |
// bundleBuilder is a set of utility functions to help test cases construct
// specific logpb.ButlerLogBundle layouts.
type bundleBuilder struct {
	// Context is handed to clock.Now when timestamping generated bundles and
	// stream descriptors.
	context.Context

	// base is the bundle currently being assembled. It is created on demand by
	// genBase and reset to nil once bundle has serialized it.
	base *logpb.ButlerLogBundle
}
| |
| func (b *bundleBuilder) genBase() *logpb.ButlerLogBundle { |
| if b.base == nil { |
| b.base = &logpb.ButlerLogBundle{ |
| Timestamp: timestamppb.New(clock.Now(b)), |
| Project: "test-project", |
| Prefix: "foo", |
| Secret: testSecret, |
| } |
| } |
| return b.base |
| } |
| |
| func (b *bundleBuilder) addBundleEntry(be *logpb.ButlerLogBundle_Entry) { |
| base := b.genBase() |
| base.Entries = append(base.Entries, be) |
| } |
| |
| func (b *bundleBuilder) genBundleEntry(name string, tidx int, idxs ...int) *logpb.ButlerLogBundle_Entry { |
| p, n := types.StreamPath(name).Split() |
| be := logpb.ButlerLogBundle_Entry{ |
| Desc: &logpb.LogStreamDescriptor{ |
| Prefix: string(p), |
| Name: string(n), |
| ContentType: "application/test-message", |
| StreamType: logpb.StreamType_TEXT, |
| Timestamp: timestamppb.New(clock.Now(b)), |
| }, |
| } |
| |
| if len(idxs) > 0 { |
| be.Logs = make([]*logpb.LogEntry, len(idxs)) |
| for i, idx := range idxs { |
| be.Logs[i] = b.logEntry(idx) |
| } |
| if tidx >= 0 { |
| be.Terminal = true |
| be.TerminalIndex = uint64(tidx) |
| } |
| } |
| |
| return &be |
| } |
| |
| func (b *bundleBuilder) addStreamEntries(name string, term int, idxs ...int) { |
| b.addBundleEntry(b.genBundleEntry(name, term, idxs...)) |
| } |
| |
| func (b *bundleBuilder) addFullStream(name string, count int) { |
| idxs := make([]int, count) |
| for i := range idxs { |
| idxs[i] = i |
| } |
| b.addStreamEntries(name, count-1, idxs...) |
| } |
| |
| func (b *bundleBuilder) logEntry(idx int) *logpb.LogEntry { |
| return &logpb.LogEntry{ |
| StreamIndex: uint64(idx), |
| Sequence: uint64(idx), |
| Content: &logpb.LogEntry_Text{ |
| Text: &logpb.Text{ |
| Lines: []*logpb.Text_Line{ |
| { |
| Value: []byte(fmt.Sprintf("Line #%d", idx)), |
| Delimiter: "\n", |
| }, |
| }, |
| }, |
| }, |
| } |
| } |
| |
| func (b *bundleBuilder) bundle() []byte { |
| buf := bytes.Buffer{} |
| w := pubsubprotocol.Writer{Compress: true} |
| if err := w.Write(&buf, b.genBase()); err != nil { |
| panic(err) |
| } |
| |
| b.base = nil |
| return buf.Bytes() |
| } |
| |
// indexRange is a closed range of stream indices, [start, end], used to
// describe a contiguous run of expected log entries.
type indexRange struct {
	start int
	end   int
}

// String renders the range as "[start..end]".
//
// It uses a value receiver so that both indexRange and *indexRange satisfy
// fmt.Stringer. Ranges are passed around by value, and with a pointer receiver
// those values would be printed as raw structs by fmt.
func (r indexRange) String() string { return fmt.Sprintf("[%d..%d]", r.start, r.end) }
| |
| // shouldHaveRegisteredStream asserts that a testCoordinator has |
| // registered a stream (string) and its terminal index (int). |
| func shouldHaveRegisteredStream(actual interface{}, expected ...interface{}) string { |
| tcc := actual.(*testCoordinator) |
| |
| if len(expected) != 3 { |
| return "invalid number of expected arguments (should be 3)." |
| } |
| project := expected[0].(string) |
| path := expected[1].(string) |
| tidx := expected[2].(int) |
| |
| cur, ok := tcc.streamForPath(project, path) |
| if !ok { |
| return fmt.Sprintf("stream %q is not registered", path) |
| } |
| if tidx >= 0 && cur < 0 { |
| return fmt.Sprintf("stream %q is expected to be terminated, but isn't.", path) |
| } |
| if cur >= 0 && tidx < 0 { |
| return fmt.Sprintf("stream %q is NOT expected to be terminated, but it is.", path) |
| } |
| return "" |
| } |
| |
| // shoudNotHaveRegisteredStream asserts that a testCoordinator has not |
| // registered a stream (string). |
| func shouldNotHaveRegisteredStream(actual interface{}, expected ...interface{}) string { |
| tcc := actual.(*testCoordinator) |
| if len(expected) != 2 { |
| return "invalid number of expected arguments (should be 2)." |
| } |
| project := expected[0].(string) |
| path := expected[1].(string) |
| |
| if _, ok := tcc.streamForPath(project, path); ok { |
| return fmt.Sprintf("stream %q is registered, but it should NOT be.", path) |
| } |
| return "" |
| } |
| |
| // shouldHaveStoredStream asserts that a storage.Storage instance has contiguous |
| // stream records in it. |
| // |
| // actual is the storage.Storage instance. expected is a stream name (string) |
| // followed by a a series of records to assert. This can either be a specific |
| // integer index or an intexRange marking a closed range of indices. |
| func shouldHaveStoredStream(actual interface{}, expected ...interface{}) string { |
| st := actual.(storage.Storage) |
| project := expected[0].(string) |
| name := expected[1].(string) |
| expected = expected[2:] |
| |
| // Load all entries for this stream. |
| req := storage.GetRequest{ |
| Project: project, |
| Path: types.StreamPath(name), |
| } |
| |
| entries := make(map[int]*logpb.LogEntry) |
| var ierr error |
| err := st.Get(context.Background(), req, func(e *storage.Entry) bool { |
| var le *logpb.LogEntry |
| if le, ierr = e.GetLogEntry(); ierr != nil { |
| return false |
| } |
| entries[int(le.StreamIndex)] = le |
| return true |
| }) |
| if ierr != nil { |
| err = ierr |
| } |
| if err != nil && err != storage.ErrDoesNotExist { |
| return fmt.Sprintf("error: %v", err) |
| } |
| |
| assertLogEntry := func(i int) string { |
| le := entries[i] |
| if le == nil { |
| return fmt.Sprintf("%d", i) |
| } |
| delete(entries, i) |
| |
| if le.StreamIndex != uint64(i) { |
| return fmt.Sprintf("*%d", i) |
| } |
| return "" |
| } |
| |
| var failed []string |
| for _, exp := range expected { |
| switch e := exp.(type) { |
| case int: |
| if err := assertLogEntry(e); err != "" { |
| failed = append(failed, fmt.Sprintf("missing{%s}", err)) |
| } |
| |
| case indexRange: |
| var errs []string |
| for i := e.start; i <= e.end; i++ { |
| if err := assertLogEntry(i); err != "" { |
| errs = append(errs, err) |
| } |
| } |
| if len(errs) > 0 { |
| failed = append(failed, fmt.Sprintf("%s{%s}", e.String(), strings.Join(errs, ","))) |
| } |
| |
| default: |
| panic(fmt.Errorf("unknown expected type %T", e)) |
| } |
| } |
| |
| // Extras? |
| if len(entries) > 0 { |
| idxs := make([]int, 0, len(entries)) |
| for i := range entries { |
| idxs = append(idxs, i) |
| } |
| sort.Ints(idxs) |
| |
| extra := make([]string, len(idxs)) |
| for i, idx := range idxs { |
| extra[i] = fmt.Sprintf("%d", idx) |
| } |
| failed = append(failed, fmt.Sprintf("extra{%s}", strings.Join(extra, ","))) |
| } |
| |
| if len(failed) > 0 { |
| return strings.Join(failed, ", ") |
| } |
| return "" |
| } |
| |
// idFromPath derives a stream ID from a stream path: the lowercase
// hex-encoded SHA-256 digest of the path's bytes.
func idFromPath(path string) string {
	hasher := sha256.New()
	hasher.Write([]byte(path)) // hash.Hash.Write never returns an error.
	return hex.EncodeToString(hasher.Sum(nil))
}