Project Name | Stars | Downloads | Repos Using This | Packages Using This | Most Recent Commit | Total Releases | Latest Release | Open Issues | License | Language |
---|---|---|---|---|---|---|---|---|---|---|
Zipkin Js | 484 | 268 | 252 | 2 years ago | 59 | March 01, 2021 | 69 | apache-2.0 | JavaScript | |
Zipkin instrumentation for Node.js and browsers | ||||||||||
Opentelemetry Ext Js | 143 | 6 | 22 days ago | 57 | July 11, 2022 | 18 | apache-2.0 | TypeScript | ||
js extensions for the open-telemetry project | ||||||||||
Java Kafka Client | 102 | 7 | 5 | 2 years ago | 35 | October 28, 2020 | 6 | apache-2.0 | Java | |
OpenTracing Instrumentation for Apache Kafka Client | ||||||||||
Talk Kafka Zipkin | 60 | 2 years ago | 1 | mit | Java | |||||
Demo material from talk about tracing Kafka-based applications with Zipkin | ||||||||||
Post Kafka Opentracing | 16 | 5 years ago | 1 | gpl-3.0 | Java | |||||
Post: Tracing Kafka Applications | ||||||||||
Confluent Kafka Extensions Diagnostics | 3 | 10 days ago | mit | C# | ||||||
Instrumentation of the Confluent.Kafka library | ||||||||||
Phobos_prometheus | 3 | 4 years ago | 1 | apache-2.0 | Ruby | |||||
Gather and scrape Phobos metrics for Prometheus | ||||||||||
Kafka Manager Consumer Lag Exporter | 3 | 4 years ago | Go | |||||||
Kafka Manager Consumer Lag Exporter | ||||||||||
Kamon Kafka | 2 | 3 years ago | 1 | other | Scala | |||||
Kamon Integration for Kafka(Producers|Consumers|Streams) | ||||||||||
Ruby Kafka Instrumentation | 1 | 3 years ago | 2 | mit | Ruby | |||||
Open Tracing instrumentation for the ruby-kafka gem |
OpenTracing instrumentation for Apache Kafka Client.
Two solutions are provided:
pom.xml
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-client</artifactId>
<version>VERSION</version>
</dependency>
pom.xml
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-streams</artifactId>
<version>VERSION</version>
</dependency>
pom.xml
<dependency>
<groupId>io.opentracing.contrib</groupId>
<artifactId>opentracing-kafka-spring</artifactId>
<version>VERSION</version>
</dependency>
// Instantiate tracer
Tracer tracer = ...
// Optionally register tracer with GlobalTracer
GlobalTracer.register(tracer);
// Instantiate KafkaProducer
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
//Decorate KafkaProducer with TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
tracer);
// Send
tracingProducer.send(...);
// Instantiate KafkaConsumer
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
// Decorate KafkaConsumer with TracingKafkaConsumer
TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
tracer);
//Subscribe
tracingConsumer.subscribe(Collections.singletonList("messages"));
// Get records
ConsumerRecords<Integer, String> records = tracingConsumer.poll(1000);
// To retrieve SpanContext from polled record (Consumer side)
ConsumerRecord<Integer, String> record = ...
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
The decorator-based solution includes support for custom span names by passing in a BiFunction object as an additional argument to the TracingKafkaConsumer or TracingKafkaProducer constructors, either one of the provided BiFunctions or your own custom one.
// Create BiFunction for the KafkaProducer that operates on
// (String operationName, ProducerRecord consumerRecord) and
// returns a String to be used as the name
BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
(operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";
// Instantiate KafkaProducer
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
//Decorate KafkaProducer with TracingKafkaProducer
TracingKafkaProducer<Integer, String> tracingProducer = new TracingKafkaProducer<>(producer,
tracer,
producerSpanNameProvider);
// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.
// Create BiFunction for the KafkaConsumer that operates on
// (String operationName, ConsumerRecord consumerRecord) and
// returns a String to be used as the name
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
(operationName, consumerRecord) -> operationName.toUpperCase();
// Instantiate KafkaConsumer
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
// Decorate KafkaConsumer with TracingKafkaConsumer, passing in the consumerSpanNameProvider BiFunction
TracingKafkaConsumer<Integer, String> tracingConsumer = new TracingKafkaConsumer<>(consumer,
tracer,
consumerSpanNameProvider);
// Spans created by the tracingConsumer will now have the capitalized operation name as the span name.
// "receive" -> "RECEIVE"
// Register tracer with GlobalTracer:
GlobalTracer.register(tracer);
// Add TracingProducerInterceptor to sender properties:
senderProps.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG,
TracingProducerInterceptor.class.getName());
// Instantiate KafkaProducer
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
// Send
producer.send(...);
// Add TracingConsumerInterceptor to consumer properties:
consumerProps.put(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG,
TracingConsumerInterceptor.class.getName());
// Instantiate KafkaConsumer
KafkaConsumer<Integer, String> consumer = new KafkaConsumer<>(consumerProps);
//Subscribe
consumer.subscribe(Collections.singletonList("messages"));
// Get records
ConsumerRecords<Integer, String> records = consumer.poll(1000);
// To retrieve SpanContext from polled record (Consumer side)
ConsumerRecord<Integer, String> record = ...
SpanContext spanContext = TracingKafkaUtils.extractSpanContext(record.headers(), tracer);
// Instantiate TracingKafkaClientSupplier
KafkaClientSupplier supplier = new TracingKafkaClientSupplier(tracer);
// Provide supplier to KafkaStreams
KafkaStreams streams = new KafkaStreams(builder.build(), new StreamsConfig(config), supplier);
streams.start();
// Declare Tracer bean
@Bean
public Tracer tracer() {
return ...
}
// Decorate ConsumerFactory with TracingConsumerFactory
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps()), tracer());
}
// Decorate ProducerFactory with TracingProducerFactory
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer());
}
// Use decorated ProducerFactory in KafkaTemplate
@Bean
public KafkaTemplate<Integer, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
// Use an aspect to decorate @KafkaListeners
@Bean
public TracingKafkaAspect tracingKafkaAspect() {
return new TracingKafkaAspect(tracer());
}
The Spring Kafka factory implementations include support for custom span names by passing in a BiFunction object as an additional argument to the TracingConsumerFactory or TracingProducerFactory constructors, either one of the provided BiFunctions or your own custom one.
// Create BiFunction for the KafkaProducerFactory that operates on
// (String operationName, ProducerRecord consumerRecord) and
// returns a String to be used as the name
BiFunction<String, ProducerRecord, String> producerSpanNameProvider =
(operationName, producerRecord) -> "CUSTOM_PRODUCER_NAME";
// Decorate ProducerFactory with TracingProducerFactory
@Bean
public ProducerFactory<Integer, String> producerFactory() {
return new TracingProducerFactory<>(new DefaultKafkaProducerFactory<>(producerProps()), tracer());
}
// Spans created by the tracingProducer will now have "CUSTOM_PRODUCER_NAME" as the span name.
// Create BiFunction for the KafkaConsumerFactory that operates on
// (String operationName, ConsumerRecord consumerRecord) and
// returns a String to be used as the name
BiFunction<String, ConsumerRecord, String> consumerSpanNameProvider =
(operationName, consumerRecord) -> operationName.toUpperCase();
// Decorate ConsumerFactory with TracingConsumerFactory
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new TracingConsumerFactory<>(new DefaultKafkaConsumerFactory<>(consumerProps()), tracer());
}
// Consumers produced by the traced consumerFactory
The following BiFunctions are already included in the ClientSpanNameProvider class, with CONSUMER_OPERATION_NAME
and PRODUCER_OPERATION_NAME
being the default should no
spanNameProvider be provided:
CONSUMER_OPERATION_NAME
and PRODUCER_OPERATION_NAME
: Returns the operationName
as the span name ("receive" for Consumer, "send" for producer).CONSUMER_PREFIXED_OPERATION_NAME(String prefix)
and PRODUCER_PREFIXED_OPERATION_NAME(String prefix)
: Returns a String concatenation of prefix
and operatioName
.CONSUMER_TOPIC
and PRODUCER_TOPIC
: Returns the Kafka topic name that the record was pushed to/pulled from (record.topic()
).PREFIXED_CONSUMER_TOPIC(String prefix)
and PREFIXED_PRODUCER_TOPIC(String prefix)
: Returns a String concatenation of prefix
and the Kafka topic name (record.topic()
).CONSUMER_OPERATION_NAME_TOPIC
and PRODUCER_OPERATION_NAME_TOPIC
: Returns "operationName
- record.topic()
".CONSUMER_PREFIXED_OPERATION_NAME_TOPIC(String prefix)
and PRODUCER_PREFIXED_OPERATION_NAME_TOPIC(String prefix)
: Returns a String concatenation of prefix
and "operationName
- record.topic()
".