Introduction
This post collects some quick notes on using ZIO and zio-telemetry to implement OpenTelemetry distributed tracing in Scala applications.
The source code is available here.
This is not an introduction to any of these technologies, but here are a few good reads:
- NewRelic's introduction to Distributed Tracing.
- Lightstep's concise summary for OpenTelemetry.
Initial implementation
For demonstration purposes, we will manually instrument a modified version of zio-grpc's helloworld example, in which we incorporate both gRPC and HTTP communication:
- Original: hello-client sends a HelloRequest with name x and hello-server returns a HelloReply with message Hello, x.
- Modified: in addition to the original behavior, the client sends an optional integer field guess and the server performs an HTTP request to HTTPBin based on its value.
Add the new field guess:
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
google.protobuf.Int32Value guess = 2;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
Add zio-grpc dependency
resolvers += Resolver.sonatypeRepo("snapshots")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.3")
addSbtPlugin("com.thesamet" % "sbt-protoc" % "1.0.4")
val zioGrpcVersion = "0.5.1+12-93cdbe22-SNAPSHOT"
libraryDependencies ++= Seq(
"com.thesamet.scalapb.zio-grpc" %% "zio-grpc-codegen" % zioGrpcVersion,
"com.thesamet.scalapb" %% "compilerplugin" % "0.11.5"
)
Set up build.sbt
- Generate Scala code from helloworld.proto.
- Depend on sttp for the HTTP client.
val grpcVersion = "1.41.0"
val sttpVersion = "3.3.15"
val scalaPBRuntime = "com.thesamet.scalapb" %% "scalapb-runtime-grpc" % scalapb.compiler.Version.scalapbVersion
val grpcRuntimeDeps = Seq(
"io.grpc" % "grpc-netty" % grpcVersion,
scalaPBRuntime,
scalaPBRuntime % "protobuf"
)
val sttpZioDeps = Seq(
"com.softwaremill.sttp.client3" %% "async-http-client-backend-zio" % sttpVersion
)
lazy val root = Project("opentelemetry-distributed-tracing-zio", file(".")).aggregate(zio)
lazy val zio = commonProject("zio").settings(
Compile / PB.targets := Seq(
scalapb.gen(grpc = true) -> (Compile / sourceManaged).value,
scalapb.zio_grpc.ZioCodeGenerator -> (Compile / sourceManaged).value
),
libraryDependencies ++= grpcRuntimeDeps ++ sttpZioDeps
)
Client implementation
- Create a gRPC client pointing to localhost:9000.
- Send a single HelloRequest.
- Send 5 HelloRequests in parallel.
- Send a single HelloRequest with an invalid guess.
- Print "Done" and exit.
object ZClient extends zio.App {
private val clientLayer = GreeterClient.live(
ZManagedChannel(
ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext()
)
)
private val singleHello = GreeterClient.sayHello(HelloRequest("World"))
private val multipleHellos = ZIO.collectAllParN(5)(
List(
GreeterClient.sayHello(HelloRequest("1", Some(1))),
GreeterClient.sayHello(HelloRequest("2", Some(2))),
GreeterClient.sayHello(HelloRequest("3", Some(3))),
GreeterClient.sayHello(HelloRequest("4", Some(4))),
GreeterClient.sayHello(HelloRequest("5", Some(5)))
)
)
private val invalidHello = GreeterClient.sayHello(HelloRequest("Invalid", Some(-1))).ignore
private def myAppLogic =
singleHello *> multipleHellos *> invalidHello *> putStrLn("Done")
def run(args: List[String]): URIO[ZEnv, ExitCode] =
myAppLogic.provideCustomLayer(clientLayer).exitCode
}
Server implementation
- Fail the request if guess is less than 0.
- Based on the value of guess, delay for some time and then send a request to HTTPBin.
type ZGreeterEnv = Clock with Random with SttpClient
object ZGreeterImpl extends RGreeter[ZGreeterEnv] {
def sayHello(request: HelloRequest): ZIO[ZGreeterEnv, Status, HelloReply] = {
val guess = request.guess.getOrElse(0)
for {
_ <- ZIO.fail(Status.INVALID_ARGUMENT).when(guess < 0)
code <- ???
delayMs = ???
_ <- httpRequest(code)
.delay(delayMs.millis)
.mapError(ex => Status.INTERNAL.withCause(ex))
} yield HelloReply(s"Hello, ${request.name}")
}
def httpRequest(code: Int): RIO[SttpClient, Unit] =
send(basicRequest.get(uri"https://httpbin.org/status/$code")).unit
}
Run it
- To run the server:
$ sbt "zio/runMain com.github.tuleism.ZServer"
[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.
- To run the client:
$ sbt "zio/runMain com.github.tuleism.ZClient"
[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s
At this point, we only know that it takes roughly 12 seconds for the client to initialize and finish its work.
Let's add distributed tracing to gain more insights into this.
Common tracing requirements
- For both client and server, we need to acquire a Tracer, an object responsible for creating and managing Spans.
- Tracing data is sent to Jaeger, which acts as a standalone collector.
Add new dependencies
- zio-telemetry's OpenTelemetry module.
- We also depend on zio-config to read the tracing config from a file and on zio-magic to ease ZLayer wiring.
val openTelemetryVersion = "1.6.0"
val zioConfigVersion = "1.0.10"
val zioMagicVersion = "0.3.9"
val zioTelemetryVersion = "0.8.2"
val openTelemetryDeps = Seq(
"io.opentelemetry" % "opentelemetry-exporter-jaeger" % openTelemetryVersion,
"io.opentelemetry" % "opentelemetry-sdk" % openTelemetryVersion,
"io.opentelemetry" % "opentelemetry-extension-noop-api" % s"$openTelemetryVersion-alpha"
)
val zioConfigDeps = Seq(
"dev.zio" %% "zio-config" % zioConfigVersion,
"dev.zio" %% "zio-config-magnolia" % zioConfigVersion,
"dev.zio" %% "zio-config-typesafe" % zioConfigVersion
)
val zioMagicDeps = Seq(
"io.github.kitlangton" %% "zio-magic" % zioMagicVersion
)
val zioTelemetryDeps = Seq(
"dev.zio" %% "zio-opentelemetry" % zioTelemetryVersion,
"com.softwaremill.sttp.client3" %% "zio-telemetry-opentelemetry-backend" % sttpVersion
)
Add a config layer
tracing {
enable = false
enable = ${?TRACING_ENABLE}
endpoint = "http://127.0.0.1:14250"
endpoint = ${?JAEGER_ENDPOINT}
}
case class AppConfig(tracing: TracingConfig)
case class TracingConfig(enable: Boolean, endpoint: String)
object AppConfig {
private val configDescriptor = descriptor[AppConfig]
val live: Layer[ReadError[String], Has[AppConfig]] = TypesafeConfig.fromDefaultLoader(configDescriptor)
}
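The repeated keys in the config file rely on HOCON's optional substitution: a later line such as enable = ${?TRACING_ENABLE} replaces the default only when that environment variable is set. The sketch below mimics that resolution with the Scala standard library only. TracingSettings and resolve are hypothetical names for illustration, and the boolean parsing is a simplification (an unparseable value falls back to the default here, which is an assumption and not how the real config loader reports errors).

```scala
// Illustrative only: `TracingSettings` and `resolve` are hypothetical names that mimic
// how the config file above picks its values; zio-config does the real work.
final case class TracingSettings(enable: Boolean, endpoint: String)

object TracingSettings {
  def resolve(env: Map[String, String]): TracingSettings =
    TracingSettings(
      // The TRACING_ENABLE override replaces the default (false) only when it is set.
      // Simplification: an unparseable value silently falls back to the default.
      enable = env.get("TRACING_ENABLE").flatMap(_.toBooleanOption).getOrElse(false),
      // The JAEGER_ENDPOINT override replaces the default endpoint only when it is set.
      endpoint = env.getOrElse("JAEGER_ENDPOINT", "http://127.0.0.1:14250")
    )
}
```

Calling TracingSettings.resolve(sys.env) would give the effective settings for the current process under these assumptions.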
Add a Tracer layer
- Depending on the configuration, we create either a noop Tracer or one that sends data to Jaeger.
- Once we have it, we can construct a Tracing layer, which gives us access to many useful operations in zio-telemetry.
object ZTracer {
private val InstrumentationName = "com.github.tuleism"
private def managed(serviceName: String, endpoint: String) = {
val resource = Resource.builder().put(ResourceAttributes.SERVICE_NAME, serviceName).build()
for {
spanExporter <- ZManaged.fromAutoCloseable(
Task(JaegerGrpcSpanExporter.builder().setEndpoint(endpoint).build())
)
spanProcessor <- ZManaged.fromAutoCloseable(UIO(SimpleSpanProcessor.create(spanExporter)))
tracerProvider <- UIO(
SdkTracerProvider.builder().addSpanProcessor(spanProcessor).setResource(resource).build()
).toManaged_
openTelemetry <- UIO(OpenTelemetrySdk.builder().setTracerProvider(tracerProvider).build()).toManaged_
tracer <- UIO(openTelemetry.getTracer(InstrumentationName)).toManaged_
} yield tracer
}
def live(serviceName: String): RLayer[Has[TracingConfig], Has[Tracer]] =
(
for {
config <- ZIO.service[TracingConfig].toManaged_
tracer <- if (!config.enable) {
Task(NoopOpenTelemetry.getInstance().getTracer(InstrumentationName)).toManaged_
} else {
managed(serviceName, config.endpoint)
}
} yield tracer
).toLayer
}
New server
Instrument the HTTP client
- Use the out-of-the-box sttp backend.
- We also add HTTP-specific attributes following OpenTelemetry's semantic conventions.
object SttpTracing {
private val wrapper = new ZioTelemetryOpenTelemetryTracer {
def before[T](request: Request[T, Nothing]): RIO[Tracing, Unit] =
Tracing.setAttribute(SemanticAttributes.HTTP_METHOD.getKey, request.method.method) *>
Tracing.setAttribute(SemanticAttributes.HTTP_URL.getKey, request.uri.toString()) *>
ZIO.unit
def after[T](response: Response[T]): RIO[Tracing, Unit] =
Tracing.setAttribute(SemanticAttributes.HTTP_STATUS_CODE.getKey, response.code.code) *>
ZIO.unit
}
val live = AsyncHttpClientZioBackend.layer().flatMap { hasBackend =>
ZIO
.service[Tracing.Service]
.map { tracing =>
ZioTelemetryOpenTelemetryBackend(hasBackend.get, tracing, wrapper)
}
.toLayer
}
}
Instrument the gRPC server
With a ZTransform, we can add tracing without changing our server implementation. For each request:
- We use zio-telemetry's spanFrom, which extracts the propagated context (through gRPC Metadata, using the W3C Trace Context format) and starts a new child Span right after.
- We have access to a RequestContext and thus the full method name, which we use as the Span's name.
- We also add gRPC-specific attributes following OpenTelemetry's semantic conventions.
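To make the W3C Trace Context format more concrete, here is a minimal sketch of parsing the traceparent header that travels in the gRPC Metadata. It uses only the Scala standard library and covers the version 00 layout. TraceParent and parse are hypothetical names for illustration; in the actual code this work is done by W3CTraceContextPropagator.

```scala
// Illustrative only: a hand-rolled parser for the W3C `traceparent` header.
// `TraceParent` and `parse` are hypothetical names, not OpenTelemetry APIs.
final case class TraceParent(version: String, traceId: String, spanId: String, flags: String) {
  // Bit 0 of the flags byte is the "sampled" flag.
  def sampled: Boolean = (Integer.parseInt(flags, 16) & 1) == 1
}

object TraceParent {
  // version (2 hex) - trace-id (32 hex) - parent-id (16 hex) - flags (2 hex), lowercase hex
  private val Pattern = "([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})".r

  def parse(header: String): Option[TraceParent] =
    header.trim match {
      // Version "ff" and all-zero trace or span ids are invalid.
      case Pattern(version, traceId, spanId, flags)
          if version != "ff" && traceId.exists(_ != '0') && spanId.exists(_ != '0') =>
        Some(TraceParent(version, traceId, spanId, flags))
      case _ => None
    }
}
```

For example, TraceParent.parse("00-9c8a7ebb87381293bc8937a5f7673cb9-cb7c9a440472e1be-01") yields a sampled context, while a malformed or all-zero header yields None.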
object GrpcTracing {
private val propagator: TextMapPropagator = W3CTraceContextPropagator.getInstance()
private val metadataGetter: TextMapGetter[Metadata] = new TextMapGetter[Metadata] {
override def keys(carrier: Metadata): java.lang.Iterable[String] =
carrier.keys()
override def get(carrier: Metadata, key: String): String =
carrier.get(
Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER)
)
}
private def withSemanticAttributes[R, A](effect: ZIO[R, Status, A]): ZIO[Tracing with R, Status, A] =
Tracing.setAttribute(SemanticAttributes.RPC_SYSTEM.getKey, "grpc") *>
effect
.tapBoth(
status =>
Tracing.setAttribute(
SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey,
status.getCode.value()
),
_ =>
Tracing.setAttribute(SemanticAttributes.RPC_GRPC_STATUS_CODE.getKey, Status.OK.getCode.value())
)
def serverTracingTransform[R]: ZTransform[R, Status, R with Tracing with Has[RequestContext]] =
new ZTransform[R, Status, R with Tracing with Has[RequestContext]] {
def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing with Has[RequestContext], Status, A] =
for {
rc <- ZIO.service[RequestContext]
metadata <- rc.metadata.wrap(identity)
result <- withSemanticAttributes(io)
.spanFrom(
propagator,
metadata,
metadataGetter,
rc.methodDescriptor.getFullMethodName,
SpanKind.SERVER,
{ case _ => StatusCode.ERROR }
)
} yield result
def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing with Has[RequestContext], Status, A] =
???
}
}
Update Server Main
- Add required layers for Tracing.
- Transform the original ZGreeterImpl.
import zio.magic._
object ZServer extends ServerMain {
private val requirements =
ZLayer
.wire[ZEnv with ZGreeterEnv](
ZEnv.live,
AppConfig.live.narrow(_.tracing),
ZTracer.live("hello-server"),
Tracing.live,
SttpTracing.live
)
.orDie
def services: ServiceList[Any] =
ServiceList
.add(ZGreeterImpl.transform[ZGreeterEnv, Has[RequestContext]](GrpcTracing.serverTracingTransform))
.provideLayer(requirements)
}
New client
Inject current context into gRPC Metadata for context propagation
object GrpcTracing {
...
private val metadataSetter: TextMapSetter[Metadata] = (carrier, key, value) =>
carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value)
val contextPropagationClientInterceptor: ZClientInterceptor[Tracing] = ZClientInterceptor.headersUpdater {
(_, _, metadata) =>
metadata.wrapM(Tracing.inject(propagator, _, metadataSetter))
}
...
}
object ZClient extends zio.App {
private val clientLayer = GreeterClient.live(
ZManagedChannel(
ManagedChannelBuilder.forAddress("localhost", 9000).usePlaintext(),
Seq(GrpcTracing.contextPropagationClientInterceptor)
)
)
...
}
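Conceptually, the interceptor writes the current trace context into the outgoing headers through a setter function. The sketch below shows that shape with a mutable Map standing in for gRPC Metadata. SpanIds, Setter, mapSetter, and inject are hypothetical names for illustration and only mirror the idea behind TextMapSetter; the real injection is done by Tracing.inject with the W3C propagator.

```scala
import scala.collection.mutable

// Illustrative only: hypothetical names that mirror the shape of a text-map setter,
// using a mutable Map as the carrier instead of gRPC Metadata.
final case class SpanIds(traceId: String, spanId: String, sampled: Boolean)

object TraceContextInjection {
  // A setter knows how to write one key/value pair into a carrier of type C.
  type Setter[C] = (C, String, String) => Unit

  val mapSetter: Setter[mutable.Map[String, String]] =
    (carrier, key, value) => carrier.update(key, value)

  // Writes a version-00 `traceparent` header: version-traceId-spanId-flags.
  def inject[C](ids: SpanIds, carrier: C, setter: Setter[C]): Unit = {
    val flags = if (ids.sampled) "01" else "00"
    setter(carrier, "traceparent", s"00-${ids.traceId}-${ids.spanId}-$flags")
  }
}
```

Keeping the setter separate from the carrier is what lets the same propagation logic target gRPC Metadata, HTTP headers, or any other key/value transport.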
Start a Span for each request
- Use ZTransform to record the relevant gRPC attributes.
object GrpcTracing {
...
def clientTracingTransform[R]: ZTransform[R, Status, R with Tracing] =
new ZTransform[R, Status, R with Tracing] {
def effect[A](io: ZIO[R, Status, A]): ZIO[R with Tracing, Status, A] = withSemanticAttributes(io)
def stream[A](io: ZStream[R, Status, A]): ZStream[R with Tracing, Status, A] = ???
}
}
- Unlike the server, we don't have access to a RequestContext object, so we have to set the method name manually.
- We also start additional Spans.
object ZClient extends zio.App {
...
private def errorToStatusCode[E]: PartialFunction[E, StatusCode] = { case _ => StatusCode.ERROR }
private def sayHello(request: HelloRequest) =
GreeterClient
.sayHello(request)
.span(
GreeterGrpc.METHOD_SAY_HELLO.getFullMethodName,
SpanKind.CLIENT,
errorToStatusCode
)
private val singleHello = sayHello(HelloRequest("World"))
.span("singleHello", toErrorStatus = errorToStatusCode)
private val multipleHellos = ZIO
.collectAllParN(5)(
List(
sayHello(HelloRequest("1", Some(1))),
sayHello(HelloRequest("2", Some(2))),
sayHello(HelloRequest("3", Some(3))),
sayHello(HelloRequest("4", Some(4))),
sayHello(HelloRequest("5", Some(5)))
)
)
.span("multipleHellos", toErrorStatus = errorToStatusCode)
private val invalidHello = sayHello(HelloRequest("Invalid", Some(-1))).ignore
.span("invalidHello", toErrorStatus = errorToStatusCode)
}
Add required layers
object ZClient extends zio.App {
...
private val requirements = ZLayer
.wire[ZEnv with Tracing](
ZEnv.live,
AppConfig.live.narrow(_.tracing),
ZTracer.live("hello-client"),
Tracing.live
) >+> clientLayer
def run(args: List[String]): URIO[ZEnv, ExitCode] =
myAppLogic.provideCustomLayer(requirements).exitCode
}
Showtime
Run Jaeger through Docker
- Tracing data can be sent to port 14250.
- We can view Jaeger UI at http://localhost:16686.
$ docker run --rm --name jaeger \
-p 16686:16686 \
-p 14250:14250 \
jaegertracing/all-in-one:1.25
Start the server
$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZServer"
[info] running (fork) com.github.tuleism.ZServer
[info] Server is running. Press Ctrl-C to stop.
Start the client
$ TRACING_ENABLE=true sbt "zio/runMain com.github.tuleism.ZClient"
[info] running (fork) com.github.tuleism.ZClient
[info] Done
[success] Total time: 12 s
Distributed Tracing in action!
- Now we can see the details for multipleHellos:
- And which guess is causing the longest delay.
Integration with Logging
- Let's add the tracing context to log messages, following the specification.
- We're going to use izumi logstage, our favorite logging library.
- See the diffs.
Add logging dependency
val izumiVersion = "1.0.8"
val loggingDeps = Seq(
"io.7mind.izumi" %% "logstage-core" % izumiVersion,
"io.7mind.izumi" %% "logstage-adapter-slf4j" % izumiVersion
)
Set up logging
- Add trace_id, span_id, and trace_flags to the logging context if the current span context is valid.
object Logging {
private def baseLogger = IzLogger()
val live: ZLayer[Has[Tracing.Service], Nothing, Has[LogZIO.Service]] =
(
for {
tracing <- ZIO.service[Tracing.Service]
} yield LogZIO.withDynamicContext(baseLogger)(
Tracing.getCurrentSpanContext
.map(spanContext =>
if (spanContext.isValid)
CustomContext(
"trace_id" -> spanContext.getTraceId,
"span_id" -> spanContext.getSpanId,
"trace_flags" -> spanContext.getTraceFlags.asHex()
)
else
CustomContext.empty
)
.provide(Has(tracing))
)
).toLayer
}
Add a few log messages
- E.g. for singleHello.
object ZClient extends zio.App {
...
private val singleHello = (
for {
_ <- log.info("singleHello")
_ <- sayHello(HelloRequest("World"))
} yield ()
).span("singleHello", toErrorStatus = errorToStatusCode)
}
Sample Logs
[info] running (fork) com.github.tuleism.ZClient
[info] I 2021-11-01T22:59:10.881 (ZClient.scala:37) …tuleism.ZClient.singleHello [24:zio-default-async-11] trace_id=9c8a7ebb87381293bc8937a5f7673cb9, span_id=cb7c9a440472e1be, trace_flags=01 singleHello
[info] I 2021-11-01T22:59:14.064 (ZClient.scala:44) …eism.ZClient.multipleHellos [21:zio-default-async-8 ] trace_id=fe405246fbaa5f876c19f14fa649a99f, span_id=bef19494bef4106e, trace_flags=01 multipleHellos
[info] I 2021-11-01T22:59:18.171 (ZClient.scala:60) …uleism.ZClient.invalidHello [26:zio-default-async-13] trace_id=be5ccd425e0cfb01fd97274abd0c4d72, span_id=ea6499fb9a7c8d28, trace_flags=01 invalidHello
[info] I 2021-11-01T22:59:18.272 (ZClient.scala:66) ….tuleism.ZClient.myAppLogic [15:zio-default-async-2 ] Done
[success] Total time: 12 s
Extra notes
- If we receive an HTTP 5xx response, we should set the Span status to error according to the semantic convention. However, this is currently not possible with zio-telemetry.
- We need a better way to implement tracing for the zio-grpc client.
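The 5xx rule from the first note could be expressed as a small mapping like the one below. This is only a sketch of the rule as stated above, using the Scala standard library. SpanStatus and fromHttpStatus are hypothetical names, not zio-telemetry or OpenTelemetry APIs, and the handling of other status codes is deliberately left out.

```scala
// Illustrative only: `SpanStatus` and `fromHttpStatus` are hypothetical names.
sealed trait SpanStatus

object SpanStatus {
  case object Unset extends SpanStatus
  case object Error extends SpanStatus

  // Covers only the 5xx rule mentioned above; every other code is left unset here.
  def fromHttpStatus(code: Int): SpanStatus =
    if (code >= 500 && code <= 599) Error else Unset
}
```

With a hook that exposes the response before the Span ends, a function of this shape would be enough to decide the Span status.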



