Telemetry: Cleanup batch configuration.

Batch publishing is the default and we will not be returning to
the previous scheme. Cleanup vestigial batch publishing
configurations and args, and drop the old exporter.

BUG=b:321307708
TEST=run_tests, CQ

Change-Id: I8d319559d9ccc93590fc65f26533a5b0003ed121
Reviewed-on: https://chromium-review.googlesource.com/c/chromiumos/chromite/+/5545080
Auto-Submit: Alex Klein <saklein@chromium.org>
Reviewed-by: Sergey Frolov <sfrolov@google.com>
Commit-Queue: Sergey Frolov <sfrolov@google.com>
Tested-by: Alex Klein <saklein@chromium.org>
Commit-Queue: Alex Klein <saklein@chromium.org>
diff --git a/cli/cros/cros_telemetry.py b/cli/cros/cros_telemetry.py
index ccc15b6..0684003 100644
--- a/cli/cros/cros_telemetry.py
+++ b/cli/cros/cros_telemetry.py
@@ -87,16 +87,6 @@
             action="store_true",
             help="Regenerate UUIDs.",
         )
-        actions.add_argument(
-            "--batch",
-            action="store_true",
-            help="Write telemetry to files and do batch uploading.",
-        )
-        actions.add_argument(
-            "--stop-batch",
-            action="store_true",
-            help="Stop writing telemetry to files and doing batch uploading.",
-        )
 
     @staticmethod
     def _show_telemetry(cfg: config.Config) -> None:
@@ -108,8 +98,6 @@
             )
             if cfg.trace_config.dev_flag:
                 print(f"{config.KEY_DEV} = True")
-            if cfg.trace_config.batch:
-                print(f"{config.BATCH_PUBLISHING_ENABLED_KEY} = True")
         else:
             print(f"notice_countdown = {cfg.root_config.notice_countdown}")
 
@@ -140,9 +128,5 @@
         elif self.options.regen_ids:
             span.set_attribute("regen_ids", True)
             cfg.trace_config.gen_id(regen=True)
-        elif self.options.batch:
-            cfg.trace_config.set_batch(True)
-        elif self.options.stop_batch:
-            cfg.trace_config.set_batch(False)
 
         cfg.flush()
diff --git a/lib/telemetry/config.py b/lib/telemetry/config.py
index d7dbb56..74a72d3 100644
--- a/lib/telemetry/config.py
+++ b/lib/telemetry/config.py
@@ -24,7 +24,6 @@
     ROOT_SECTION_KEY: {NOTICE_COUNTDOWN_KEY: 10},
     TRACE_SECTION_KEY: {},
 }
-BATCH_PUBLISHING_ENABLED_KEY = "batch_publishing"
 # The "telemetry in development" config to allow publishing the telemetry, but
 # easily filtering it out later.
 KEY_DEV = "development"
@@ -69,19 +68,6 @@
 
         return False
 
-    def set_batch(self, enabled: bool) -> None:
-        """Set or delete the batch flag."""
-        self._config.set(
-            TRACE_SECTION_KEY, BATCH_PUBLISHING_ENABLED_KEY, str(enabled)
-        )
-
-    @property
-    def batch(self) -> bool:
-        """Check if batch uploads are configured."""
-        return self._config[TRACE_SECTION_KEY].getboolean(
-            BATCH_PUBLISHING_ENABLED_KEY, False
-        )
-
     def _uuid_stale(self):
         """Check if the UUID is stale or doesn't exist."""
         if (
diff --git a/lib/telemetry/cros_detector.py b/lib/telemetry/cros_detector.py
index 18b19b6..dc43226 100644
--- a/lib/telemetry/cros_detector.py
+++ b/lib/telemetry/cros_detector.py
@@ -73,12 +73,10 @@
         self,
         *args,
         force_dev: bool = False,
-        batch_publishing: bool = False,
         **kwargs,
     ) -> None:
         super().__init__(*args, **kwargs)
         self.force_dev = force_dev
-        self.batch_publishing = batch_publishing
 
     def detect(self) -> resources.Resource:
         resource = {
@@ -87,7 +85,6 @@
                 or os.environ.get("CHROMITE_TELEMETRY_IGNORE") == "1"
             ),
             "development.tag": os.environ.get("CHROMITE_TELEMETRY_TAG", ""),
-            "development.batch_publishing": self.batch_publishing,
         }
 
         return resources.Resource(resource)
diff --git a/lib/telemetry/trace/__init__.py b/lib/telemetry/trace/__init__.py
index 33b2e8c..e66a43b 100644
--- a/lib/telemetry/trace/__init__.py
+++ b/lib/telemetry/trace/__init__.py
@@ -68,7 +68,6 @@
     enabled: bool = False,
     development_mode: bool = False,
     user_uuid: str = "",
-    batch: bool = True,
 ) -> None:
     """Initialize opentelemetry tracing.
 
@@ -80,7 +79,6 @@
         development_mode: Mark the telemetry as in development, so it can be
             easily identified as such later, e.g. filtered out of queries.
         user_uuid: The user's UUID.
-        batch: Write telemetry to files for batch publishing.
     """
 
     # The opentelemetry imports are moved inside this function to reduce the
@@ -107,7 +105,6 @@
     from chromite.lib.telemetry.trace import chromite_tracer
     from chromite.utils import hostname_util
     from chromite.utils.telemetry import detector
-    from chromite.utils.telemetry import exporter as utils_exporter
 
     # Need this to globally mark telemetry initialized to enable real imports.
     # pylint: disable=global-statement
@@ -127,9 +124,7 @@
             detector.ProcessDetector(),
             cros_detector.SDKSourceDetector(),  # type: ignore[no-untyped-call]
             detector.SystemDetector(),  # type: ignore[no-untyped-call]
-            cros_detector.DevelopmentDetector(
-                force_dev=development_mode, batch_publishing=batch
-            ),
+            cros_detector.DevelopmentDetector(force_dev=development_mode),
             cros_detector.UserDetector(user_uuid=user_uuid),
         ]
     )
@@ -144,19 +139,10 @@
         return
 
     if enabled:
-        if batch:
-            path = _get_trace_path()
-            tracer_provider.add_span_processor(
-                otel_export.SimpleSpanProcessor(
-                    exporter.ChromiteFileExporter(path)
-                )
-            )
-        else:
-            tracer_provider.add_span_processor(
-                otel_export.BatchSpanProcessor(
-                    utils_exporter.ClearcutSpanExporter()
-                )
-            )
+        path = _get_trace_path()
+        tracer_provider.add_span_processor(
+            otel_export.SimpleSpanProcessor(exporter.ChromiteFileExporter(path))
+        )
 
     if TRACEPARENT_ENVVAR in os.environ:
         ctx = tracecontext.TraceContextTextMapPropagator().extract(os.environ)
diff --git a/utils/telemetry/exporter.py b/utils/telemetry/exporter.py
deleted file mode 100644
index d64d873..0000000
--- a/utils/telemetry/exporter.py
+++ /dev/null
@@ -1,361 +0,0 @@
-# Copyright 2023 The ChromiumOS Authors
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Defines span exporters to be used with tracer."""
-
-import datetime
-import logging
-import time
-from typing import Callable, Dict, Optional, Sequence
-import urllib.request
-
-from chromite.third_party.google.protobuf import json_format
-from chromite.third_party.google.protobuf import message as proto_msg
-from chromite.third_party.google.protobuf import struct_pb2
-from chromite.third_party.opentelemetry import trace as trace_api
-from chromite.third_party.opentelemetry.sdk import resources
-from chromite.third_party.opentelemetry.sdk import trace
-from chromite.third_party.opentelemetry.sdk.trace import export
-from chromite.third_party.opentelemetry.util import types
-
-# Required due to incomplete proto support in chromite. This proto usage is not
-# tied to the Build API, so delegating the proto handling to api/ does not make
-# sense. When proto is better supported in chromite, the protos could, for
-# example, live somewhere in utils/ instead.
-from chromite.api.gen.chromite.telemetry import clientanalytics_pb2
-from chromite.api.gen.chromite.telemetry import trace_span_pb2
-from chromite.utils.telemetry import detector
-from chromite.utils.telemetry import utils
-
-
-_DEFAULT_ENDPOINT = "https://play.googleapis.com/log"
-_DEFAULT_TIMEOUT = 15
-_DEFAULT_FLUSH_TIMEOUT_MILLIS = 30000
-_DEAULT_MAX_WAIT_SECS = 60
-# Preallocated in Clearcut proto to Build.
-_LOG_SOURCE = 2044
-# Preallocated in Clearcut proto to Python clients.
-_CLIENT_TYPE = 33
-_DEFAULT_MAX_QUEUE_SIZE = 1000
-
-
-class AnonymizingFilter:
-    """Applies the anonymizer to TraceSpan messages."""
-
-    def __init__(self, anonymizer: utils.Anonymizer) -> None:
-        self._anonymizer = anonymizer
-
-    def __call__(
-        self, msg: trace_span_pb2.TraceSpan
-    ) -> trace_span_pb2.TraceSpan:
-        """Applies the anonymizer to TraceSpan message."""
-        raw = json_format.MessageToJson(msg)
-        json_msg = self._anonymizer.apply(raw)
-        output = trace_span_pb2.TraceSpan()
-        json_format.Parse(json_msg, output)
-        return output
-
-
-class ClearcutSpanExporter(export.SpanExporter):
-    """Exports the spans to google http endpoint."""
-
-    def __init__(
-        self,
-        endpoint: str = _DEFAULT_ENDPOINT,
-        timeout: int = _DEFAULT_TIMEOUT,
-        max_wait_secs: int = _DEAULT_MAX_WAIT_SECS,
-        max_queue_size: int = _DEFAULT_MAX_QUEUE_SIZE,
-        prefilter: Optional[
-            Callable[[trace_span_pb2.TraceSpan], trace_span_pb2.TraceSpan]
-        ] = None,
-    ) -> None:
-        self._endpoint = endpoint
-        self._timeout = timeout
-        self._prefilter = prefilter or AnonymizingFilter(utils.Anonymizer())
-        self._log_source = _LOG_SOURCE
-        self._next_request_dt = datetime.datetime.now()
-        self._max_wait_secs = max_wait_secs
-        self._queue = []
-        self._max_queue_size = max_queue_size
-
-    def export(
-        self, spans: Sequence[trace.ReadableSpan]
-    ) -> export.SpanExportResult:
-        spans = [self._prefilter(self._translate_span(s)) for s in spans]
-        self._queue.extend(spans)
-
-        if len(self._queue) >= self._max_queue_size:
-            return (
-                export.SpanExportResult.SUCCESS
-                if self._export_batch()
-                else export.SpanExportResult.FAILURE
-            )
-
-        return export.SpanExportResult.SUCCESS
-
-    def shutdown(self) -> None:
-        self.force_flush()
-
-    def force_flush(
-        self, timeout_millis: int = _DEFAULT_FLUSH_TIMEOUT_MILLIS
-    ) -> bool:
-        if self._queue:
-            return self._export_batch(timeout=timeout_millis / 1000)
-
-        return True
-
-    def _translate_context(
-        self, data: trace_api.SpanContext
-    ) -> trace_span_pb2.TraceSpan.Context:
-        ctx = trace_span_pb2.TraceSpan.Context()
-        ctx.trace_id = f"0x{trace_api.format_trace_id(data.trace_id)}"
-        ctx.span_id = f"0x{trace_api.format_span_id(data.span_id)}"
-        ctx.trace_state = repr(data.trace_state)
-        return ctx
-
-    def _translate_attributes(
-        self, data: types.Attributes
-    ) -> struct_pb2.Struct:
-        patch = {}
-        for k, v in data.items():
-            if isinstance(v, tuple):
-                v = list(v)
-            patch[k] = v
-
-        struct = struct_pb2.Struct()
-        try:
-            struct.update(patch)
-        except Exception as e:
-            logging.debug("Set attribute failed: %s", e)
-        return struct
-
-    def _translate_span_attributes(
-        self, data: trace.ReadableSpan
-    ) -> struct_pb2.Struct:
-        return self._translate_attributes(data.attributes)
-
-    def _translate_links(
-        self, data: trace.ReadableSpan
-    ) -> trace_span_pb2.TraceSpan.Link:
-        links = []
-
-        for d in data.links:
-            link = trace_span_pb2.TraceSpan.Link()
-            link.context.MergeFrom(self._translate_context(d.context))
-            link.attributes.MergeFrom(self._translate_attributes(d.attributes))
-            links.append(link)
-
-        return links
-
-    def _translate_events(
-        self, data: trace.ReadableSpan
-    ) -> trace_span_pb2.TraceSpan.Event:
-        events = []
-        for e in data.events:
-            event = trace_span_pb2.TraceSpan.Event()
-            event.event_time_millis = int(e.timestamp / 1e6)
-            event.name = e.name
-            event.attributes.MergeFrom(self._translate_attributes(e.attributes))
-            events.append(event)
-        return events
-
-    def _translate_instrumentation_scope(
-        self, data: trace.ReadableSpan
-    ) -> trace_span_pb2.TraceSpan.InstrumentationScope:
-        s = data.instrumentation_scope
-        scope = trace_span_pb2.TraceSpan.InstrumentationScope()
-        scope.name = s.name
-        scope.version = s.version
-        return scope
-
-    def _translate_env(self, data: Dict[str, str]):
-        environ = {}
-        for k, v in data.items():
-            if k.startswith("process.env."):
-                key = k.split("process.env.")[1]
-                environ[key] = v
-        return environ
-
-    def _translate_resource(
-        self, data: trace.ReadableSpan
-    ) -> trace_span_pb2.TraceSpan.Resource:
-        attrs = dict(data.resource.attributes)
-        resource = trace_span_pb2.TraceSpan.Resource()
-        resource.system.cpu = attrs.pop(detector.CPU_NAME, "")
-        resource.system.host_architecture = attrs.pop(
-            detector.CPU_ARCHITECTURE, ""
-        )
-        resource.system.os_name = attrs.pop(detector.OS_NAME, "")
-        resource.system.os_version = attrs.pop(resources.OS_DESCRIPTION, "")
-        resource.system.os_type = attrs.pop(resources.OS_TYPE, "")
-        resource.process.pid = str(attrs.pop(resources.PROCESS_PID, ""))
-        resource.process.executable_name = attrs.pop(
-            resources.PROCESS_EXECUTABLE_NAME, ""
-        )
-        resource.process.executable_path = attrs.pop(
-            resources.PROCESS_EXECUTABLE_PATH, ""
-        )
-        resource.process.command = attrs.pop(resources.PROCESS_COMMAND, "")
-        resource.process.command_args.extend(
-            attrs.pop(resources.PROCESS_COMMAND_ARGS, [])
-        )
-        resource.process.owner_is_root = (
-            attrs.pop(resources.PROCESS_OWNER, 9999) == 0
-        )
-        resource.process.runtime_name = attrs.pop(
-            resources.PROCESS_RUNTIME_NAME, ""
-        )
-        resource.process.runtime_version = attrs.pop(
-            resources.PROCESS_RUNTIME_VERSION, ""
-        )
-        resource.process.runtime_description = attrs.pop(
-            resources.PROCESS_RUNTIME_DESCRIPTION, ""
-        )
-        resource.process.api_version = str(
-            attrs.pop(detector.PROCESS_RUNTIME_API_VERSION, "")
-        )
-        resource.process.env.update(self._translate_env(attrs))
-        resource.attributes.MergeFrom(self._translate_attributes(attrs))
-        return resource
-
-    def _translate_status(
-        self, data: trace.ReadableSpan
-    ) -> trace_span_pb2.TraceSpan.Status:
-        status = trace_span_pb2.TraceSpan.Status()
-
-        if data.status.status_code == trace.StatusCode.ERROR:
-            status.status_code = (
-                trace_span_pb2.TraceSpan.Status.StatusCode.STATUS_CODE_ERROR
-            )
-        else:
-            status.status_code = (
-                trace_span_pb2.TraceSpan.Status.StatusCode.STATUS_CODE_OK
-            )
-
-        if data.status.description:
-            status.message = data.status.description
-
-        return status
-
-    def _translate_sdk(
-        self, data: trace.ReadableSpan
-    ) -> trace_span_pb2.TraceSpan.TelemetrySdk:
-        attrs = data.resource.attributes
-        sdk = trace_span_pb2.TraceSpan.TelemetrySdk()
-        sdk.name = attrs.get(resources.TELEMETRY_SDK_NAME)
-        sdk.version = attrs.get(resources.TELEMETRY_SDK_VERSION)
-        sdk.language = attrs.get(resources.TELEMETRY_SDK_LANGUAGE)
-        return sdk
-
-    def _translate_kind(
-        self, data: trace_api.SpanKind
-    ) -> trace_span_pb2.TraceSpan.SpanKind:
-        if data == trace_api.SpanKind.INTERNAL:
-            return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_INTERNAL
-        elif data == trace_api.SpanKind.CLIENT:
-            return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_CLIENT
-        elif data == trace_api.SpanKind.SERVER:
-            return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_SERVER
-        return trace_span_pb2.TraceSpan.SpanKind.SPAN_KIND_UNSPECIFIED
-
-    def _translate_span(
-        self, data: trace.ReadableSpan
-    ) -> trace_span_pb2.TraceSpan:
-        span = trace_span_pb2.TraceSpan()
-        span.name = data.name
-        span.context.MergeFrom(self._translate_context(data.get_span_context()))
-
-        if data.parent is not None:
-            if isinstance(data.parent, trace_api.Span):
-                ctx = data.parent.context
-                span.parent_span_id = (
-                    f"0x{trace_api.format_span_id(ctx.span_id)}"
-                )
-            elif isinstance(data.parent, trace_api.SpanContext):
-                span.parent_span_id = (
-                    f"0x{trace_api.format_span_id(data.parent.span_id)}"
-                )
-
-        span.start_time_millis = int(data.start_time / 1e6)
-        span.end_time_millis = int(data.end_time / 1e6)
-        span.span_kind = self._translate_kind(data.kind)
-        span.instrumentation_scope.MergeFrom(
-            self._translate_instrumentation_scope(data)
-        )
-        span.events.extend(self._translate_events(data))
-        span.links.extend(self._translate_links(data))
-        span.attributes.MergeFrom(self._translate_span_attributes(data))
-        span.status.MergeFrom(self._translate_status(data))
-        span.resource.MergeFrom(self._translate_resource(data))
-        span.telemetry_sdk.MergeFrom(self._translate_sdk(data))
-
-        return span
-
-    def _export_batch(self, timeout: Optional[int] = None) -> bool:
-        """Export the spans to clearcut via http api."""
-
-        spans = self._queue[: self._max_queue_size]
-        self._queue = self._queue[self._max_queue_size :]
-
-        while True:
-            wait_delta = self._next_request_dt - datetime.datetime.now()
-            wait_time = wait_delta.total_seconds()
-
-            # TODO(anujjamwal): Fix this logic.
-            # Drop the packets if wait time is more than threshold.
-            if wait_time > self._max_wait_secs:
-                logging.warning(
-                    "dropping %d spans for large wait: %d",
-                    len(spans),
-                    wait_time,
-                )
-                return True
-
-            if wait_time > 0:
-                time.sleep(wait_time)
-                continue
-
-            logrequest = self._prepare_request_body(spans)
-
-            req = urllib.request.Request(
-                self._endpoint,
-                data=logrequest.SerializeToString(),
-                method="POST",
-            )
-            logresponse = clientanalytics_pb2.LogResponse()
-
-            try:
-                with urllib.request.urlopen(
-                    req, timeout=timeout or self._timeout
-                ) as f:
-                    logresponse.ParseFromString(f.read())
-            except urllib.error.URLError as e:
-                # It is expected that child Pids in build_image which call
-                # sys.exit do not have network re-enabled in that namespace, so
-                # for now, log this error at the debug level.
-                logging.debug(e)
-                return False
-            except proto_msg.DecodeError as e:
-                logging.warning("could not decode data into proto: %s", e)
-                return False
-
-            now = datetime.datetime.now()
-            delta = datetime.timedelta(
-                milliseconds=logresponse.next_request_wait_millis
-            )
-            self._next_request_dt = now + delta
-            return True
-
-    def _prepare_request_body(self, spans) -> clientanalytics_pb2.LogRequest:
-        log_request = clientanalytics_pb2.LogRequest()
-        log_request.request_time_ms = int(time.time() * 1000)
-        log_request.client_info.client_type = _CLIENT_TYPE
-        log_request.log_source = self._log_source
-
-        for span in spans:
-            log_event = log_request.log_event.add()
-            log_event.event_time_ms = int(time.time() * 1000)
-            log_event.source_extension = span.SerializeToString()
-
-        return log_request
diff --git a/utils/telemetry/exporter_unittest.py b/utils/telemetry/exporter_unittest.py
deleted file mode 100644
index 7d9c06b..0000000
--- a/utils/telemetry/exporter_unittest.py
+++ /dev/null
@@ -1,265 +0,0 @@
-# Copyright 2023 The ChromiumOS Authors
-# Use of this source code is governed by a BSD-style license that can be
-# found in the LICENSE file.
-
-"""Unittests for SpanExporter classes."""
-
-import datetime
-import re
-import time
-import urllib.request
-
-from chromite.third_party.opentelemetry.sdk import trace
-from chromite.third_party.opentelemetry.sdk.trace import export
-
-from chromite.api.gen.chromite.telemetry import clientanalytics_pb2
-from chromite.api.gen.chromite.telemetry import trace_span_pb2
-from chromite.utils.telemetry import exporter
-from chromite.utils.telemetry import utils
-
-
-class MockResponse:
-    """Mock requests.Response."""
-
-    def __init__(self, status, text) -> None:
-        self._status = status
-        self._text = text
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, *args) -> None:
-        pass
-
-    def read(self):
-        return self._text
-
-
-tracer = trace.TracerProvider().get_tracer(__name__)
-
-
-def test_anonymizing_filter_to_redact_info_from_msg() -> None:
-    """Test AnonymizingFilter to apply the passed anonymizer to msg."""
-    msg = trace_span_pb2.TraceSpan()
-    msg.name = "log-user-user1234"
-
-    anonymizer = utils.Anonymizer([(re.escape("user1234"), "<user>")])
-    f = exporter.AnonymizingFilter(anonymizer)
-
-    filtered_msg = f(msg)
-    assert filtered_msg.name == "log-user-<user>"
-
-
-def test_otel_span_translation(monkeypatch) -> None:
-    """Test ClearcutSpanExporter to translate otel spans to TraceSpan."""
-    requests = []
-
-    def mock_urlopen(request, timeout=0):
-        requests.append((request, timeout))
-        resp = clientanalytics_pb2.LogResponse()
-        resp.next_request_wait_millis = 1
-        body = resp.SerializeToString()
-        return MockResponse(200, body)
-
-    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
-
-    span = tracer.start_span("name")
-    span.end()
-
-    e = exporter.ClearcutSpanExporter(max_queue_size=1)
-
-    assert e.export([span]) == export.SpanExportResult.SUCCESS
-    req, _ = requests[0]
-    log_request = clientanalytics_pb2.LogRequest()
-    log_request.ParseFromString(req.data)
-
-    assert log_request.request_time_ms <= int(time.time() * 1000)
-    assert len(log_request.log_event) == 1
-
-    # The following constants are defined in chromite.utils.telemetry.exporter
-    # as _CLIENT_TYPE and _LOG_SOURCE respectively.
-    assert log_request.client_info.client_type == 33
-    assert log_request.log_source == 2044
-
-    tspan = trace_span_pb2.TraceSpan()
-    tspan.ParseFromString(log_request.log_event[0].source_extension)
-
-    assert tspan.name == span.name
-    assert tspan.start_time_millis == int(span.start_time / 1e6)
-    assert tspan.end_time_millis == int(span.end_time / 1e6)
-
-
-def test_otel_span_translation_with_anonymization(monkeypatch) -> None:
-    """Test ClearcutSpanExporter to anonymize spans to before export."""
-    requests = []
-
-    def mock_urlopen(request, timeout=0):
-        requests.append((request, timeout))
-        resp = clientanalytics_pb2.LogResponse()
-        resp.next_request_wait_millis = 1
-        body = resp.SerializeToString()
-        return MockResponse(200, body)
-
-    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
-
-    span = tracer.start_span("span-user4321")
-    span.set_attributes({"username": "user4321"})
-    span.add_event("event-for-user4321")
-    span.end()
-
-    anonymizer = utils.Anonymizer([(re.escape("user4321"), "<user>")])
-    f = exporter.AnonymizingFilter(anonymizer)
-    e = exporter.ClearcutSpanExporter(prefilter=f, max_queue_size=1)
-
-    assert e.export([span]) == export.SpanExportResult.SUCCESS
-    req, _ = requests[0]
-    log_request = clientanalytics_pb2.LogRequest()
-    log_request.ParseFromString(req.data)
-
-    tspan = trace_span_pb2.TraceSpan()
-    tspan.ParseFromString(log_request.log_event[0].source_extension)
-
-    assert tspan.name == "span-<user>"
-    assert tspan.events[0].name == "event-for-<user>"
-    assert tspan.attributes["username"] == "<user>"
-
-
-def test_export_to_http_api(monkeypatch) -> None:
-    """Test ClearcutSpanExporter to export spans over http."""
-    requests = []
-
-    def mock_urlopen(request, timeout=0):
-        requests.append((request, timeout))
-        resp = clientanalytics_pb2.LogResponse()
-        resp.next_request_wait_millis = 1
-        body = resp.SerializeToString()
-        return MockResponse(200, body)
-
-    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
-
-    span = tracer.start_span("name")
-    span.end()
-    endpoint = "http://domain.com/path"
-
-    e = exporter.ClearcutSpanExporter(
-        endpoint=endpoint, timeout=7, max_queue_size=1
-    )
-
-    assert e.export([span])
-    req, timeout = requests[0]
-    assert req.full_url == endpoint
-    assert timeout == 7
-
-
-def test_export_to_http_api_throttle(monkeypatch) -> None:
-    """Test ClearcutSpanExporter to throttle based on prev response."""
-    mock_open_times = []
-
-    # pylint: disable=unused-argument
-    def mock_urlopen(request, timeout=0):
-        nonlocal mock_open_times
-        mock_open_times.append(datetime.datetime.now())
-        resp = clientanalytics_pb2.LogResponse()
-        resp.next_request_wait_millis = 1000
-        body = resp.SerializeToString()
-        return MockResponse(200, body)
-
-    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
-
-    span = tracer.start_span("name")
-    span.end()
-
-    e = exporter.ClearcutSpanExporter(max_queue_size=1)
-
-    assert e.export([span])
-    assert e.export([span])
-
-    # We've called export() on the same exporter instance twice, so we expect
-    # the following things to be true:
-    #   1. The request.urlopen() function has been called exactly twice, and
-    #   2. The calls to urlopen() are more than 1000 ms apart (due to the
-    #      value in the mock_urlopen response).
-    # The mock_open_times list is a proxy for observing this behavior directly.
-    assert len(mock_open_times) == 2
-    assert (mock_open_times[1] - mock_open_times[0]).total_seconds() > 1
-
-
-def test_export_to_drop_spans_if_wait_more_than_threshold(monkeypatch) -> None:
-    """Test ClearcutSpanExporter to drop span if wait is more than threshold."""
-    mock_open_times = []
-
-    # pylint: disable=unused-argument
-    def mock_urlopen(request, timeout=0):
-        nonlocal mock_open_times
-        mock_open_times.append(datetime.datetime.now())
-        resp = clientanalytics_pb2.LogResponse()
-        resp.next_request_wait_millis = 900000
-        body = resp.SerializeToString()
-        return MockResponse(200, body)
-
-    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
-
-    span = tracer.start_span("name")
-    span.end()
-
-    e = exporter.ClearcutSpanExporter(max_queue_size=1)
-
-    assert e.export([span])
-    assert e.export([span])
-
-    # We've called export() on the same exporter instance twice, so we expect
-    # the following things to be true:
-    #   1. The request.urlopen() function has been called exactly once
-    assert len(mock_open_times) == 1
-
-
-def test_flush_to_clear_export_queue_to_http_api(monkeypatch) -> None:
-    """Test ClearcutSpanExporter to export spans on flush."""
-    requests = []
-
-    def mock_urlopen(request, timeout=0):
-        requests.append((request, timeout))
-        resp = clientanalytics_pb2.LogResponse()
-        resp.next_request_wait_millis = 1
-        body = resp.SerializeToString()
-        return MockResponse(200, body)
-
-    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
-
-    span = tracer.start_span("name")
-    span.end()
-
-    e = exporter.ClearcutSpanExporter(max_queue_size=3)
-
-    assert e.export([span])
-    assert e.export([span])
-    assert len(requests) == 0
-
-    assert e.force_flush()
-    assert len(requests) == 1
-
-
-def test_shutdown_to_clear_export_queue_to_http_api(monkeypatch) -> None:
-    """Test ClearcutSpanExporter to export spans on shutdown."""
-    requests = []
-
-    def mock_urlopen(request, timeout=0):
-        requests.append((request, timeout))
-        resp = clientanalytics_pb2.LogResponse()
-        resp.next_request_wait_millis = 1
-        body = resp.SerializeToString()
-        return MockResponse(200, body)
-
-    monkeypatch.setattr(urllib.request, "urlopen", mock_urlopen)
-
-    span = tracer.start_span("name")
-    span.end()
-
-    e = exporter.ClearcutSpanExporter(max_queue_size=3)
-
-    assert e.export([span])
-    assert e.export([span])
-    assert len(requests) == 0
-
-    e.shutdown()
-    assert len(requests) == 1