Skip to content
This repository was archived by the owner on Apr 7, 2025. It is now read-only.

Commit 79b1032

Browse files
authored
Metrics for monitoring endpoints (#24)
* Add metrics for avg latency by partition, avg offset difference, data depth by partition, estimated data depth * Metrics: change histograms to gauges, change offset-dif to depth-bytes * Metrics: fix latency calculation * Metrics: add testing for the metrics * Metrics: fix thread safety issues and remove unnecessary function in MetricsConfig * Improve metric unit tests * Logging reconnectInterval fixed, move OutputFake to test folder
1 parent fa85d46 commit 79b1032

11 files changed

+845
-164
lines changed
Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
/*
2+
* Teragrep Azure Eventhub Reader
3+
* Copyright (C) 2023 Suomen Kanuuna Oy
4+
*
5+
* This program is free software: you can redistribute it and/or modify
6+
* it under the terms of the GNU Affero General Public License as published by
7+
* the Free Software Foundation, either version 3 of the License, or
8+
* (at your option) any later version.
9+
*
10+
* This program is distributed in the hope that it will be useful,
11+
* but WITHOUT ANY WARRANTY; without even the implied warranty of
12+
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13+
* GNU Affero General Public License for more details.
14+
*
15+
* You should have received a copy of the GNU Affero General Public License
16+
* along with this program. If not, see <https://github.com/teragrep/teragrep/blob/main/LICENSE>.
17+
*
18+
*
19+
* Additional permission under GNU Affero General Public License version 3
20+
* section 7
21+
*
22+
* If you modify this Program, or any covered work, by linking or combining it
23+
* with other code, such other code is not for that reason alone subject to any
24+
* of the requirements of the GNU Affero GPL version 3 as long as this Program
25+
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
26+
* modifications.
27+
*
28+
* Supplemented terms under GNU Affero General Public License version 3
29+
* section 7
30+
*
31+
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
32+
* versions must be marked as "Modified version of" The Program.
33+
*
34+
* Names of the licensors and authors may not be used for publicity purposes.
35+
*
36+
* No rights are granted for use of trade names, trademarks, or service marks
37+
* which are in The Program if any.
38+
*
39+
* Licensee must indemnify licensors and authors for any liability that these
40+
* contractual assumptions impose on licensors and authors.
41+
*
42+
* To the extent this program is licensed as part of the Commercial versions of
43+
* Teragrep, the applicable Commercial License may apply to this file if you as
44+
* a licensee so wish it.
45+
*/
46+
package com.teragrep.aer_01;
47+
48+
import com.codahale.metrics.Counter;
49+
import com.codahale.metrics.MetricRegistry;
50+
import com.codahale.metrics.Timer;
51+
import com.teragrep.aer_01.config.RelpConfig;
52+
import com.teragrep.rlp_01.RelpBatch;
53+
import com.teragrep.rlp_01.RelpConnection;
54+
import org.slf4j.Logger;
55+
import org.slf4j.LoggerFactory;
56+
57+
import java.io.IOException;
58+
import java.nio.channels.UnresolvedAddressException;
59+
import java.util.concurrent.TimeoutException;
60+
61+
import static com.codahale.metrics.MetricRegistry.name;
62+
63+
// TODO unify, this is a copy from cfe_35 which is a copy from rlo_10 with FIXES
64+
final class DefaultOutput implements Output {
65+
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultOutput.class);
66+
67+
private final RelpConnection relpConnection;
68+
private final String relpAddress;
69+
private final int relpPort;
70+
private final int reconnectInterval;
71+
72+
// metrics
73+
private final Counter records;
74+
private final Counter bytes;
75+
private final Counter resends;
76+
private final Counter connects;
77+
private final Counter retriedConnects;
78+
private final Timer sendLatency;
79+
private final Timer connectLatency;
80+
81+
82+
DefaultOutput(
83+
String name,
84+
RelpConfig relpConfig,
85+
MetricRegistry metricRegistry) {
86+
this.relpAddress = relpConfig.destinationAddress;
87+
this.relpPort = relpConfig.destinationPort;
88+
this.reconnectInterval = relpConfig.reconnectInterval;
89+
90+
this.relpConnection = new RelpConnection();
91+
this.relpConnection.setConnectionTimeout(relpConfig.connectionTimeout);
92+
this.relpConnection.setReadTimeout(relpConfig.readTimeout);
93+
this.relpConnection.setWriteTimeout(relpConfig.writeTimeout);
94+
95+
this.records = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "records"));
96+
this.bytes = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "bytes"));
97+
this.resends = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "resends"));
98+
this.connects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "connects"));
99+
this.retriedConnects = metricRegistry.counter(name(DefaultOutput.class, "<[" + name + "]>", "retriedConnects"));
100+
this.sendLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "sendLatency"));
101+
this.connectLatency = metricRegistry.timer(name(DefaultOutput.class, "<[" + name + "]>", "connectLatency"));
102+
103+
connect();
104+
}
105+
106+
private void connect() {
107+
boolean connected = false;
108+
while (!connected) {
109+
try (final Timer.Context context = connectLatency.time()) {
110+
connected = this.relpConnection.connect(relpAddress, relpPort);
111+
connects.inc();
112+
} catch (IOException | TimeoutException e) {
113+
LOGGER.error("Exception while connecting to <[{}]>:<[{}]>", relpAddress, relpPort, e);
114+
} catch (UnresolvedAddressException e) {
115+
LOGGER.error("Can't resolve address of target <[{}]>", relpAddress, e);
116+
}
117+
118+
if (!connected) {
119+
try {
120+
Thread.sleep(reconnectInterval);
121+
retriedConnects.inc();
122+
} catch (InterruptedException e) {
123+
LOGGER.warn("Sleep interrupted while waiting for reconnectInterval <[{}]> on <[{}]>:<[{}]>", reconnectInterval, relpAddress, relpPort, e);
124+
}
125+
}
126+
}
127+
}
128+
129+
130+
@Override
131+
public void accept(byte[] syslogMessage) {
132+
try (final Timer.Context context = sendLatency.time()) {
133+
RelpBatch batch = new RelpBatch();
134+
batch.insert(syslogMessage);
135+
136+
boolean allSent = false;
137+
while (!allSent) {
138+
try {
139+
this.relpConnection.commit(batch);
140+
141+
// metrics
142+
// NOTICE these if batch size changes
143+
records.inc(1);
144+
bytes.inc(syslogMessage.length);
145+
146+
} catch (IllegalStateException | IOException | TimeoutException e) {
147+
LOGGER.error("Exception while committing a batch to <[{}]>:<[{}]>", relpAddress, relpPort, e);
148+
}
149+
// Check if everything has been sent, retry and reconnect if not.
150+
if (!batch.verifyTransactionAll()) {
151+
batch.retryAllFailed();
152+
153+
// metrics
154+
// NOTICE this if batch size changes
155+
resends.inc(1);
156+
relpConnection.tearDown();
157+
try {
158+
Thread.sleep(reconnectInterval);
159+
} catch(InterruptedException e) {
160+
throw new RuntimeException(e);
161+
}
162+
connect();
163+
} else {
164+
allSent = true;
165+
}
166+
}
167+
}
168+
}
169+
170+
@Override
171+
public String toString() {
172+
return "DefaultOutput{" +
173+
"relpAddress='" + relpAddress + '\'' +
174+
", relpPort=" + relpPort +
175+
'}';
176+
}
177+
178+
public void close() {
179+
try {
180+
relpConnection.disconnect();
181+
}
182+
catch (IOException | TimeoutException e) {
183+
LOGGER.warn("Exception while disconnecting from <[{}]>:<[{}]>", relpAddress, relpPort, e);
184+
}
185+
finally {
186+
relpConnection.tearDown();
187+
}
188+
}
189+
}

src/main/java/com/teragrep/aer_01/EventContextConsumer.java

Lines changed: 95 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -46,44 +46,80 @@
4646
package com.teragrep.aer_01;
4747

4848
import com.azure.messaging.eventhubs.models.EventContext;
49-
import com.codahale.metrics.MetricRegistry;
49+
import com.codahale.metrics.*;
50+
import com.codahale.metrics.jmx.JmxReporter;
5051
import com.teragrep.aer_01.config.RelpConfig;
5152
import com.teragrep.aer_01.config.SyslogConfig;
5253
import com.teragrep.aer_01.config.source.Sourceable;
5354
import com.teragrep.rlo_14.Facility;
5455
import com.teragrep.rlo_14.SDElement;
5556
import com.teragrep.rlo_14.Severity;
5657
import com.teragrep.rlo_14.SyslogMessage;
58+
import io.prometheus.client.CollectorRegistry;
59+
import io.prometheus.client.dropwizard.DropwizardExports;
60+
import io.prometheus.client.exporter.MetricsServlet;
61+
import org.eclipse.jetty.server.Server;
62+
import org.eclipse.jetty.servlet.ServletContextHandler;
63+
import org.eclipse.jetty.servlet.ServletHolder;
64+
import org.slf4j.LoggerFactory;
5765

5866
import java.net.InetAddress;
5967
import java.net.UnknownHostException;
6068
import java.nio.charset.StandardCharsets;
6169
import java.time.Instant;
6270
import java.util.Map;
6371
import java.util.UUID;
72+
import java.util.concurrent.TimeUnit;
73+
import java.util.concurrent.atomic.AtomicLong;
6474
import java.util.function.Consumer;
6575

76+
import static com.codahale.metrics.MetricRegistry.name;
77+
6678
final class EventContextConsumer implements AutoCloseable, Consumer<EventContext> {
6779

6880
private final Output output;
6981
private final String realHostName;
7082
private final SyslogConfig syslogConfig;
71-
EventContextConsumer(Sourceable configSource) {
72-
RelpConfig relpConfig = new RelpConfig(configSource);
73-
74-
this.output = new Output(
75-
"defaultOutput",
76-
relpConfig.destinationAddress,
77-
relpConfig.destinationPort,
78-
relpConfig.connectionTimeout,
79-
relpConfig.readTimeout,
80-
relpConfig.writeTimeout,
81-
relpConfig.reconnectInterval,
82-
new MetricRegistry()
83+
private final MetricRegistry metricRegistry;
84+
private final JmxReporter jmxReporter;
85+
private final Slf4jReporter slf4jReporter;
86+
private final Server jettyServer;
87+
88+
// metrics
89+
private final AtomicLong records = new AtomicLong();
90+
private final AtomicLong allSize = new AtomicLong();
91+
92+
EventContextConsumer(Sourceable configSource, int prometheusPort) {
93+
this(configSource, new MetricRegistry(), prometheusPort);
94+
}
95+
96+
EventContextConsumer(Sourceable configSource, MetricRegistry metricRegistry, int prometheusPort) {
97+
this(
98+
configSource,
99+
new DefaultOutput("defaultOutput", new RelpConfig(configSource), metricRegistry),
100+
metricRegistry,
101+
prometheusPort
83102
);
103+
}
84104

105+
EventContextConsumer(Sourceable configSource, Output output, MetricRegistry metricRegistry, int prometheusPort) {
106+
this.metricRegistry = metricRegistry;
107+
this.output = output;
85108
this.realHostName = getRealHostName();
86109
this.syslogConfig = new SyslogConfig(configSource);
110+
111+
this.jmxReporter = JmxReporter.forRegistry(metricRegistry).build();
112+
this.slf4jReporter = Slf4jReporter
113+
.forRegistry(metricRegistry)
114+
.outputTo(LoggerFactory.getLogger(EventContextConsumer.class))
115+
.convertRatesTo(TimeUnit.SECONDS)
116+
.convertDurationsTo(TimeUnit.MILLISECONDS)
117+
.build();
118+
jettyServer = new Server(prometheusPort);
119+
startMetrics();
120+
121+
metricRegistry.register(name(EventContextConsumer.class, "estimated-data-depth"),
122+
(Gauge<Double>) () -> (allSize.get() / records.doubleValue()) / records.doubleValue());
87123
}
88124

89125
private String getRealHostName() {
@@ -96,8 +132,50 @@ private String getRealHostName() {
96132
return hostname;
97133
}
98134

135+
private void startMetrics() {
136+
this.jmxReporter.start();
137+
this.slf4jReporter.start(1, TimeUnit.MINUTES);
138+
139+
// prometheus-exporter
140+
CollectorRegistry.defaultRegistry.register(new DropwizardExports(metricRegistry));
141+
142+
ServletContextHandler context = new ServletContextHandler();
143+
context.setContextPath("/");
144+
jettyServer.setHandler(context);
145+
146+
MetricsServlet metricsServlet = new MetricsServlet();
147+
ServletHolder servletHolder = new ServletHolder(metricsServlet);
148+
context.addServlet(servletHolder, "/metrics");
149+
150+
// Start the webserver.
151+
try {
152+
jettyServer.start();
153+
}
154+
catch (Exception e) {
155+
throw new RuntimeException(e);
156+
}
157+
}
158+
99159
@Override
100160
public void accept(EventContext eventContext) {
161+
int messageLength = eventContext.getEventData().getBody().length;
162+
String partitionId = eventContext.getPartitionContext().getPartitionId();
163+
164+
records.incrementAndGet();
165+
allSize.addAndGet(messageLength);
166+
167+
metricRegistry.gauge(name(EventContextConsumer.class, "latency-seconds", partitionId), () -> new Gauge<Long>() {
168+
@Override
169+
public Long getValue() {
170+
return Instant.now().getEpochSecond() - eventContext.getEventData().getEnqueuedTime().getEpochSecond();
171+
}
172+
});
173+
metricRegistry.gauge(name(EventContextConsumer.class, "depth-bytes", partitionId), () -> new Gauge<Long>() {
174+
@Override
175+
public Long getValue() {
176+
return eventContext.getLastEnqueuedEventProperties().getOffset() - eventContext.getEventData().getOffset();
177+
}
178+
});
101179

102180
String eventUuid = eventContext.getEventData().getMessageId();
103181

@@ -136,12 +214,9 @@ public void accept(EventContext eventContext) {
136214
// TODO metrics about these vs last retrieved, these are tracked per partition!:
137215
eventContext.getLastEnqueuedEventProperties().getEnqueuedTime();
138216
eventContext.getLastEnqueuedEventProperties().getSequenceNumber();
139-
eventContext.getLastEnqueuedEventProperties().getOffset();
140217
eventContext.getLastEnqueuedEventProperties().getRetrievalTime(); // null if not retrieved
141218
142219
// TODO compare these to above
143-
eventContext.getEventData().getOffset();
144-
eventContext.getEventData().getEnqueuedTime();
145220
eventContext.getEventData().getPartitionKey();
146221
eventContext.getEventData().getProperties();
147222
*/
@@ -166,7 +241,10 @@ public void accept(EventContext eventContext) {
166241
}
167242

168243
@Override
169-
public void close() {
244+
public void close() throws Exception {
170245
output.close();
246+
slf4jReporter.close();
247+
jmxReporter.close();
248+
jettyServer.stop();
171249
}
172250
}

src/main/java/com/teragrep/aer_01/Main.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -56,20 +56,20 @@
5656
import com.azure.storage.blob.BlobContainerAsyncClient;
5757
import com.azure.storage.blob.BlobContainerClientBuilder;
5858
import com.teragrep.aer_01.config.AzureConfig;
59+
import com.teragrep.aer_01.config.MetricsConfig;
5960
import com.teragrep.aer_01.config.source.EnvironmentSource;
6061
import com.teragrep.aer_01.config.source.PropertySource;
6162
import com.teragrep.aer_01.config.source.Sourceable;
6263

63-
import java.io.IOException;
64-
6564
// https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-java-get-started-send?tabs=passwordless%2Croles-azure-portal
6665

6766
public final class Main {
6867

69-
public static void main(String[] args) throws IOException, InterruptedException {
68+
public static void main(String[] args) throws Exception {
7069
final Sourceable configSource = getConfigSource();
70+
final int prometheusPort = new MetricsConfig(configSource).prometheusPort;
7171

72-
try (final EventContextConsumer PARTITION_PROCESSOR = new EventContextConsumer(configSource)) {
72+
try (final EventContextConsumer PARTITION_PROCESSOR = new EventContextConsumer(configSource, prometheusPort)) {
7373
AzureConfig azureConfig = new AzureConfig(configSource);
7474
final ErrorContextConsumer ERROR_HANDLER = new ErrorContextConsumer();
7575

0 commit comments

Comments
 (0)