Skip to content

Commit

Permalink
add MonitorBytesHandler, MonitorQpsHandler, JVMGCMetrics, Update Metr…
Browse files Browse the repository at this point in the history
…icsCollector
  • Loading branch information
zhuangjinjin committed Oct 23, 2019
1 parent 5580762 commit fc5f491
Show file tree
Hide file tree
Showing 16 changed files with 262 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,8 @@
*/
package io.github.ukuz.piccolo.api.exchange.handler;

import io.github.ukuz.piccolo.api.PiccoloContext;
import io.github.ukuz.piccolo.api.connection.Connection;
import io.github.ukuz.piccolo.api.exchange.ExchangeException;
import io.netty.channel.Channel;

/**
* @author ukuz90
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ public void init(PiccoloContext context) throws ServiceException {
Map<String, String> data = Jsons.fromJson(contentInfo, Map.class);
cacheRouteMap.putAll(data);
}
//register listener
this.configuration.addListener(key, this);
}

@Override
Expand Down Expand Up @@ -77,6 +79,7 @@ public void onConfigurationChanged(ConfigurationChangedEvent event) {
cacheRouteMap.clear();
} else {
Map<String, String> data = Jsons.fromJson(event.getValue(), Map.class);
cacheRouteMap.clear();
cacheRouteMap.putAll(data);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ public Executor getExecutor() {

@Override
public void innerReceive(String dataId, String group, String configInfo) {
LOGGER.info("innerReceive data: {}, group: {}, configInfo: {}", dataId, group, configInfo);
String oldData = cachedData.get(dataId);
ConfigurationChangedEvent event = new ConfigurationChangedEvent(dataId, configInfo, getChangeType(oldData, configInfo));
if (configInfo != null) {
Expand Down
4 changes: 4 additions & 0 deletions piccolo-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@
<groupId>${project.groupId}</groupId>
<artifactId>piccolo-mq</artifactId>
</dependency>
<dependency>
<groupId>${project.groupId}</groupId>
<artifactId>piccolo-monitor</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 ukuz90
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.ukuz.piccolo.core.handler;

import io.github.ukuz.piccolo.monitor.MetricsMonitor;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;

/**
* @author ukuz90
*/
@ChannelHandler.Sharable
public class MonitorBytesHandler extends ChannelDuplexHandler {

private final String serverName;

public MonitorBytesHandler(String serverName) {
this.serverName = serverName;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
MetricsMonitor.getQuestBytes(serverName).increment(((ByteBuf) msg).readableBytes());
} else if (msg instanceof BinaryWebSocketFrame) {
MetricsMonitor.getQuestBytes(serverName).increment(((BinaryWebSocketFrame)msg).content().readableBytes());
} else if (msg instanceof TextWebSocketFrame) {
MetricsMonitor.getQuestBytes(serverName).increment(((TextWebSocketFrame)msg).content().readableBytes());
}
super.channelRead(ctx, msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
if (msg instanceof ByteBuf) {
MetricsMonitor.getResponseBytes(serverName).increment(((ByteBuf) msg).readableBytes());
} else if (msg instanceof BinaryWebSocketFrame) {
MetricsMonitor.getResponseBytes(serverName).increment(((BinaryWebSocketFrame)msg).content().readableBytes());
} else if (msg instanceof TextWebSocketFrame) {
MetricsMonitor.getResponseBytes(serverName).increment(((TextWebSocketFrame)msg).content().readableBytes());
}
super.write(ctx, msg, promise);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright 2019 ukuz90
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.ukuz.piccolo.core.handler;

import io.github.ukuz.piccolo.monitor.MetricsMonitor;
import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;

/**
* @author ukuz90
*/
@ChannelHandler.Sharable
public class MonitorQpsHandler extends ChannelDuplexHandler {

private final String serverName;

public MonitorQpsHandler(String serverName) {
this.serverName = serverName;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MetricsMonitor.getQuestCount(serverName).increment();
super.channelRead(ctx, msg);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
MetricsMonitor.getResponseCount(serverName).increment();
super.write(ctx, msg, promise);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.github.ukuz.piccolo.common.thread.NamedThreadFactory;
import io.github.ukuz.piccolo.common.thread.ThreadNames;
import io.github.ukuz.piccolo.core.handler.ChannelHandlers;
import io.github.ukuz.piccolo.core.handler.MonitorBytesHandler;
import io.github.ukuz.piccolo.core.handler.MonitorQpsHandler;
import io.github.ukuz.piccolo.core.properties.ThreadProperties;
import io.github.ukuz.piccolo.monitor.MonitorExecutorFactory;
import io.github.ukuz.piccolo.transport.codec.Codec;
Expand All @@ -52,6 +54,8 @@ public class ConnectServer extends NettyServer {

private InetSocketAddress address;
private GlobalChannelTrafficShapingHandler channelTrafficShapingHandler;
private MonitorBytesHandler monitorBytesHandler;
private MonitorQpsHandler monitorQpsHandler;
private DefaultServiceInstance serviceInstance;

public ConnectServer(PiccoloContext piccoloContext) {
Expand Down Expand Up @@ -87,6 +91,9 @@ protected void doInit() {
traffic.getWriteGlobalLimit(), traffic.getReadGlobalLimit(),
traffic.getWriteChannelLimit(), traffic.getReadChannelLimit());
}

monitorBytesHandler = new MonitorBytesHandler(getName());
monitorQpsHandler = new MonitorQpsHandler(getName());
}

@Override
Expand Down Expand Up @@ -131,6 +138,8 @@ protected void initPipeline(ChannelPipeline pipeline) {
if (channelTrafficShapingHandler != null) {
pipeline.addFirst(channelTrafficShapingHandler);
}
pipeline.addBefore("decoder", "monitor_bytes", monitorBytesHandler);
pipeline.addAfter("decoder", "monitor_qps", monitorQpsHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import io.github.ukuz.piccolo.common.thread.NamedThreadFactory;
import io.github.ukuz.piccolo.common.thread.ThreadNames;
import io.github.ukuz.piccolo.core.handler.ChannelHandlers;
import io.github.ukuz.piccolo.core.handler.MonitorBytesHandler;
import io.github.ukuz.piccolo.core.handler.MonitorQpsHandler;
import io.github.ukuz.piccolo.core.properties.ThreadProperties;
import io.github.ukuz.piccolo.monitor.MonitorExecutorFactory;
import io.github.ukuz.piccolo.transport.codec.Codec;
Expand All @@ -54,6 +56,8 @@ public class GatewayServer extends NettyServer {
private final ConnectionManager cxnxManager;
private DefaultServiceInstance serviceInstance;
private GlobalChannelTrafficShapingHandler trafficShapingHandler;
private MonitorBytesHandler monitorBytesHandler;
private MonitorQpsHandler monitorQpsHandler;

public GatewayServer(PiccoloContext piccoloContext) {
this(piccoloContext,
Expand Down Expand Up @@ -88,6 +92,9 @@ protected void doInit() {
traffic.getWriteGlobalLimit(), traffic.getReadGlobalLimit(),
traffic.getWriteChannelLimit(), traffic.getReadChannelLimit());
}

monitorBytesHandler = new MonitorBytesHandler(getName());
monitorQpsHandler = new MonitorQpsHandler(getName());
}

@Override
Expand Down Expand Up @@ -125,6 +132,9 @@ protected void initPipeline(ChannelPipeline pipeline) {
if (trafficShapingHandler != null) {
pipeline.addFirst(trafficShapingHandler);
}

pipeline.addBefore("decoder", "monitor_bytes", monitorBytesHandler);
pipeline.addAfter("decoder","monitor_qps", monitorQpsHandler);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import io.github.ukuz.piccolo.common.thread.ThreadNames;
import io.github.ukuz.piccolo.core.externel.handler.WebSocketIndexHandler;
import io.github.ukuz.piccolo.core.handler.ChannelHandlers;
import io.github.ukuz.piccolo.core.handler.MonitorBytesHandler;
import io.github.ukuz.piccolo.core.handler.MonitorQpsHandler;
import io.github.ukuz.piccolo.core.properties.ThreadProperties;
import io.github.ukuz.piccolo.monitor.MonitorExecutorFactory;
import io.github.ukuz.piccolo.transport.codec.*;
Expand All @@ -53,6 +55,8 @@ public class WebSocketServer extends NettyServer {

private InetSocketAddress address;
private DefaultServiceInstance serviceInstance;
private MonitorBytesHandler monitorBytesHandler;
private MonitorQpsHandler monitorQpsHandler;

public WebSocketServer(PiccoloContext piccoloContext) {
this(piccoloContext, ChannelHandlers.newConnectChannelHandler(piccoloContext), new NettyConnectionManager());
Expand All @@ -77,6 +81,8 @@ protected Codec newCodec() {

@Override
protected void doInit() {
monitorBytesHandler = new MonitorBytesHandler(getName());
monitorQpsHandler = new MonitorQpsHandler(getName());
}

@Override
Expand Down Expand Up @@ -111,8 +117,10 @@ protected void initPipeline(ChannelPipeline pipeline) {
pipeline.addLast(new WebSocketServerProtocolHandler(piccoloContext.getProperties(NetProperties.class).getWsPath(), null, true));
//inbound
pipeline.addLast(new WebSocketIndexHandler());
pipeline.addLast(monitorBytesHandler);
pipeline.addLast(codec.getDecoder());
pipeline.addLast(codec.getEncoder());
pipeline.addLast(monitorQpsHandler);
//duplex
pipeline.addLast(getServerHandler());
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2019 ukuz90
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.ukuz.piccolo.monitor;

import io.github.ukuz.piccolo.monitor.quota.GCQuota;
import io.micrometer.core.instrument.Gauge;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.binder.MeterBinder;

/**
* @author ukuz90
*/
public class JVMGCMetrics implements MeterBinder {

private final GCQuota gcQuota;

private static final String COUNT = "count";
private static final String TIME = "time";

public JVMGCMetrics(GCQuota gcQuota) {
this.gcQuota = gcQuota;
}

@Override
public void bindTo(MeterRegistry registry) {
Gauge.builder("jvm.gc.collection.count", gcQuota, GCQuota::youngGcCollectionCount)
.description("")
.baseUnit(COUNT)
.tag("name", "young gc")
.register(registry);

Gauge.builder("jvm.gc.collection.count", gcQuota, GCQuota::fullGcCollectionCount)
.description("")
.baseUnit(COUNT)
.tag("name", "full gc")
.register(registry);

Gauge.builder("jvm.gc.collection.time", gcQuota, GCQuota::youngGcCollectionTime)
.description("")
.baseUnit(TIME)
.tag("name", "young gc")
.register(registry);

Gauge.builder("jvm.gc.collection.time", gcQuota, GCQuota::fullGcCollectionTime)
.description("")
.baseUnit(TIME)
.tag("name", "full gc")
.register(registry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import io.github.ukuz.piccolo.api.common.utils.StringUtils;
import io.github.ukuz.piccolo.api.external.common.Assert;
import io.github.ukuz.piccolo.monitor.quota.GCQuota;
import io.micrometer.core.instrument.*;
import io.micrometer.core.instrument.binder.jvm.DiskSpaceMetrics;
import io.netty.util.internal.PlatformDependent;
Expand All @@ -31,7 +32,10 @@
*/
public class MetricsMonitor {

private static final String MONITOR_TAG = "piccolo_app_monitor";
private static final String REQUEST_CNT_TAG = "piccolo_recv_count";
private static final String REQUEST_BYTES_TAG = "piccolo_recv_bytes";
private static final String RESPONSE_CNT_TAG = "piccolo_send_count";
private static final String RESPONSE_BYTES_TAG = "piccolo_send_bytes";

public static void gauge(String tag, String module, String name, Number value) {
if (StringUtils.hasText(module)) {
Expand Down Expand Up @@ -70,12 +74,28 @@ private static List<Tag> wrapLabelSet(String ...label) {
return labelSet;
}

public static final Counter getWebSocketQuestCount() {
return counter(MONITOR_TAG, "module", "net", "name", "websocketQuestCount");
// public static final Counter getWebSocketQuestCount() {
// return counter(MONITOR_TAG, "module", "net", "name", "websocketQuestCount");
// }
//
// public static final Counter getWebSocketBytesCount() {
// return counter(MONITOR_TAG, "module", "net", "name", "websocketBytesCount");
// }

public static final Counter getQuestCount(String name) {
return counter(REQUEST_CNT_TAG, "name", name);
}

public static final Counter getQuestBytes(String name) {
return counter(REQUEST_BYTES_TAG, "name", name);
}

public static final Counter getWebSocketBytesCount() {
return counter(MONITOR_TAG, "module", "net", "name", "websocketBytesCount");
public static final Counter getResponseCount(String name) {
return counter(RESPONSE_CNT_TAG, "name", name);
}

public static final Counter getResponseBytes(String name) {
return counter(RESPONSE_BYTES_TAG, "name", name);
}

public static final void monitorDisk() {
Expand All @@ -84,4 +104,8 @@ public static final void monitorDisk() {
}
}

public static final void monitorGC(GCQuota gcQuota) {
new JVMGCMetrics(gcQuota).bindTo(Metrics.globalRegistry);
}

}
Loading

0 comments on commit fc5f491

Please sign in to comment.