exporter/zipkin: add option to set RemoteEndpoint. Provide an option, `WithRemoteEndpoint`, that sets the remote endpoint of a Zipkin exporter. This change is necessary because the constructor `NewExporter` accepts only a local endpoint, which implies that the remote endpoint is optional. When a remote endpoint is known, however, it must be sent along, because Zipkin uses it to construct the service graph. I found this issue while working on the OpenCensus service/agent. Fixes #959
diff --git a/exporter/zipkin/options.go b/exporter/zipkin/options.go new file mode 100644 index 0000000..6ad98c5 --- /dev/null +++ b/exporter/zipkin/options.go
@@ -0,0 +1,36 @@ +// Copyright 2018, OpenCensus Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package zipkin + +import "github.com/openzipkin/zipkin-go/model" + +type Option interface { + mutateExporter(*Exporter) +} + +type remoteEndpoint struct { + endpoint *model.Endpoint +} + +var _ Option = (*remoteEndpoint)(nil) + +// WithRemoteEndpoint sets the remote endpoint of the exporter. +func WithRemoteEndpoint(endpoint *model.Endpoint) Option { + return &remoteEndpoint{endpoint: endpoint} +} + +func (re *remoteEndpoint) mutateExporter(exp *Exporter) { + exp.remoteEndpoint = re.endpoint +}
diff --git a/exporter/zipkin/zipkin.go b/exporter/zipkin/zipkin.go index 30d2fa4..4666dec 100644 --- a/exporter/zipkin/zipkin.go +++ b/exporter/zipkin/zipkin.go
@@ -27,8 +27,9 @@ // Exporter is an implementation of trace.Exporter that uploads spans to a // Zipkin server. type Exporter struct { - reporter reporter.Reporter - localEndpoint *model.Endpoint + reporter reporter.Reporter + localEndpoint *model.Endpoint + remoteEndpoint *model.Endpoint } // NewExporter returns an implementation of trace.Exporter that uploads spans @@ -42,16 +43,22 @@ // constructed with github.com/openzipkin/zipkin-go.NewEndpoint, e.g.: // localEndpoint, err := NewEndpoint("my server", listener.Addr().String()) // localEndpoint can be nil. -func NewExporter(reporter reporter.Reporter, localEndpoint *model.Endpoint) *Exporter { - return &Exporter{ +func NewExporter(reporter reporter.Reporter, localEndpoint *model.Endpoint, opts ...Option) *Exporter { + exp := &Exporter{ reporter: reporter, localEndpoint: localEndpoint, } + + for _, opt := range opts { + opt.mutateExporter(exp) + } + + return exp } // ExportSpan exports a span to a Zipkin server. func (e *Exporter) ExportSpan(s *trace.SpanData) { - e.reporter.Send(zipkinSpan(s, e.localEndpoint)) + e.reporter.Send(zipkinSpan(s, e.localEndpoint, e.remoteEndpoint)) } const ( @@ -110,7 +117,7 @@ return model.Undetermined } -func zipkinSpan(s *trace.SpanData, localEndpoint *model.Endpoint) model.SpanModel { +func zipkinSpan(s *trace.SpanData, localEndpoint, remoteEndpoint *model.Endpoint) model.SpanModel { sc := s.SpanContext z := model.SpanModel{ SpanContext: model.SpanContext{ @@ -118,11 +125,12 @@ ID: convertSpanID(sc.SpanID), Sampled: &sampledTrue, }, - Kind: spanKind(s), - Name: s.Name, - Timestamp: s.StartTime, - Shared: false, - LocalEndpoint: localEndpoint, + Kind: spanKind(s), + Name: s.Name, + Timestamp: s.StartTime, + Shared: false, + LocalEndpoint: localEndpoint, + RemoteEndpoint: remoteEndpoint, } if s.ParentSpanID != (trace.SpanID{}) {
diff --git a/exporter/zipkin/zipkin_test.go b/exporter/zipkin/zipkin_test.go index 2d5f81c..0e121d8 100644 --- a/exporter/zipkin/zipkin_test.go +++ b/exporter/zipkin/zipkin_test.go
@@ -15,14 +15,18 @@ package zipkin import ( + "bytes" "encoding/json" "io/ioutil" "net/http" + "net/http/httptest" "reflect" "strings" + "sync" "testing" "time" + openzipkin "github.com/openzipkin/zipkin-go" "github.com/openzipkin/zipkin-go/model" httpreporter "github.com/openzipkin/zipkin-go/reporter/http" "go.opencensus.io/trace" @@ -212,7 +216,7 @@ }, } for _, tt := range tests { - got := zipkinSpan(tt.span, nil) + got := zipkinSpan(tt.span, nil, nil) if len(got.Annotations) != len(tt.want.Annotations) { t.Fatalf("zipkinSpan: got %d annotations in span, want %d", len(got.Annotations), len(tt.want.Annotations)) } @@ -253,3 +257,84 @@ } } } + +// Ensure that we can pass in a remote endpoint but also that it is +// transmitted to its origin. Issue #959 +func TestRemoteEndpointOptionAndTransmission(t *testing.T) { + type lockableBuffer struct { + sync.Mutex + *bytes.Buffer + } + + buf := &lockableBuffer{Mutex: sync.Mutex{}, Buffer: new(bytes.Buffer)} + + cst := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + blob, _ := ioutil.ReadAll(r.Body) + _ = r.Body.Close() + buf.Lock() + buf.Write(blob) + buf.Unlock() + })) + defer cst.Close() + + reporter := httpreporter.NewReporter(cst.URL, httpreporter.BatchInterval(10*time.Millisecond)) + localEndpoint, _ := openzipkin.NewEndpoint("app", "10.0.0.17") + remoteEndpoint, _ := openzipkin.NewEndpoint("memcached", "10.0.0.42") + exp := NewExporter(reporter, localEndpoint, WithRemoteEndpoint(remoteEndpoint)) + exp.ExportSpan(&trace.SpanData{ + Name: "Test", + }) + + // Wait for the upload + <-time.After(300 * time.Millisecond) + + want := `[{ + "traceId":"0000000000000000", + "id":"0000000000000000", + "name":"Test", + "localEndpoint":{ + "serviceName":"app", + "ipv4":"10.0.0.17" + }, + "remoteEndpoint":{ + "serviceName":"memcached","ipv4":"10.0.0.42" + } + }]` + + buf.Lock() + got := buf.String() + buf.Unlock() + + // Since the reported JSON could contain spaces and other indentation, + // 
strip spaces out but also the fields could be mangled so we'll instead + // just use an anagram equivalence to ensure all the output is present + replacer := strings.NewReplacer(" ", "", "\t", "", "\n", "") + wj := replacer.Replace(want) + gj := replacer.Replace(got) + if !anagrams(gj, wj) { + t.Errorf("Mismatched JSON content\nGot:\n\t%s\nWant:\n\t%s", gj, wj) + } +} + +func anagrams(s1, s2 string) bool { + if len(s1) != len(s2) { + return false + } + if s1 == "" && s2 == "" { + return true + } + m1 := make(map[byte]int) + for i := range s1 { + m1[s1[i]] += 1 + m1[s2[i]] -= 1 + } + + // Finally check that all the values are at 0 + // that is, all the letters in s1 were matched in s2 + for _, count := range m1 { + if count != 0 { + return false + } + } + return true +}