diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsInboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsInboundDao.java new file mode 100644 index 0000000000000..52321c65f511f --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsInboundDao.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao; + +import com.navercorp.pinpoint.common.trace.ServiceType; + +/** + * @author intr3p1d + */ +public interface MapStatisticsInboundDao extends CachedStatisticsDao { + // src -> dest + // inbound (rowKey dest <- columnName src) + // outbound (rowKey src -> columnName dest) + void update( + String srcServiceGroupName, String srcApplicationName, ServiceType srcServiceType, + String destServiceGroupName, String destApplicationName, ServiceType destServiceType, + String srcHost, int elapsed, boolean isError + ); +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsOutboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsOutboundDao.java new file mode 100644 index 0000000000000..ff9196471198f --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsOutboundDao.java @@ -0,0 +1,32 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao; + +import com.navercorp.pinpoint.common.trace.ServiceType; + +/** + * @author intr3p1d + */ +public interface MapStatisticsOutboundDao extends CachedStatisticsDao { + // src -> dest + // inbound (rowKey dest <- columnName src) + // outbound (rowKey src -> columnName dest) + void update( + String destServiceGroupName, String destApplicationName, ServiceType destServiceType, + String srcServiceGroupName, String srcApplicationName, ServiceType srcServiceType, + String srcHost, int elapsed, boolean isError + ); +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsSelfDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsSelfDao.java new file mode 100644 index 0000000000000..ba2a7442b9b89 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/MapStatisticsSelfDao.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao; + +import com.navercorp.pinpoint.common.trace.ServiceType; + +/** + * @author intr3p1d + */ +public interface MapStatisticsSelfDao extends CachedStatisticsDao { + void received(String serviceGroup, String applicationName, ServiceType serviceType, int elapsed, boolean isError); + + void updatePing(String serviceGroup, String applicationName, ServiceType serviceType, int elapsed, boolean isError); +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsInboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsInboundDao.java new file mode 100644 index 0000000000000..83bf533798df3 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsInboundDao.java @@ -0,0 +1,130 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao.hbase; + +import com.navercorp.pinpoint.collector.dao.MapStatisticsInboundDao; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupColumnName; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupRowKey; +import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.server.util.TimeSlot; +import com.navercorp.pinpoint.common.trace.HistogramSchema; +import com.navercorp.pinpoint.common.trace.ServiceType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Repository; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Repository +public class HbaseMapStatisticsInboundDao implements MapStatisticsInboundDao { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final AcceptedTimeService acceptedTimeService; + + private final TimeSlot timeSlot; + private final IgnoreStatFilter ignoreStatFilter; + private final BulkWriter bulkWriter; + private final MapLinkConfiguration mapLinkConfiguration; + + public HbaseMapStatisticsInboundDao( + MapLinkConfiguration mapLinkConfiguration, + IgnoreStatFilter ignoreStatFilter, + AcceptedTimeService acceptedTimeService, + TimeSlot timeSlot, + @Qualifier("inboundBulkWriter") BulkWriter bulkWriter + ) { + this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); + this.ignoreStatFilter = Objects.requireNonNull(ignoreStatFilter, "ignoreStatFilter"); + this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); + this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); + + this.bulkWriter = Objects.requireNonNull(bulkWriter, "inboundBulkWriter"); + } + + + @Override + public void update( + String srcServiceGroupName, String srcApplicationName, ServiceType srcServiceType, + String destServiceGroupName, String destApplicationName, ServiceType destServiceType, + String srcHost, int elapsed, boolean isError + ) { + Objects.requireNonNull(srcServiceGroupName, "srcServiceGroupName"); + Objects.requireNonNull(destServiceGroupName, "destServiceGroupName"); + Objects.requireNonNull(srcApplicationName, "srcApplicationName"); + Objects.requireNonNull(destServiceGroupName, "destApplicationName"); + + if (logger.isDebugEnabled()) { + logger.debug("[Inbound] {} {}({})[{}] <- {} {}({})", + destServiceGroupName, destApplicationName, destServiceType, srcHost, + srcServiceGroupName, srcApplicationName, srcServiceType + ); + } + + + // TODO dest, src parameter normalization + if (ignoreStatFilter.filter(srcServiceType, srcHost)) { + logger.debug("[Ignore-Inbound] {} {}({})[{}] <- {} {}({})", + destServiceGroupName, destApplicationName, destServiceType, srcHost, + srcServiceGroupName, srcApplicationName, srcServiceType + ); + return; + } + + final long acceptedTime = acceptedTimeService.getAcceptedTime(); + final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime); + + // rowKey is dest in inbound + final RowKey destRowKey = new ServiceGroupRowKey(destServiceGroupName, destServiceType.getCode(), destApplicationName, rowTimeSlot); + + // columnName is src in outbound + final short srcSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(srcServiceType, elapsed, isError); + HistogramSchema histogramSchema = srcServiceType.getHistogramSchema(); + + final ColumnName srcColumnName = new ServiceGroupColumnName(srcServiceGroupName, srcServiceType.getCode(), srcApplicationName, srcSlotNumber); + this.bulkWriter.increment(destRowKey, srcColumnName); + + if (mapLinkConfiguration.isEnableAvg()) { + final ColumnName sumColumnName = new ServiceGroupColumnName(srcServiceGroupName, srcServiceType.getCode(), srcApplicationName, histogramSchema.getSumStatSlot().getSlotTime()); + this.bulkWriter.increment(destRowKey, sumColumnName, elapsed); + } + if (mapLinkConfiguration.isEnableMax()) { + final ColumnName maxColumnName = new ServiceGroupColumnName(srcServiceGroupName, srcServiceType.getCode(), srcApplicationName, histogramSchema.getMaxStatSlot().getSlotTime()); + this.bulkWriter.updateMax(destRowKey, maxColumnName, elapsed); + } + + } + + @Override + public void flushLink() { + this.bulkWriter.flushLink(); + } + + @Override + public void flushAvgMax() { + this.bulkWriter.flushAvgMax(); + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsOutboundDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsOutboundDao.java new file mode 100644 index 0000000000000..697d6db27ac45 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsOutboundDao.java @@ -0,0 +1,123 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao.hbase; + +import com.navercorp.pinpoint.collector.dao.MapStatisticsOutboundDao; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupColumnName; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupRowKey; +import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.server.util.TimeSlot; +import com.navercorp.pinpoint.common.trace.HistogramSchema; +import com.navercorp.pinpoint.common.trace.ServiceType; +import org.apache.commons.lang3.StringUtils; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Repository; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Repository +public class HbaseMapStatisticsOutboundDao implements MapStatisticsOutboundDao { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final AcceptedTimeService acceptedTimeService; + + private final TimeSlot timeSlot; + + private final BulkWriter bulkWriter; + private final MapLinkConfiguration mapLinkConfiguration; + + public HbaseMapStatisticsOutboundDao( + MapLinkConfiguration mapLinkConfiguration, + IgnoreStatFilter ignoreStatFilter, + AcceptedTimeService acceptedTimeService, TimeSlot timeSlot, + @Qualifier("outboundBulkWriter") BulkWriter bulkWriter + ) { + this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); + this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); + this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); + + this.bulkWriter = Objects.requireNonNull(bulkWriter, "outboundBulkWriter"); + } + + + @Override + public void update( + String destServiceGroupName, String destApplicationName, ServiceType destServiceType, + String srcServiceGroupName, String srcApplicationName, ServiceType srcServiceType, + String srcHost, int elapsed, boolean isError + ) { + // outbound (rowKey src -> columnName dest) + Objects.requireNonNull(destServiceGroupName, "destServiceGroupName"); + Objects.requireNonNull(srcServiceGroupName, "srcServiceGroupName"); + Objects.requireNonNull(destApplicationName, "destApplicationName"); + Objects.requireNonNull(srcServiceGroupName, "srcApplicationName"); + + if (logger.isDebugEnabled()) { + logger.debug("[Outbound] {} {}({})[{}] -> {} {}({})", + srcServiceGroupName, srcApplicationName, srcServiceType, srcHost, + destServiceGroupName, destApplicationName, destServiceType + ); + } + + // there may be no endpoint in case of httpclient + srcHost = StringUtils.defaultString(srcHost); + + final long acceptedTime = acceptedTimeService.getAcceptedTime(); + final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime); + + // rowKey is src in outbound + final RowKey srcRowKey = new ServiceGroupRowKey(srcServiceGroupName, srcServiceType.getCode(), srcApplicationName, rowTimeSlot); + + // columnName is dest in outbound + final short destSlotNumber = ApplicationMapStatisticsUtils.getSlotNumber(destServiceType, elapsed, isError); + HistogramSchema histogramSchema = destServiceType.getHistogramSchema(); + + final ColumnName destColumnName = new ServiceGroupColumnName(destServiceGroupName, destServiceType.getCode(), destApplicationName, destSlotNumber); + this.bulkWriter.increment(srcRowKey, destColumnName); + + if (mapLinkConfiguration.isEnableAvg()) { + final ColumnName sumColumnName = new ServiceGroupColumnName(destServiceGroupName, destServiceType.getCode(), destApplicationName, histogramSchema.getSumStatSlot().getSlotTime()); + this.bulkWriter.increment(srcRowKey, sumColumnName, elapsed); + } + if (mapLinkConfiguration.isEnableMax()) { + final ColumnName maxColumnName = new ServiceGroupColumnName(destServiceGroupName, destServiceType.getCode(), destApplicationName, histogramSchema.getMaxStatSlot().getSlotTime()); + this.bulkWriter.updateMax(srcRowKey, maxColumnName, elapsed); + } + } + + + @Override + public void flushLink() { + this.bulkWriter.flushLink(); + } + + @Override + public void flushAvgMax() { + this.bulkWriter.flushAvgMax(); + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsSelfDao.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsSelfDao.java new file mode 100644 index 0000000000000..699b1e49163e0 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/HbaseMapStatisticsSelfDao.java @@ -0,0 +1,122 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao.hbase; + +import com.navercorp.pinpoint.collector.dao.MapStatisticsSelfDao; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.BulkWriter; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ColumnName; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.MapLinkConfiguration; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.RowKey; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceGroupRowKey; +import com.navercorp.pinpoint.collector.dao.hbase.statistics.ServiceResponseColumnName; +import com.navercorp.pinpoint.common.server.util.AcceptedTimeService; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.server.util.TimeSlot; +import com.navercorp.pinpoint.common.trace.HistogramSchema; +import com.navercorp.pinpoint.common.trace.ServiceType; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Repository; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Repository +public class HbaseMapStatisticsSelfDao implements MapStatisticsSelfDao { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final AcceptedTimeService acceptedTimeService; + + private final TimeSlot timeSlot; + private final BulkWriter bulkWriter; + private final MapLinkConfiguration mapLinkConfiguration; + + public HbaseMapStatisticsSelfDao(MapLinkConfiguration mapLinkConfiguration, + AcceptedTimeService acceptedTimeService, TimeSlot timeSlot, + @Qualifier("serviceGroupSelfBulkWriter") BulkWriter bulkWriter) { + this.mapLinkConfiguration = Objects.requireNonNull(mapLinkConfiguration, "mapLinkConfiguration"); + this.acceptedTimeService = Objects.requireNonNull(acceptedTimeService, "acceptedTimeService"); + this.timeSlot = Objects.requireNonNull(timeSlot, "timeSlot"); + this.bulkWriter = Objects.requireNonNull(bulkWriter, "bulkWrtier"); + } + + + @Override + public void received(String serviceGroupName, String applicationName, ServiceType applicationServiceType, int elapsed, boolean isError) { + Objects.requireNonNull(serviceGroupName, "serviceGroupName"); + Objects.requireNonNull(applicationName, "applicationName"); + + + if (logger.isDebugEnabled()) { + logger.debug("[Received] {} {} ({})", serviceGroupName, applicationName, applicationServiceType); + } + + // make row key. rowkey is me + final long acceptedTime = acceptedTimeService.getAcceptedTime(); + final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime); + final RowKey selfRowKey = new ServiceGroupRowKey(serviceGroupName, applicationServiceType.getCode(), applicationName, rowTimeSlot); + + final short slotNumber = ApplicationMapStatisticsUtils.getSlotNumber(applicationServiceType, elapsed, isError); + final ColumnName selfColumnName = new ServiceResponseColumnName(applicationName, applicationServiceType.getCode(), slotNumber); + this.bulkWriter.increment(selfRowKey, selfColumnName); + + HistogramSchema histogramSchema = applicationServiceType.getHistogramSchema(); + if (mapLinkConfiguration.isEnableAvg()) { + final ColumnName sumColumnName = new ServiceResponseColumnName(applicationName, applicationServiceType.getCode(), histogramSchema.getSumStatSlot().getSlotTime()); + this.bulkWriter.increment(selfRowKey, sumColumnName, elapsed); + } + + final ColumnName maxColumnName = new ServiceResponseColumnName(applicationName, applicationServiceType.getCode(), histogramSchema.getMaxStatSlot().getSlotTime()); + if (mapLinkConfiguration.isEnableMax()) { + this.bulkWriter.updateMax(selfRowKey, maxColumnName, elapsed); + } + } + + @Override + public void updatePing(String serviceGroupName, String applicationName, ServiceType applicationServiceType, int elapsed, boolean isError) { + Objects.requireNonNull(serviceGroupName, "serviceGroupName"); + Objects.requireNonNull(applicationName, "applicationName"); + + if (logger.isDebugEnabled()) { + logger.debug("[Received] {} {} ({})", serviceGroupName, applicationName, applicationServiceType); + } + + // make row key. rowkey is me + final long acceptedTime = acceptedTimeService.getAcceptedTime(); + final long rowTimeSlot = timeSlot.getTimeSlot(acceptedTime); + final RowKey selfRowKey = new ServiceGroupRowKey(serviceGroupName, applicationServiceType.getCode(), applicationName, rowTimeSlot); + + final short slotNumber = ApplicationMapStatisticsUtils.getPingSlotNumber(applicationServiceType, elapsed, isError); + final ColumnName selfColumnName = new ServiceResponseColumnName(applicationName, applicationServiceType.getCode(), slotNumber); + this.bulkWriter.increment(selfRowKey, selfColumnName); + } + + + @Override + public void flushLink() { + this.bulkWriter.flushLink(); + } + + @Override + public void flushAvgMax() { + this.bulkWriter.flushAvgMax(); + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkFactory.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkFactory.java index c295fdc604a3d..df043c38a284e 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkFactory.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/BulkFactory.java @@ -144,4 +144,6 @@ public BulkWriter selfBulkWriter(HbaseOperations hbaseTemplate, private String newBulkWriterName(String className) { return className + "-writer"; } + + } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupBulkFactory.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupBulkFactory.java new file mode 100644 index 0000000000000..9d9064952a304 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupBulkFactory.java @@ -0,0 +1,159 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao.hbase.statistics; + +import com.navercorp.pinpoint.collector.dao.hbase.BulkOperationReporter; +import com.navercorp.pinpoint.collector.dao.hbase.HbaseMapStatisticsCallerDao; +import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily; +import com.navercorp.pinpoint.common.hbase.HbaseOperations; +import com.navercorp.pinpoint.common.hbase.TableNameProvider; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Configuration +public class ServiceGroupBulkFactory { + private final BulkConfiguration bulkConfiguration; + private final BulkIncrementerFactory bulkIncrementerFactory; + private final BulkOperationReporterFactory bulkOperationReporterFactory; + + public ServiceGroupBulkFactory(BulkConfiguration bulkConfiguration, + BulkIncrementerFactory bulkIncrementerFactory, + BulkOperationReporterFactory bulkOperationReporterFactory) { + this.bulkConfiguration = Objects.requireNonNull(bulkConfiguration, "bulkConfiguration"); + this.bulkIncrementerFactory = Objects.requireNonNull(bulkIncrementerFactory, "bulkIncrementerFactory"); + this.bulkOperationReporterFactory = Objects.requireNonNull(bulkOperationReporterFactory, "bulkOperationReporterFactory"); + } + + private BulkIncrementer newBulkIncrementer(String reporterName, HbaseColumnFamily hbaseColumnFamily, int limitSize) { + BulkOperationReporter reporter = bulkOperationReporterFactory.getBulkOperationReporter(reporterName); + RowKeyMerge merge = new RowKeyMerge(hbaseColumnFamily); + BulkIncrementer bulkIncrementer = new DefaultBulkIncrementer(merge); + + return bulkIncrementerFactory.wrap(bulkIncrementer, limitSize, reporter); + } + + + private BulkUpdater getBulkUpdater(String reporterName) { + BulkOperationReporter reporter = bulkOperationReporterFactory.getBulkOperationReporter(reporterName); + BulkUpdater bulkUpdater = new DefaultBulkUpdater(); + return bulkIncrementerFactory.wrap(bulkUpdater, bulkConfiguration.getCalleeLimitSize(), reporter); + } + + private BulkWriter newBulkWriter(String loggerName, + HbaseOperations hbaseTemplate, + HbaseColumnFamily descriptor, + TableNameProvider tableNameProvider, + RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, + BulkIncrementer bulkIncrementer, + BulkUpdater bulkUpdater) { + if (bulkConfiguration.enableBulk()) { + return new DefaultBulkWriter(loggerName, hbaseTemplate, rowKeyDistributorByHashPrefix, + bulkIncrementer, bulkUpdater, descriptor, tableNameProvider); + } else { + return new SyncWriter(loggerName, hbaseTemplate, rowKeyDistributorByHashPrefix, descriptor, tableNameProvider); + } + } + + private static String newBulkWriterName(String className) { + return className + "-writer"; + } + + @Bean + public BulkIncrementer inboundBulkIncrementer() { + String reporterName = "inboundBulkIncrementerReporter"; + HbaseColumnFamily hbaseColumnFamily = HbaseColumnFamily.MAP_STATISTICS_INBOUND_SERVICE_GROUP_COUNTER; + int limitSize = bulkConfiguration.getCallerLimitSize(); + + return newBulkIncrementer(reporterName, hbaseColumnFamily, limitSize); + } + + @Bean + public BulkUpdater inboundBulkUpdater() { + String reporterName = "inboundBulkUpdaterReporter"; + return getBulkUpdater(reporterName); + } + + @Bean + public BulkWriter inboundBulkWriter(HbaseOperations hbaseTemplate, + TableNameProvider tableNameProvider, + @Qualifier("serviceGroupInboundRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, + @Qualifier("inboundBulkIncrementer") BulkIncrementer bulkIncrementer, + @Qualifier("inboundBulkUpdater") BulkUpdater bulkUpdater) { + String loggerName = newBulkWriterName(HbaseMapStatisticsCallerDao.class.getName()); + return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_INBOUND_SERVICE_GROUP_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater); + } + + + @Bean + public BulkIncrementer outboundBulkIncrementer() { + String reporterName = "outboundBulkIncrementerReporter"; + HbaseColumnFamily hbaseColumnFamily = HbaseColumnFamily.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_COUNTER; + int limitSize = bulkConfiguration.getCallerLimitSize(); + + return newBulkIncrementer(reporterName, hbaseColumnFamily, limitSize); + } + + @Bean + public BulkUpdater outboundBulkUpdater() { + String reporterName = "outboundBulkUpdaterReporter"; + return getBulkUpdater(reporterName); + } + + + @Bean + public BulkWriter outboundBulkWriter(HbaseOperations hbaseTemplate, + TableNameProvider tableNameProvider, + @Qualifier("serviceGroupOutboundRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, + @Qualifier("outboundBulkIncrementer") BulkIncrementer bulkIncrementer, + @Qualifier("outboundBulkUpdater") BulkUpdater bulkUpdater) { + String loggerName = newBulkWriterName(HbaseMapStatisticsCallerDao.class.getName()); + return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater); + } + + + @Bean + public BulkIncrementer serviceGroupSelfBulkIncrementer() { + String reporterName = "serviceGroupSelfBulkIncrementerReporter"; + HbaseColumnFamily hbaseColumnFamily = HbaseColumnFamily.MAP_STATISTICS_SELF_SERVICE_GROUP_COUNTER; + int limitSize = bulkConfiguration.getCallerLimitSize(); + + return newBulkIncrementer(reporterName, hbaseColumnFamily, limitSize); + } + + @Bean + public BulkUpdater serviceGroupSelfBulkUpdater() { + String reporterName = "ServiceGroupSelfBulkUpdaterReporter"; + return getBulkUpdater(reporterName); + } + + @Bean + public BulkWriter serviceGroupSelfBulkWriter(HbaseOperations hbaseTemplate, + TableNameProvider tableNameProvider, + @Qualifier("serviceGroupSelfRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix, + @Qualifier("serviceGroupSelfBulkIncrementer") BulkIncrementer bulkIncrementer, + @Qualifier("serviceGroupSelfBulkUpdater") BulkUpdater bulkUpdater) { + String loggerName = newBulkWriterName(HbaseMapStatisticsCallerDao.class.getName()); + return newBulkWriter(loggerName, hbaseTemplate, HbaseColumnFamily.MAP_STATISTICS_SELF_SERVICE_GROUP_COUNTER, tableNameProvider, rowKeyDistributorByHashPrefix, bulkIncrementer, bulkUpdater); + } + +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupColumnName.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupColumnName.java new file mode 100644 index 0000000000000..cdf1b60ef843c --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupColumnName.java @@ -0,0 +1,92 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao.hbase.statistics; + +import com.navercorp.pinpoint.common.buffer.AutomaticBuffer; +import com.navercorp.pinpoint.common.buffer.Buffer; +import com.navercorp.pinpoint.common.server.util.ServiceGroupMapUtils; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +public class ServiceGroupColumnName implements ColumnName { + + private final String thatServiceGroupName; + private final short thatApplicationServiceType; + private final String thatApplicationName; + private final short columnSlotNumber; + + // WARNING - cached hash value should not be included for equals/hashCode + private int hash; + private long callCount; + + public ServiceGroupColumnName( + String thatServiceGroupName, + short thatApplicationServiceType, String thatApplicationName, + short columnSlotNumber + ) { + this.thatServiceGroupName = Objects.requireNonNull(thatServiceGroupName, "thatServiceGroupName"); + this.thatApplicationServiceType = thatApplicationServiceType; + this.thatApplicationName = Objects.requireNonNull(thatApplicationName, "thatApplicationName"); + this.columnSlotNumber = columnSlotNumber; + } + + @Override + public byte[] getColumnName() { + return ServiceGroupMapUtils.makeColumnName( + thatServiceGroupName, thatApplicationName, thatApplicationServiceType, columnSlotNumber + ); + } + + @Override + public long getCallCount() { + return callCount; + } + + @Override + public void setCallCount(long callCount) { + this.callCount = callCount; + } + + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ServiceGroupColumnName that = (ServiceGroupColumnName) o; + + if (thatApplicationServiceType != that.thatApplicationServiceType) return false; + if (columnSlotNumber != that.columnSlotNumber) return false; + if (hash != that.hash) return false; + if (callCount != that.callCount) return false; + if (!thatServiceGroupName.equals(that.thatServiceGroupName)) return false; + return thatApplicationName.equals(that.thatApplicationName); + } + + @Override + public int hashCode() { + int result = thatServiceGroupName.hashCode(); + result = 31 * result + (int) thatApplicationServiceType; + result = 31 * result + thatApplicationName.hashCode(); + result = 31 * result + (int) columnSlotNumber; + result = 31 * result + hash; + result = 31 * result + (int) (callCount ^ (callCount >>> 32)); + return result; + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupRowKey.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupRowKey.java new file mode 100644 index 0000000000000..df05696d18bea --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceGroupRowKey.java @@ -0,0 +1,84 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao.hbase.statistics; + +import com.navercorp.pinpoint.common.server.util.ServiceGroupMapUtils; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +public class ServiceGroupRowKey implements RowKey { + private final String serviceGroup; + private final short serviceType; + private final String applicationName; + private final long rowTimeSlot; + + // WARNING - cached hash value should not be included for equals/hashCode + private int hash; + + public ServiceGroupRowKey( + String serviceGroup, + short serviceType, String applicationName, + long rowTimeSlot + ) { + this.serviceGroup = Objects.requireNonNull(serviceGroup, "serviceGroup"); + this.serviceType = serviceType; + this.applicationName = Objects.requireNonNull(applicationName, "applicationName"); + this.rowTimeSlot = rowTimeSlot; + } + + @Override + public byte[] getRowKey() { + return ServiceGroupMapUtils.makeRowKey(serviceGroup, applicationName, serviceType, rowTimeSlot); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ServiceGroupRowKey that = (ServiceGroupRowKey) o; + + if (serviceType != that.serviceType) return false; + if (rowTimeSlot != that.rowTimeSlot) return false; + if (hash != that.hash) return false; + if (!serviceGroup.equals(that.serviceGroup)) return false; + return applicationName.equals(that.applicationName); + } + + @Override + public int hashCode() { + int result = serviceGroup.hashCode(); + result = 31 * result + (int) serviceType; + result = 31 * result + applicationName.hashCode(); + result = 31 * result + (int) (rowTimeSlot ^ (rowTimeSlot >>> 32)); + result = 31 * result + hash; + return result; + } + + @Override + public String toString() { + return "ServiceGroupRowKey{" + + "callServiceGroup='" + serviceGroup + '\'' + + ", thisServiceType=" + serviceType + + ", thisApplicationName='" + applicationName + '\'' + + ", rowTimeSlot=" + rowTimeSlot + + ", hash=" + hash + + '}'; + } +} \ No newline at end of file diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceResponseColumnName.java b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceResponseColumnName.java new file mode 100644 index 0000000000000..c058bec3f9c38 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/dao/hbase/statistics/ServiceResponseColumnName.java @@ -0,0 +1,100 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.dao.hbase.statistics; + +import com.navercorp.pinpoint.common.buffer.AutomaticBuffer; +import com.navercorp.pinpoint.common.buffer.Buffer; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.util.BytesUtils; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +public class ServiceResponseColumnName implements ColumnName { + + private final String applicationName; + private final short serviceTypeCode; + private final short columnSlotNumber; + + // WARNING - cached hash value should not be included for equals/hashCode + private int hash; + + private long callCount; + + public ServiceResponseColumnName(String applicationName, short serviceTypeCode, short columnSlotNumber) { + this.applicationName = Objects.requireNonNull(applicationName, "applicationName"); + this.serviceTypeCode = serviceTypeCode; + this.columnSlotNumber = columnSlotNumber; + } + + public long getCallCount() { + return callCount; + } + + public void setCallCount(long callCount) { + this.callCount = callCount; + } + + public byte[] getColumnName() { + + final Buffer buffer = new AutomaticBuffer( + applicationName.length() + BytesUtils.SHORT_BYTE_LENGTH * 2 + ); + buffer.putShort(columnSlotNumber); + + buffer.putShort(serviceTypeCode); + final byte[] applicationNameBytes = BytesUtils.toBytes(applicationName); + buffer.putBytes(applicationNameBytes); + + return buffer.getBuffer(); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + ServiceResponseColumnName that = (ServiceResponseColumnName) o; + + if (serviceTypeCode != that.serviceTypeCode) return false; + if (columnSlotNumber != that.columnSlotNumber) return false; + if (hash != that.hash) return false; + if (callCount != that.callCount) return false; + return applicationName.equals(that.applicationName); + } + + @Override + public int hashCode() { + int result = applicationName.hashCode(); + result = 31 * result + (int) serviceTypeCode; + result = 31 * result + (int) columnSlotNumber; + result = 31 * result + hash; + result = 31 * result + (int) (callCount ^ (callCount >>> 32)); + return result; + } + + @Override + public String toString() { + return "ServiceResponseColumnName{" + + "applicationName='" + applicationName + '\'' + + ", columnSlotNumber=" + columnSlotNumber + + ", hash=" + hash + + ", callCount=" + callCount + + '}'; + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/HbaseTraceService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/HbaseTraceService.java index a1a3511b9cbe8..f0c50d4f2f57d 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/service/HbaseTraceService.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/HbaseTraceService.java @@ -57,6 +57,8 @@ public class HbaseTraceService implements TraceService { private final StatisticsService statisticsService; + private final ServiceGroupMapService serviceGroupMapService; + private final ServiceTypeRegistryService registry; private final SpanStorePublisher publisher; @@ -66,6 +68,7 @@ public HbaseTraceService(TraceDao traceDao, ApplicationTraceIndexDao applicationTraceIndexDao, HostApplicationMapDao hostApplicationMapDao, StatisticsService statisticsService, + ServiceGroupMapService serviceGroupMapService, ServiceTypeRegistryService registry, SpanStorePublisher spanStorePublisher, @Qualifier("grpcSpanServerExecutor") Executor grpcSpanServerExecutor) { @@ -73,6 +76,7 @@ public HbaseTraceService(TraceDao traceDao, this.applicationTraceIndexDao = Objects.requireNonNull(applicationTraceIndexDao, "applicationTraceIndexDao"); this.hostApplicationMapDao = Objects.requireNonNull(hostApplicationMapDao, "hostApplicationMapDao"); this.statisticsService = Objects.requireNonNull(statisticsService, "statisticsService"); + this.serviceGroupMapService = Objects.requireNonNull(serviceGroupMapService, "serviceGroupMapService"); this.registry = Objects.requireNonNull(registry, "registry"); this.publisher = Objects.requireNonNull(spanStorePublisher, "spanStorePublisher"); this.grpcSpanServerExecutor = Objects.requireNonNull(grpcSpanServerExecutor, "grpcSpanServerExecutor"); @@ -166,14 +170,39 @@ private void insertSpanStat(SpanBo span) { if (spanServiceType.isQueue()) { // create virtual queue node statisticsService.updateCaller(span.getAcceptorHost(), spanServiceType, span.getRemoteAddr(), span.getApplicationId(), applicationServiceType, span.getEndPoint(), span.getElapsed(), isError); - statisticsService.updateCallee(span.getApplicationId(), applicationServiceType, span.getAcceptorHost(), spanServiceType, span.getAgentId(), span.getElapsed(), isError); + + // updateCaller: (update Outbound) + // span.getRemoteAddr() src host + // span.getEndPoint() dest host + // updateCallee: + // span.getAgentId() src host + + serviceGroupMapService.updateBidirectional( + "default", + span.getApplicationId(), applicationServiceType, + span.getAgentId(), + "default", + span.getAcceptorHost(), spanServiceType, + span.getEndPoint(), + span.getElapsed(), isError + ); + } else { // create virtual user statisticsService.updateCaller(span.getApplicationId(), ServiceType.USER, span.getAgentId(), span.getApplicationId(), applicationServiceType, span.getAgentId(), span.getElapsed(), isError); - // update the span information of the current node (self) statisticsService.updateCallee(span.getApplicationId(), applicationServiceType, span.getApplicationId(), ServiceType.USER, span.getAgentId(), span.getElapsed(), isError); + + serviceGroupMapService.updateBidirectional( + "default", + span.getApplicationId(), ServiceType.USER, + span.getAgentId(), + "default", + span.getApplicationId(), applicationServiceType, + span.getAgentId(), + span.getElapsed(), isError + ); } bugCheck++; } @@ -196,12 +225,26 @@ private void insertSpanStat(SpanBo span) { // emulate virtual queue node's send SpanEvent statisticsService.updateCaller(span.getAcceptorHost(), spanServiceType, span.getRemoteAddr(), span.getApplicationId(), applicationServiceType, span.getEndPoint(), span.getElapsed(), isError); + serviceGroupMapService.updateOutbound( + "default", + span.getAcceptorHost(), spanServiceType, + "default", + span.getApplicationId(), applicationServiceType, + span.getRemoteAddr(), + span.getElapsed(), isError + ); + parentApplicationName = span.getAcceptorHost(); parentApplicationType = spanServiceType; } } statisticsService.updateCallee(span.getApplicationId(), applicationServiceType, parentApplicationName, parentApplicationType, span.getAgentId(), span.getElapsed(), isError); + serviceGroupMapService.updateInbound( + "default", parentApplicationName, parentApplicationType, + "default", span.getApplicationId(), applicationServiceType, + span.getAgentId(), span.getElapsed(), isError + ); bugCheck++; } @@ -211,6 +254,11 @@ private void insertSpanStat(SpanBo span) { // the data may be different due to timeout or network error. statisticsService.updateResponseTime(span.getApplicationId(), applicationServiceType, span.getAgentId(), span.getElapsed(), isError); + serviceGroupMapService.updateSelfResponseTime( + "default", span.getApplicationId(), applicationServiceType, + span.getElapsed(), isError + ); + if (bugCheck != 1) { logger.info("ambiguous span found(bug). span:{}", span); @@ -268,6 +316,11 @@ private void insertSpanEventList(List spanEventList, ServiceType ap // save the information of callee (the span that spanevent called) statisticsService.updateCallee(spanEventApplicationName, spanEventType, applicationId, applicationServiceType, endPoint, elapsed, hasException); + + serviceGroupMapService.updateBidirectional( + "default", applicationId, applicationServiceType, endPoint, "default", spanEventApplicationName, spanEventType, agentId, + elapsed, hasException + ); } } diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/ServiceGroupMapService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/ServiceGroupMapService.java new file mode 100644 index 0000000000000..ddf58c39b7e31 --- /dev/null +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/ServiceGroupMapService.java @@ -0,0 +1,116 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.collector.service; + +import com.navercorp.pinpoint.collector.dao.MapStatisticsInboundDao; +import com.navercorp.pinpoint.collector.dao.MapStatisticsOutboundDao; +import com.navercorp.pinpoint.collector.dao.MapStatisticsSelfDao; +import com.navercorp.pinpoint.common.trace.ServiceType; +import jakarta.validation.constraints.NotBlank; +import org.springframework.stereotype.Service; +import org.springframework.validation.annotation.Validated; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Service +@Validated +public class ServiceGroupMapService { + + private final MapStatisticsInboundDao mapStatisticsInboundDao; + private final MapStatisticsOutboundDao mapStatisticsOutboundDao; + private final MapStatisticsSelfDao mapStatisticsSelfDao; + + public ServiceGroupMapService( + MapStatisticsInboundDao mapStatisticsInboundDao, + MapStatisticsOutboundDao mapStatisticsOutboundDao, + MapStatisticsSelfDao mapStatisticsSelfDao + ) { + this.mapStatisticsInboundDao = Objects.requireNonNull(mapStatisticsInboundDao, "mapStatisticsInboundDao"); + this.mapStatisticsOutboundDao = Objects.requireNonNull(mapStatisticsOutboundDao, "mapStatisticsOutboundDao"); + this.mapStatisticsSelfDao = Objects.requireNonNull(mapStatisticsSelfDao, "mapStatisticsSelfDao"); + } + + public void updateBidirectional( + @NotBlank String srcServiceGroup, @NotBlank String srcApplicationName, ServiceType srcServiceType, @NotBlank String srcHost, @NotBlank String destServiceGroup, + @NotBlank String destApplicationName, ServiceType destServiceType, + @NotBlank String destHost, + int elapsed, boolean isError + ) { + // src -> dest + // inbound (rowKey dest <- columnName src) + // outbound (rowKey src -> columnName dest) + + updateOutbound( + srcServiceGroup, srcApplicationName, srcServiceType, + destServiceGroup, destApplicationName, destServiceType, + srcHost, elapsed, isError + ); + + updateInbound( + srcServiceGroup, srcApplicationName, srcServiceType, + destServiceGroup, destApplicationName, destServiceType, + srcHost, elapsed, isError + ); + + } + + + public void updateInbound( + @NotBlank String srcServiceGroup, @NotBlank String srcApplicationName, ServiceType srcServiceType, + @NotBlank String destServiceGroup, @NotBlank String destApplicationName, ServiceType destServiceType, + @NotBlank String srcHost, int elapsed, boolean isError + ) { + // inbound (rowKey dest <- columnName src) + mapStatisticsInboundDao.update( + srcServiceGroup, srcApplicationName, srcServiceType, + destServiceGroup, destApplicationName, destServiceType, + srcHost, elapsed, isError + ); + } + + public void updateOutbound( + @NotBlank String srcServiceGroup, @NotBlank String srcApplicationName, ServiceType srcServiceType, + @NotBlank String destServiceGroup, @NotBlank String destApplicationName, ServiceType destServiceType, + @NotBlank String srcHost, int elapsed, boolean isError + ) { + // outbound (rowKey src -> columnName dest) + mapStatisticsOutboundDao.update( + destServiceGroup, destApplicationName, destServiceType, + srcServiceGroup, srcApplicationName, srcServiceType, + srcHost, elapsed, isError + ); + } + + public void updateSelfResponseTime( + @NotBlank String serviceGroup, @NotBlank String applicationName, ServiceType applicationServiceType, + int elapsed, boolean isError + ) { + mapStatisticsSelfDao.received( + serviceGroup, applicationName, applicationServiceType, elapsed, isError + ); + } + + public void updateAgentState( + @NotBlank String serviceGroup, @NotBlank String applicationName, ServiceType applicationServiceType + ) { + mapStatisticsSelfDao.updatePing( + serviceGroup, applicationName, applicationServiceType, 0, false + ); + } +} diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/StatisticsService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/StatisticsService.java index 9e256bee8f06c..d8d8adc10e22e 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/service/StatisticsService.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/StatisticsService.java @@ -58,12 +58,12 @@ public StatisticsService(MapStatisticsCalleeDao mapStatisticsCalleeDao, MapStati * @param isError isError */ public void updateCaller( - @NotBlank String callerApplicationName, - ServiceType callerServiceType, - @NotBlank String callerAgentId, - @NotBlank String calleeApplicationName, - ServiceType calleeServiceType, - String calleeHost, + @NotBlank String callerApplicationName, // src + ServiceType callerServiceType, //src + @NotBlank String callerAgentId, //src + @NotBlank String calleeApplicationName, //dest + ServiceType calleeServiceType, //dest + String calleeHost, //dest int elapsed, boolean isError ) { @@ -85,11 +85,11 @@ public void updateCaller( * @param isError isError */ public void updateCallee( - @NotBlank String calleeApplicationName, - ServiceType calleeServiceType, - @NotBlank String callerApplicationName, - ServiceType callerServiceType, - String callerHost, + @NotBlank String calleeApplicationName, // dest + ServiceType calleeServiceType, // dest + @NotBlank String callerApplicationName, //src + ServiceType callerServiceType, //src + String callerHost, //src int elapsed, boolean isError ) { diff --git a/collector/src/main/java/com/navercorp/pinpoint/collector/service/async/AgentLifeCycleAsyncTaskService.java b/collector/src/main/java/com/navercorp/pinpoint/collector/service/async/AgentLifeCycleAsyncTaskService.java index 7a0b9269fa350..4a47e62854168 100644 --- a/collector/src/main/java/com/navercorp/pinpoint/collector/service/async/AgentLifeCycleAsyncTaskService.java +++ b/collector/src/main/java/com/navercorp/pinpoint/collector/service/async/AgentLifeCycleAsyncTaskService.java @@ -18,6 +18,7 @@ import com.navercorp.pinpoint.collector.config.CollectorProperties; import com.navercorp.pinpoint.collector.service.AgentLifeCycleService; +import com.navercorp.pinpoint.collector.service.ServiceGroupMapService; import com.navercorp.pinpoint.collector.service.StatisticsService; import com.navercorp.pinpoint.common.server.bo.AgentLifeCycleBo; import com.navercorp.pinpoint.common.server.util.AgentLifeCycleState; @@ -42,15 +43,18 @@ public class AgentLifeCycleAsyncTaskService { private final AgentLifeCycleService agentLifeCycleService; private final StatisticsService statisticsService; + private final ServiceGroupMapService serviceGroupMapService; private final ServiceTypeRegistryService registry; private final CollectorProperties collectorProperties; public AgentLifeCycleAsyncTaskService(AgentLifeCycleService agentLifeCycleService, StatisticsService statisticsService, + ServiceGroupMapService serviceGroupMapService, ServiceTypeRegistryService registry, CollectorProperties collectorProperties) { this.agentLifeCycleService = agentLifeCycleService; this.statisticsService = statisticsService; + this.serviceGroupMapService = serviceGroupMapService; this.registry = registry; this.collectorProperties = collectorProperties; } @@ -78,6 +82,7 @@ public void handleLifeCycleEvent(AgentProperty agentProperty, long eventTimestam final ServiceType serviceType = registry.findServiceType(agentProperty.getServiceType()); if (isUpdateAgentState(serviceType)) { statisticsService.updateAgentState(applicationName, serviceType, agentId); + serviceGroupMapService.updateAgentState("thisServiceGroup", applicationName, serviceType); } } diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseColumnFamily.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseColumnFamily.java index 1198996d531d0..f30d99204c8d1 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseColumnFamily.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseColumnFamily.java @@ -135,6 +135,30 @@ private SelfStatMap(HbaseTable hBaseTable, byte[] columnFamilyName) { } } + + public static final OutboundServiceMap MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_COUNTER = new OutboundServiceMap(HbaseTable.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP, Bytes.toBytes("C")); + public static class OutboundServiceMap extends HbaseColumnFamily { + private OutboundServiceMap(HbaseTable hBaseTable, byte[] columnFamilyName) { + super(hBaseTable, columnFamilyName); + } + } + + public static final InboundServiceMap MAP_STATISTICS_INBOUND_SERVICE_GROUP_COUNTER = new InboundServiceMap(HbaseTable.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP, Bytes.toBytes("C")); + public static class InboundServiceMap extends HbaseColumnFamily { + private InboundServiceMap(HbaseTable hBaseTable, byte[] columnFamilyName) { + super(hBaseTable, columnFamilyName); + } + } + + public static final SelfServiceMap MAP_STATISTICS_SELF_SERVICE_GROUP_COUNTER = new SelfServiceMap(HbaseTable.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP, Bytes.toBytes("C")); + public static class SelfServiceMap extends HbaseColumnFamily { + private SelfServiceMap(HbaseTable hBaseTable, byte[] columnFamilyName) { + super(hBaseTable, columnFamilyName); + } + } + + + public static final SqlMetadataV2 SQL_METADATA_VER2_SQL = new SqlMetadataV2(HbaseTable.SQL_METADATA_VER2, Bytes.toBytes("Sql")); public static class SqlMetadataV2 extends HbaseColumnFamily { diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTable.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTable.java index 07990ce8494f7..6f4968c6fba89 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTable.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/HbaseTable.java @@ -35,6 +35,9 @@ public enum HbaseTable { MAP_STATISTICS_CALLEE_VER2("ApplicationMapStatisticsCallee_Ver2"), MAP_STATISTICS_CALLER_VER2("ApplicationMapStatisticsCaller_Ver2"), MAP_STATISTICS_SELF_VER2("ApplicationMapStatisticsSelf_Ver2"), + MAP_STATISTICS_OUTBOUND_SERVICE_GROUP("ServiceGroupMapStatisticsOutbound"), + MAP_STATISTICS_INBOUND_SERVICE_GROUP("ServiceGroupMapStatisticsInbound"), + MAP_STATISTICS_SELF_SERVICE_GROUP("ServiceGroupMapStatisticsSelf"), SQL_METADATA_VER2("SqlMetaData_Ver2"), SQL_UID_METADATA("SqlUidMetaData"), STRING_METADATA("StringMetaData"), diff --git a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/DistributorConfiguration.java b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/DistributorConfiguration.java index 9af9c9e880994..6e0a5e5469c85 100644 --- a/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/DistributorConfiguration.java +++ b/commons-hbase/src/main/java/com/navercorp/pinpoint/common/hbase/config/DistributorConfiguration.java @@ -80,6 +80,24 @@ public RowKeyDistributorByHashPrefix statisticsSelfRowKeyDistributor() { return new RowKeyDistributorByHashPrefix(hasher); } + @Bean + public RowKeyDistributorByHashPrefix serviceGroupInboundRowKeyDistributor() { + RowKeyDistributorByHashPrefix.Hasher hasher = newRangeOneByteSimpleHash(0, 36, 32); + return new RowKeyDistributorByHashPrefix(hasher); + } + + @Bean + public RowKeyDistributorByHashPrefix serviceGroupOutboundRowKeyDistributor() { + RowKeyDistributorByHashPrefix.Hasher hasher = newRangeOneByteSimpleHash(0, 36, 32); + return new RowKeyDistributorByHashPrefix(hasher); + } + + @Bean + public RowKeyDistributorByHashPrefix serviceGroupSelfRowKeyDistributor() { + RowKeyDistributorByHashPrefix.Hasher hasher = newRangeOneByteSimpleHash(0, 32, 8); + return new RowKeyDistributorByHashPrefix(hasher); + } + private RowKeyDistributorByHashPrefix.Hasher newRangeOneByteSimpleHash(int start, int end, int maxBuckets) { return new RangeOneByteSimpleHash(start, end, maxBuckets); } diff --git a/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ApplicationMapStatisticsUtils.java b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ApplicationMapStatisticsUtils.java index ff1da6a159741..d8f26ec47f3c0 100644 --- a/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ApplicationMapStatisticsUtils.java +++ b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ApplicationMapStatisticsUtils.java @@ -76,7 +76,6 @@ public static byte[] makeColumnName(String agentId, short columnSlotNumber) { return buffer.getBuffer(); } - private static short findResponseHistogramSlotNo(ServiceType serviceType, int elapsed, boolean isError, boolean isPing) { Objects.requireNonNull(serviceType, "serviceType"); @@ -136,11 +135,11 @@ public static String getHost(byte[] bytes) { public static byte[] makeRowKey(String applicationName, short applicationType, long timestamp) { Objects.requireNonNull(applicationName, "applicationName"); - final byte[] applicationNameBytes= BytesUtils.toBytes(applicationName); + final byte[] applicationNameBytes = BytesUtils.toBytes(applicationName); final Buffer buffer = new AutomaticBuffer(2 + applicationNameBytes.length + 2 + 8); // buffer.put2PrefixedString(applicationName); - buffer.putShort((short)applicationNameBytes.length); + buffer.putShort((short) applicationNameBytes.length); buffer.putBytes(applicationNameBytes); buffer.putShort(applicationType); long reverseTimeMillis = TimeUtils.reverseTimeMillis(timestamp); @@ -148,6 +147,8 @@ public static byte[] makeRowKey(String applicationName, short applicationType, l return buffer.getBuffer(); } + + public static String getApplicationNameFromRowKey(byte[] bytes, int offset) { Objects.requireNonNull(bytes, "bytes"); diff --git a/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ServiceGroupMapUtils.java b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ServiceGroupMapUtils.java new file mode 100644 index 0000000000000..b5192c18c1d77 --- /dev/null +++ b/commons-server/src/main/java/com/navercorp/pinpoint/common/server/util/ServiceGroupMapUtils.java @@ -0,0 +1,62 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.common.server.util; + +import com.navercorp.pinpoint.common.buffer.AutomaticBuffer; +import com.navercorp.pinpoint.common.buffer.Buffer; +import com.navercorp.pinpoint.common.util.BytesUtils; +import com.navercorp.pinpoint.common.util.TimeUtils; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +public class ServiceGroupMapUtils { + public static byte[] makeRowKey( + String serviceName, + String applicationName, short applicationType, long timestamp + ) { + Objects.requireNonNull(serviceName, "serviceName"); + Objects.requireNonNull(applicationName, "applicationName"); + + final byte[] serviceNameBytes = BytesUtils.toBytes(serviceName); + final byte[] applicationNameBytes = BytesUtils.toBytes(applicationName); + + final Buffer buffer = new AutomaticBuffer(24); + buffer.putShort((short) serviceNameBytes.length); + buffer.putBytes(serviceNameBytes); + buffer.putShort((short) applicationNameBytes.length); + buffer.putBytes(applicationNameBytes); + buffer.putShort(applicationType); + long reverseTimeMillis = TimeUtils.reverseTimeMillis(timestamp); + buffer.putLong(reverseTimeMillis); + return buffer.getBuffer(); + } + + public static byte[] makeColumnName( + String serviceName, + String applicationName, short applicationType, + short columnSlotNumber + ) { + final Buffer buffer = new AutomaticBuffer(64); + buffer.putPrefixedString(serviceName); + buffer.putShort(applicationType); + buffer.putPrefixedString(applicationName); + buffer.putShort(columnSlotNumber); + return buffer.getBuffer(); + } +} diff --git a/hbase/scripts/hbase-create-snappy.hbase b/hbase/scripts/hbase-create-snappy.hbase index 7d31f1b6fa4ca..31796fda069d9 100644 --- a/hbase/scripts/hbase-create-snappy.hbase +++ b/hbase/scripts/hbase-create-snappy.hbase @@ -21,6 +21,10 @@ create 'ApplicationMapStatisticsCaller_Ver2', { NAME => 'C', TTL => 5184000, VER create 'ApplicationMapStatisticsCallee_Ver2', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} create 'ApplicationMapStatisticsSelf_Ver2', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} +create 'ServiceGroupMapStatisticsOutbound', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' } +create 'ServiceGroupMapStatisticsInbound', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' } +create 'ServiceGroupMapStatisticsSelf', { NAME => 'C', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' } + create 'HostApplicationMap_Ver2', { NAME => 'M', TTL => 5184000, VERSIONS => 1, COMPRESSION => 'SNAPPY', DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} list diff --git a/hbase/scripts/hbase-create.hbase b/hbase/scripts/hbase-create.hbase index d8abbdb0a0d5c..39f265755faca 100644 --- a/hbase/scripts/hbase-create.hbase +++ b/hbase/scripts/hbase-create.hbase @@ -20,6 +20,10 @@ create 'ApplicationMapStatisticsCaller_Ver2', { NAME => 'C', TTL => 5184000, VER create 'ApplicationMapStatisticsCallee_Ver2', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x08\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x0e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x10\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x12\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x14\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x16\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x18\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1c\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x1e\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} create 'ApplicationMapStatisticsSelf_Ver2', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x04\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x05\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x07\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} +create 'ServiceGroupMapStatisticsOutbound', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' } +create 'ServiceGroupMapStatisticsInbound', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' } +create 'ServiceGroupMapStatisticsSelf', { NAME => 'C', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' } + create 'HostApplicationMap_Ver2', { NAME => 'M', TTL => 5184000, VERSIONS => 1, DATA_BLOCK_ENCODING => 'PREFIX' }, {SPLITS=>["\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x02\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00","\x03\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00"]} list diff --git a/hbase/scripts/hbase-drop.hbase b/hbase/scripts/hbase-drop.hbase index a2a3bcb9c9281..e3835283df9c0 100644 --- a/hbase/scripts/hbase-drop.hbase +++ b/hbase/scripts/hbase-drop.hbase @@ -18,6 +18,10 @@ disable 'ApplicationMapStatisticsCaller_Ver2' disable 'ApplicationMapStatisticsCallee_Ver2' disable 'ApplicationMapStatisticsSelf_Ver2' +disable 'ServiceGroupMapStatisticsOutbound' +disable 'ServiceGroupMapStatisticsInbound' +disable 'ServiceGroupMapStatisticsSelf' + disable 'HostApplicationMap_Ver2' @@ -41,6 +45,10 @@ drop 'ApplicationMapStatisticsCaller_Ver2' drop 'ApplicationMapStatisticsCallee_Ver2' drop 'ApplicationMapStatisticsSelf_Ver2' +drop 'ServiceGroupMapStatisticsOutbound' +drop 'ServiceGroupMapStatisticsInbound' +drop 'ServiceGroupMapStatisticsSelf' + drop 'HostApplicationMap_Ver2' exit diff --git a/hbase/scripts/hbase-flush-table.hbase b/hbase/scripts/hbase-flush-table.hbase index a58170bd28825..8b399501ae86f 100644 --- a/hbase/scripts/hbase-flush-table.hbase +++ b/hbase/scripts/hbase-flush-table.hbase @@ -16,6 +16,10 @@ flush 'ApplicationMapStatisticsCaller_Ver2' flush 'ApplicationMapStatisticsCallee_Ver2' flush 'ApplicationMapStatisticsSelf_Ver2' +flush 'ServiceGroupMapStatisticsOutbound' +flush 'ServiceGroupMapStatisticsInbound' +flush 'ServiceGroupMapStatisticsSelf' + flush 'HostApplicationMap_Ver2' exit diff --git a/hbase/scripts/hbase-major-compact-htable.hbase b/hbase/scripts/hbase-major-compact-htable.hbase index 0ad6259646762..1e168751b94ba 100644 --- a/hbase/scripts/hbase-major-compact-htable.hbase +++ b/hbase/scripts/hbase-major-compact-htable.hbase @@ -19,6 +19,10 @@ major_compact 'ApplicationMapStatisticsCaller_Ver2' major_compact 'ApplicationMapStatisticsCallee_Ver2' major_compact 'ApplicationMapStatisticsSelf_Ver2' +major_compact 'ServiceGroupMapStatisticsOutbound' +major_compact 'ServiceGroupMapStatisticsInbound' +major_compact 'ServiceGroupMapStatisticsSelf' + major_compact 'HostApplicationMap_Ver2' exit diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/ServiceGroupApplicationMap.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/ServiceGroupApplicationMap.java new file mode 100644 index 0000000000000..79ec60ec0f676 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/ServiceGroupApplicationMap.java @@ -0,0 +1,37 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap; + +import com.navercorp.pinpoint.web.applicationmap.link.Link; +import com.navercorp.pinpoint.web.applicationmap.nodes.Node; + +import java.util.Collection; + +/** + * @author intr3p1d + */ +public class ServiceGroupApplicationMap implements ApplicationMap { + + @Override + public Collection getNodes() { + return null; + } + + @Override + public Collection getLinks() { + return null; + } +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupInboundDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupInboundDao.java new file mode 100644 index 0000000000000..c7679bb47dd85 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupInboundDao.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao; + +import com.navercorp.pinpoint.common.server.util.time.Range; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.vo.Application; + +/** + * @author intr3p1d + */ +public interface ServiceGroupInboundDao { + LinkDataMap selectInbound(Application destApplication, Range range, boolean timeAggregated); +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupOutboundDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupOutboundDao.java new file mode 100644 index 0000000000000..60117017df19b --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupOutboundDao.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao; + +import com.navercorp.pinpoint.common.server.util.time.Range; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.vo.Application; + +/** + * @author intr3p1d + */ +public interface ServiceGroupOutboundDao { + LinkDataMap selectOutboud(Application callerApplication, Range range, boolean timeAggregated); +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupResponseDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupResponseDao.java new file mode 100644 index 0000000000000..bafd01a2aaff3 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/ServiceGroupResponseDao.java @@ -0,0 +1,22 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao; + +/** + * @author intr3p1d + */ +public interface ServiceGroupResponseDao { +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseServiceGroupInboundDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseServiceGroupInboundDao.java new file mode 100644 index 0000000000000..b25c0c85cabf4 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseServiceGroupInboundDao.java @@ -0,0 +1,130 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao.hbase; + +import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily; +import com.navercorp.pinpoint.common.hbase.HbaseOperations; +import com.navercorp.pinpoint.common.hbase.ResultsExtractor; +import com.navercorp.pinpoint.common.hbase.RowMapper; +import com.navercorp.pinpoint.common.hbase.TableNameProvider; +import com.navercorp.pinpoint.common.server.util.ServiceGroupMapUtils; +import com.navercorp.pinpoint.common.server.util.time.Range; +import com.navercorp.pinpoint.web.applicationmap.dao.ServiceGroupInboundDao; +import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsTimeWindowReducer; +import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMapUtils; +import com.navercorp.pinpoint.web.mapper.RowMapReduceResultExtractor; +import com.navercorp.pinpoint.web.util.TimeWindow; +import com.navercorp.pinpoint.web.util.TimeWindowDownSampler; +import com.navercorp.pinpoint.web.vo.Application; +import com.navercorp.pinpoint.web.vo.RangeFactory; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Repository; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Repository +public class HbaseServiceGroupInboundDao implements ServiceGroupInboundDao { + + private static final int MAP_STATISTICS_INBOUND_SERVICE_GROUP_NUM_PARTITIONS = 32; + private static final int SCAN_CACHE_SIZE = 40; + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private static final HbaseColumnFamily.InboundServiceMap DESCRIPTOR = HbaseColumnFamily.MAP_STATISTICS_INBOUND_SERVICE_GROUP_COUNTER; + + private final HbaseOperations hbaseTemplate; + private final TableNameProvider tableNameProvider; + + private final RowMapper mapStatisticsCalleeMapper; + private final RowMapper mapStatisticsCalleeTimeAggregatedMapper; + + private final RangeFactory rangeFactory; + + private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + + + public HbaseServiceGroupInboundDao( + @Qualifier("mapHbaseTemplate") HbaseOperations hbaseTemplate, + TableNameProvider tableNameProvider, + @Qualifier("serviceGroupInboundMapper") RowMapper mapStatisticsCalleeMapper, + @Qualifier("serviceGroupInboundTimeAggregatedMapper") RowMapper mapStatisticsCalleeTimeAggregatedMapper, + RangeFactory rangeFactory, + @Qualifier("serviceGroupInboundRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix) { + this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); + this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + this.mapStatisticsCalleeMapper = Objects.requireNonNull(mapStatisticsCalleeMapper, "mapStatisticsCalleeMapper"); + this.mapStatisticsCalleeTimeAggregatedMapper = Objects.requireNonNull(mapStatisticsCalleeTimeAggregatedMapper, "mapStatisticsCalleeTimeAggregatedMapper"); + this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory"); + this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix"); + } + + @Override + public LinkDataMap selectInbound(Application destApplication, Range range, boolean timeAggregated) { + Objects.requireNonNull(destApplication, "destApplication"); + Objects.requireNonNull(range, "range"); + + final TimeWindow timeWindow = new TimeWindow(range, TimeWindowDownSampler.SAMPLER); + // find distributed key - ver2. + final Scan scan = createScan(destApplication, range, DESCRIPTOR.getName()); + + + ResultsExtractor resultsExtractor; + if (timeAggregated) { + resultsExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCalleeTimeAggregatedMapper, new MapStatisticsTimeWindowReducer(timeWindow)); + } else { + resultsExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCalleeMapper, new MapStatisticsTimeWindowReducer(timeWindow)); + } + + TableName serviceGroupInboundTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable()); + LinkDataMap linkDataMap = hbaseTemplate.findParallel(serviceGroupInboundTableName, scan, rowKeyDistributorByHashPrefix, resultsExtractor, MAP_STATISTICS_INBOUND_SERVICE_GROUP_NUM_PARTITIONS); + logger.debug("{} data. {}, {}", LinkDirection.IN_LINK, linkDataMap, range); + if (LinkDataMapUtils.hasLength(linkDataMap)) { + return linkDataMap; + } + return new LinkDataMap(); + } + + private Scan createScan(Application application, Range range, byte[] family) { + range = rangeFactory.createStatisticsRange(range); + + if (logger.isDebugEnabled()) { + logger.debug("scan time:{} ", range.prettyToString()); + } + + // start key is replaced by end key because timestamp has been reversed + byte[] startKey = ServiceGroupMapUtils.makeRowKey("default", application.getName(), application.getServiceTypeCode(), range.getTo()); + byte[] endKey = ServiceGroupMapUtils.makeRowKey("default", application.getName(), application.getServiceTypeCode(), range.getFrom()); + + Scan scan = new Scan(); + scan.setCaching(SCAN_CACHE_SIZE); + scan.withStartRow(startKey); + scan.withStopRow(endKey); + scan.addFamily(family); + scan.setId("ServiceGroupMapScan"); + + return scan; + } +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseServiceGroupOutboundDao.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseServiceGroupOutboundDao.java new file mode 100644 index 0000000000000..2cd175430c13b --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/hbase/HbaseServiceGroupOutboundDao.java @@ -0,0 +1,138 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao.hbase; + +import com.navercorp.pinpoint.common.hbase.HbaseColumnFamily; +import com.navercorp.pinpoint.common.hbase.HbaseOperations; +import com.navercorp.pinpoint.common.hbase.ResultsExtractor; +import com.navercorp.pinpoint.common.hbase.RowMapper; +import com.navercorp.pinpoint.common.hbase.TableNameProvider; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.server.util.ServiceGroupMapUtils; +import com.navercorp.pinpoint.common.server.util.time.Range; +import com.navercorp.pinpoint.web.applicationmap.dao.ServiceGroupOutboundDao; +import com.navercorp.pinpoint.web.applicationmap.dao.mapper.MapStatisticsTimeWindowReducer; +import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMapUtils; +import com.navercorp.pinpoint.web.mapper.RowMapReduceResultExtractor; +import com.navercorp.pinpoint.web.util.TimeWindow; +import com.navercorp.pinpoint.web.util.TimeWindowDownSampler; +import com.navercorp.pinpoint.web.vo.Application; +import com.navercorp.pinpoint.web.vo.RangeFactory; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Repository; + +import java.util.Objects; + +/** + * @author intr3p1d + */ +@Repository +public class HbaseServiceGroupOutboundDao implements ServiceGroupOutboundDao { + private static final int MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_NUM_PARTITIONS = 32; + private static final int SCAN_CACHE_SIZE = 40; + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private static final HbaseColumnFamily.OutboundServiceMap DESCRIPTOR = HbaseColumnFamily.MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_COUNTER; + + private final HbaseOperations hbaseTemplate; + private final TableNameProvider tableNameProvider; + + private final RowMapper mapStatisticsCallerMapper; + private final RowMapper mapStatisticsCallerTimeAggregatedMapper; + + private final RangeFactory rangeFactory; + + private final RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + + + public HbaseServiceGroupOutboundDao( + @Qualifier("mapHbaseTemplate") HbaseOperations hbaseTemplate, + TableNameProvider tableNameProvider, + @Qualifier("serviceGroupOutboundMapper") RowMapper mapStatisticsCallerMapper, + @Qualifier("serviceGroupOutboundTimeAggregatedMapper") RowMapper mapStatisticsCallerTimeAggregatedMapper, + RangeFactory rangeFactory, + @Qualifier("serviceGroupOutboundRowKeyDistributor") RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix + ) { + this.hbaseTemplate = Objects.requireNonNull(hbaseTemplate, "hbaseTemplate"); + this.tableNameProvider = Objects.requireNonNull(tableNameProvider, "tableNameProvider"); + this.mapStatisticsCallerMapper = Objects.requireNonNull(mapStatisticsCallerMapper, "mapStatisticsCallerMapper"); + this.mapStatisticsCallerTimeAggregatedMapper = Objects.requireNonNull(mapStatisticsCallerTimeAggregatedMapper, "mapStatisticsTimeAggregatedCallerMapper"); + this.rangeFactory = Objects.requireNonNull(rangeFactory, "rangeFactory"); + this.rowKeyDistributorByHashPrefix = Objects.requireNonNull(rowKeyDistributorByHashPrefix, "rowKeyDistributorByHashPrefix"); + } + + + @Override + public LinkDataMap selectOutboud(Application callerApplication, Range range, boolean timeAggregated) { Objects.requireNonNull(callerApplication, "callerApplication"); + Objects.requireNonNull(callerApplication, "callerApplication"); + Objects.requireNonNull(range, "range"); + + final TimeWindow timeWindow = new TimeWindow(range, TimeWindowDownSampler.SAMPLER); + // find distributed key. + final Scan scan = createScan(callerApplication, range, DESCRIPTOR.getName()); + + ResultsExtractor resultsExtractor; + if (timeAggregated) { + resultsExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCallerTimeAggregatedMapper, new MapStatisticsTimeWindowReducer(timeWindow)); + } else { + resultsExtractor = new RowMapReduceResultExtractor<>(mapStatisticsCallerMapper, new MapStatisticsTimeWindowReducer(timeWindow)); + } + + TableName serviceGroupOutboundTableName = tableNameProvider.getTableName(DESCRIPTOR.getTable()); + LinkDataMap linkDataMap = this.hbaseTemplate.findParallel(serviceGroupOutboundTableName, scan, rowKeyDistributorByHashPrefix, resultsExtractor, MAP_STATISTICS_OUTBOUND_SERVICE_GROUP_NUM_PARTITIONS); + logger.debug("tableInfo({}). {} data. {}, {} : ", serviceGroupOutboundTableName.getNameAsString(), LinkDirection.OUT_LINK, linkDataMap, range ); + + if (LinkDataMapUtils.hasLength(linkDataMap)) { + return linkDataMap; + } + + return new LinkDataMap(); + } + + private Scan createScan(Application application, Range range, byte[]... familyArgs) { + + range = rangeFactory.createStatisticsRange(range); + + if (logger.isDebugEnabled()) { + logger.debug("scan Time:{}", range.prettyToString()); + } + + // start key is replaced by end key because timestamp has been reversed + byte[] startKey = ServiceGroupMapUtils.makeRowKey("default", application.getName(), application.getServiceTypeCode(), range.getTo()); + byte[] endKey = ServiceGroupMapUtils.makeRowKey("default", application.getName(), application.getServiceTypeCode(), range.getFrom()); + + Scan scan = new Scan(); + scan.setCaching(SCAN_CACHE_SIZE); + scan.withStartRow(startKey); + scan.withStopRow(endKey); + for (byte[] family : familyArgs) { + scan.addFamily(family); + } + scan.setId("ServiceGroupMapScan"); + + return scan; + + + } +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupInboundMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupInboundMapper.java new file mode 100644 index 0000000000000..e135737024485 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupInboundMapper.java @@ -0,0 +1,136 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao.mapper; + +import com.navercorp.pinpoint.common.buffer.Buffer; +import com.navercorp.pinpoint.common.buffer.FixedBuffer; +import com.navercorp.pinpoint.common.hbase.RowMapper; +import com.navercorp.pinpoint.common.hbase.util.CellUtils; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.trace.ServiceType; +import com.navercorp.pinpoint.common.util.TimeUtils; +import com.navercorp.pinpoint.loader.service.ServiceTypeRegistryService; +import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.component.ApplicationFactory; +import com.navercorp.pinpoint.web.vo.Application; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.Result; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +/** + * @author intr3p1d + */ +@Component +public class ServiceGroupInboundMapper implements RowMapper { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final LinkFilter filter; + + @Autowired + private ServiceTypeRegistryService registry; + + @Autowired + private ApplicationFactory applicationFactory; + + @Autowired + @Qualifier("serviceGroupInboundRowKeyDistributor") + private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + + public ServiceGroupInboundMapper() { + this(LinkFilter::skip); + } + + public ServiceGroupInboundMapper(LinkFilter filter) { + this.filter = filter; + } + + @Override + public LinkDataMap mapRow(Result result, int rowNum) throws Exception { + if (result.isEmpty()) { + return new LinkDataMap(); + } + logger.debug("mapRow: {}", rowNum); + + final byte[] rowKey = getOriginalKey(result.getRow()); + + final Buffer row = new FixedBuffer(rowKey); + final Application destApplication = readDestApplication(row); + final long timestamp = TimeUtils.recoveryTimeMillis(row.readLong()); + + final LinkDataMap linkDataMap = new LinkDataMap(); + for (Cell cell : result.rawCells()) { + + final byte[] qualifier = CellUtil.cloneQualifier(cell); + final Application srcApplication = readSourceApplication(qualifier, destApplication.getServiceType()); + if (filter.filter(srcApplication)) { + continue; + } + + long requestCount = CellUtils.valueToLong(cell); + short histogramSlot = ApplicationMapStatisticsUtils.getHistogramSlotFromColumnName(qualifier); + + String srcHost = srcApplication.getName(); + String destHost = destApplication.getName(); + + boolean isError = histogramSlot == (short) -1; + + if (logger.isDebugEnabled()) { + logger.debug(" Fetched {}. {} srcHost:{} -> {} (slot:{}/{}), ", LinkDirection.IN_LINK, srcApplication, srcHost, destApplication, histogramSlot, requestCount); + } + + final short slotTime = (isError) ? (short) -1 : histogramSlot; + linkDataMap.addLinkData(srcApplication, srcApplication.getName(), destApplication, srcHost, timestamp, slotTime, requestCount); + + if (logger.isDebugEnabled()) { + logger.debug(" Fetched {}. statistics:{}", LinkDirection.IN_LINK, linkDataMap); + } + } + + return linkDataMap; + } + + private Application readSourceApplication(byte[] qualifier, ServiceType destServiceType) { + short srcServiceType = ApplicationMapStatisticsUtils.getDestServiceTypeFromColumnName(qualifier); + // Caller may be a user node, and user nodes may call nodes with the same application name but different service type. + // To distinguish between these user nodes, append callee's service type to the application name. + String srcApplicationName; + if (registry.findServiceType(srcServiceType).isUser()) { + srcApplicationName = ApplicationMapStatisticsUtils.getDestApplicationNameFromColumnNameForUser(qualifier, destServiceType); + } else { + srcApplicationName = ApplicationMapStatisticsUtils.getDestApplicationNameFromColumnName(qualifier); + } + return applicationFactory.createApplication(srcApplicationName, srcServiceType); + } + + private Application readDestApplication(Buffer row) { + String serviceName = row.read2PrefixedString(); + String applicationName = row.read2PrefixedString(); + short serviceType = row.readShort(); + return applicationFactory.createApplication(applicationName, serviceType); + } + + private byte[] getOriginalKey(byte[] rowKey) { + return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey); + } +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupInboundTimeAggregatedMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupInboundTimeAggregatedMapper.java new file mode 100644 index 0000000000000..bf843652780bb --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupInboundTimeAggregatedMapper.java @@ -0,0 +1,135 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao.mapper; + +import com.navercorp.pinpoint.common.buffer.Buffer; +import com.navercorp.pinpoint.common.buffer.FixedBuffer; +import com.navercorp.pinpoint.common.hbase.RowMapper; +import com.navercorp.pinpoint.common.hbase.util.CellUtils; +import com.navercorp.pinpoint.common.server.util.ApplicationMapStatisticsUtils; +import com.navercorp.pinpoint.common.trace.ServiceType; +import com.navercorp.pinpoint.loader.service.ServiceTypeRegistryService; +import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.component.ApplicationFactory; +import com.navercorp.pinpoint.web.vo.Application; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.client.Result; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +/** + * @author intr3p1d + */ +@Component +public class ServiceGroupInboundTimeAggregatedMapper implements RowMapper { + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final LinkFilter filter; + + @Autowired + private ServiceTypeRegistryService registry; + + @Autowired + private ApplicationFactory applicationFactory; + + @Autowired + @Qualifier("serviceGroupInboundRowKeyDistributor") + private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + + public ServiceGroupInboundTimeAggregatedMapper() { + this(LinkFilter::skip); + } + + public ServiceGroupInboundTimeAggregatedMapper(LinkFilter filter) { + this.filter = filter; + } + + @Override + public LinkDataMap mapRow(Result result, int rowNum) throws Exception { + if (result.isEmpty()) { + return new LinkDataMap(); + } + logger.debug("mapRow: {}", rowNum); + + final byte[] rowKey = getOriginalKey(result.getRow()); + + final Buffer row = new FixedBuffer(rowKey); + final Application destApplication = readDestApplication(row); + final long timestamp = 0; // time aggregated + + final LinkDataMap linkDataMap = new LinkDataMap(); + for (Cell cell : result.rawCells()) { + + final byte[] qualifier = CellUtil.cloneQualifier(cell); + final Application srcApplication = readSourceApplication(qualifier, destApplication.getServiceType()); + if (filter.filter(srcApplication)) { + continue; + } + + long requestCount = CellUtils.valueToLong(cell); + short histogramSlot = ApplicationMapStatisticsUtils.getHistogramSlotFromColumnName(qualifier); + + String srcHost = srcApplication.getName(); + String destHost = destApplication.getName(); + + boolean isError = histogramSlot == (short) -1; + + if (logger.isDebugEnabled()) { + logger.debug(" Fetched {}. {} srcHost:{} -> {} (slot:{}/{}), ", LinkDirection.IN_LINK, srcApplication, srcHost, destApplication, histogramSlot, requestCount); + } + + final short slotTime = (isError) ? (short) -1 : histogramSlot; + linkDataMap.addLinkData(srcApplication, srcApplication.getName(), destApplication, srcHost, timestamp, slotTime, requestCount); + + if (logger.isDebugEnabled()) { + logger.debug(" Fetched {}. statistics:{}", LinkDirection.IN_LINK, linkDataMap); + } + } + + return linkDataMap; + } + + private Application readSourceApplication(byte[] qualifier, ServiceType destServiceType) { + short srcServiceType = ApplicationMapStatisticsUtils.getDestServiceTypeFromColumnName(qualifier); + // Caller may be a user node, and user nodes may call nodes with the same application name but different service type. + // To distinguish between these user nodes, append callee's service type to the application name. + String srcApplicationName; + if (registry.findServiceType(srcServiceType).isUser()) { + srcApplicationName = ApplicationMapStatisticsUtils.getDestApplicationNameFromColumnNameForUser(qualifier, destServiceType); + } else { + srcApplicationName = ApplicationMapStatisticsUtils.getDestApplicationNameFromColumnName(qualifier); + } + return applicationFactory.createApplication(srcApplicationName, srcServiceType); + } + + private Application readDestApplication(Buffer row) { + String serviceName = row.read2PrefixedString(); + String applicationName = row.read2PrefixedString(); + short serviceType = row.readShort(); + return applicationFactory.createApplication(applicationName, serviceType); + } + + private byte[] getOriginalKey(byte[] rowKey) { + return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey); + } + +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupOutboundMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupOutboundMapper.java new file mode 100644 index 0000000000000..b4c5dd425ae05 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupOutboundMapper.java @@ -0,0 +1,121 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao.mapper; + +import com.navercorp.pinpoint.common.buffer.Buffer; +import com.navercorp.pinpoint.common.buffer.FixedBuffer; +import com.navercorp.pinpoint.common.buffer.OffsetFixedBuffer; +import com.navercorp.pinpoint.common.hbase.RowMapper; +import com.navercorp.pinpoint.common.hbase.util.CellUtils; +import com.navercorp.pinpoint.common.util.TimeUtils; +import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.component.ApplicationFactory; +import com.navercorp.pinpoint.web.vo.Application; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Result; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +/** + * @author intr3p1d + */ +@Component +public class ServiceGroupOutboundMapper implements RowMapper { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final LinkFilter filter; + + @Autowired + private ApplicationFactory applicationFactory; + + @Autowired + @Qualifier("serviceGroupOutboundRowKeyDistributor") + private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + + public ServiceGroupOutboundMapper() { + this(LinkFilter::skip); + } + + public ServiceGroupOutboundMapper(LinkFilter filter) { + this.filter = filter; + } + + @Override + public LinkDataMap mapRow(Result result, int rowNum) throws Exception { + if (result.isEmpty()) { + return new LinkDataMap(); + } + + logger.debug("mapRow: {}", rowNum); + final byte[] rowKey = getOriginalKey(result.getRow()); + + final Buffer row = new FixedBuffer(rowKey); + final Application caller = readCallerApplication(row); + final long timestamp = TimeUtils.recoveryTimeMillis(row.readLong()); + + // key is dest ApplicationName + final LinkDataMap linkDataMap = new LinkDataMap(); + for (Cell cell : result.rawCells()) { + final Buffer buffer = new OffsetFixedBuffer(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + final Application callee = readCalleeApplication(buffer); + if (filter.filter(callee)) { + continue; + } + + short histogramSlot = buffer.readShort(); + String callerAgentId = caller.getName(); + String calleeHost = callee.getName(); + + boolean isError = histogramSlot == (short) -1; + + long requestCount = CellUtils.valueToLong(cell); + if (logger.isDebugEnabled()) { + logger.debug(" Fetched {}.(New) {} {} -> {} (slot:{}/{}) calleeHost:{}", LinkDirection.OUT_LINK, caller, callerAgentId, callee, histogramSlot, requestCount, calleeHost); + } + + final short slotTime = (isError) ? (short) -1 : histogramSlot; + + linkDataMap.addLinkData(caller, callerAgentId, callee, calleeHost, timestamp, slotTime, requestCount); + } + + return linkDataMap; + } + + private Application readCallerApplication(Buffer row) { + String serviceName = row.read2PrefixedString(); + String applicationName = row.read2PrefixedString(); + short serviceType = row.readShort(); + return applicationFactory.createApplication(applicationName, serviceType); + } + + private Application readCalleeApplication(Buffer buffer) { + String serviceName = buffer.readPrefixedString(); + short serviceType = buffer.readShort(); + String applicationName = buffer.readPrefixedString(); + return applicationFactory.createApplication(applicationName, serviceType); + } + + private byte[] getOriginalKey(byte[] rowKey) { + return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey); + } +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupOutboundTimeAggregatedMapper.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupOutboundTimeAggregatedMapper.java new file mode 100644 index 0000000000000..7a2fb6932bee8 --- /dev/null +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/dao/mapper/ServiceGroupOutboundTimeAggregatedMapper.java @@ -0,0 +1,123 @@ +/* + * Copyright 2024 NAVER Corp. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.navercorp.pinpoint.web.applicationmap.dao.mapper; + +import com.navercorp.pinpoint.common.buffer.Buffer; +import com.navercorp.pinpoint.common.buffer.FixedBuffer; +import com.navercorp.pinpoint.common.buffer.OffsetFixedBuffer; +import com.navercorp.pinpoint.common.hbase.RowMapper; +import com.navercorp.pinpoint.common.hbase.util.CellUtils; +import com.navercorp.pinpoint.common.util.TimeUtils; +import com.navercorp.pinpoint.web.applicationmap.link.LinkDirection; +import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; +import com.navercorp.pinpoint.web.component.ApplicationFactory; +import com.navercorp.pinpoint.web.vo.Application; +import com.sematext.hbase.wd.RowKeyDistributorByHashPrefix; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.client.Result; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.beans.factory.annotation.Qualifier; +import org.springframework.stereotype.Component; + +/** + * @author intr3p1d + */ +@Component +public class ServiceGroupOutboundTimeAggregatedMapper implements RowMapper { + + private final Logger logger = LogManager.getLogger(this.getClass()); + + private final LinkFilter filter; + + @Autowired + private ApplicationFactory applicationFactory; + + @Autowired + @Qualifier("serviceGroupOutboundRowKeyDistributor") + private RowKeyDistributorByHashPrefix rowKeyDistributorByHashPrefix; + + public ServiceGroupOutboundTimeAggregatedMapper() { + this(LinkFilter::skip); + } + + public ServiceGroupOutboundTimeAggregatedMapper(LinkFilter filter) { + this.filter = filter; + } + + + @Override + public LinkDataMap mapRow(Result result, int rowNum) throws Exception { + if (result.isEmpty()) { + return new LinkDataMap(); + } + + logger.debug("mapRow: {}", rowNum); + final byte[] rowKey = getOriginalKey(result.getRow()); + + final Buffer row = new FixedBuffer(rowKey); + final Application caller = readCallerApplication(row); + final long timestamp = 0; // aggregate timestamp + + // key is dest ApplicationName + final LinkDataMap linkDataMap = new LinkDataMap(); + for (Cell cell : result.rawCells()) { + final Buffer buffer = new OffsetFixedBuffer(cell.getQualifierArray(), cell.getQualifierOffset(), cell.getQualifierLength()); + final Application callee = readCalleeApplication(buffer); + if (filter.filter(callee)) { + continue; + } + + short histogramSlot = buffer.readShort(); + String callerAgentId = caller.getName(); + String calleeHost = callee.getName(); + + boolean isError = histogramSlot == (short) -1; + + long requestCount = CellUtils.valueToLong(cell); + if (logger.isDebugEnabled()) { + logger.debug(" Fetched {}.(New) {} {} -> {} (slot:{}/{}) calleeHost:{}", LinkDirection.OUT_LINK, caller, callerAgentId, callee, histogramSlot, requestCount, calleeHost); + } + + final short slotTime = (isError) ? (short) -1 : histogramSlot; + + linkDataMap.addLinkData(caller, callerAgentId, callee, calleeHost, timestamp, slotTime, requestCount); + } + + return linkDataMap; + } + + private Application readCallerApplication(Buffer row) { + String serviceName = row.read2PrefixedString(); + String applicationName = row.read2PrefixedString(); + short serviceType = row.readShort(); + return applicationFactory.createApplication(applicationName, serviceType); + } + + private Application readCalleeApplication(Buffer buffer) { + String serviceName = buffer.readPrefixedString(); + short serviceType = buffer.readShort(); + String applicationName = buffer.readPrefixedString(); + return applicationFactory.createApplication(applicationName, serviceType); + } + + private byte[] getOriginalKey(byte[] rowKey) { + return rowKeyDistributorByHashPrefix.getOriginalKey(rowKey); + } + + +} diff --git a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/service/LinkDataMapServiceImpl.java b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/service/LinkDataMapServiceImpl.java index cd8c09ba1070e..ce23b5ac435b7 100644 --- a/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/service/LinkDataMapServiceImpl.java +++ b/web/src/main/java/com/navercorp/pinpoint/web/applicationmap/service/LinkDataMapServiceImpl.java @@ -20,6 +20,8 @@ import com.navercorp.pinpoint.common.server.util.time.Range; import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCalleeDao; import com.navercorp.pinpoint.web.applicationmap.dao.MapStatisticsCallerDao; +import com.navercorp.pinpoint.web.applicationmap.dao.ServiceGroupInboundDao; +import com.navercorp.pinpoint.web.applicationmap.dao.ServiceGroupOutboundDao; import com.navercorp.pinpoint.web.applicationmap.rawdata.LinkDataMap; import com.navercorp.pinpoint.web.vo.Application; import org.springframework.stereotype.Service; @@ -36,18 +38,32 @@ public class LinkDataMapServiceImpl implements LinkDataMapService { private final MapStatisticsCalleeDao mapStatisticsCalleeDao; - public LinkDataMapServiceImpl(MapStatisticsCallerDao mapStatisticsCallerDao, MapStatisticsCalleeDao mapStatisticsCalleeDao) { + private final ServiceGroupOutboundDao serviceGroupOutboundDao; + + private final ServiceGroupInboundDao serviceGroupInboundDao; + + public LinkDataMapServiceImpl( + MapStatisticsCallerDao mapStatisticsCallerDao, MapStatisticsCalleeDao mapStatisticsCalleeDao, + ServiceGroupOutboundDao serviceGroupOutboundDao, ServiceGroupInboundDao serviceGroupInboundDao + ) { this.mapStatisticsCallerDao = Objects.requireNonNull(mapStatisticsCallerDao, "mapStatisticsCallerDao"); this.mapStatisticsCalleeDao = Objects.requireNonNull(mapStatisticsCalleeDao, "mapStatisticsCalleeDao"); + this.serviceGroupOutboundDao = Objects.requireNonNull(serviceGroupOutboundDao, "serviceGroupOutboundDao"); + this.serviceGroupInboundDao = Objects.requireNonNull(serviceGroupInboundDao, "serviceGroupInboundDao"); } @Override public LinkDataMap selectCallerLinkDataMap(Application application, Range range, boolean timeAggregated) { - return mapStatisticsCallerDao.selectCaller(application, range, timeAggregated); +// return mapStatisticsCallerDao.selectCaller(application, range, timeAggregated); + + LinkDataMap linkDataMap = mapStatisticsCallerDao.selectCaller(application, range, timeAggregated); + return serviceGroupInboundDao.selectInbound(application, range, timeAggregated); } @Override public LinkDataMap selectCalleeLinkDataMap(Application application, Range range, boolean timeAggregated) { - return mapStatisticsCalleeDao.selectCallee(application, range, timeAggregated); +// return mapStatisticsCalleeDao.selectCallee(application, range, timeAggregated); + LinkDataMap linkDataMap = mapStatisticsCalleeDao.selectCallee(application, range, timeAggregated); + return serviceGroupOutboundDao.selectOutboud(application, range, timeAggregated); } }