From dfb7faf8ecf7e6d427a3bf7f310fdb432421f710 Mon Sep 17 00:00:00 2001 From: peacewong Date: Sun, 26 Nov 2023 23:31:25 +0800 Subject: [PATCH] add creator task running number limit --- .../scheduler/EntranceFIFOUserConsumer.java | 91 ------------------- .../EntranceParallelConsumerManager.java | 35 ------- .../scheduler/EntranceFIFOUserConsumer.scala | 43 ++++++++- .../EntranceParallelConsumerManager.scala | 30 ++---- 4 files changed, 48 insertions(+), 151 deletions(-) delete mode 100644 linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.java delete mode 100644 linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.java diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.java deleted file mode 100644 index ac180d1aa3..0000000000 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.java +++ /dev/null @@ -1,91 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.entrance.scheduler; - -import org.apache.linkis.scheduler.SchedulerContext; -import org.apache.linkis.scheduler.queue.Consumer; -import org.apache.linkis.scheduler.queue.Group; -import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer; - -import java.util.concurrent.ExecutorService; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -public class EntranceFIFOUserConsumer extends FIFOUserConsumer { - - private static final Logger logger = LoggerFactory.getLogger(EntranceFIFOUserConsumer.class); - - public EntranceFIFOUserConsumer( - SchedulerContext schedulerContext, ExecutorService executeService, Group group) { - super(schedulerContext, executeService, group); - } - - @Override - public boolean runScheduleIntercept() { - Consumer[] consumers = getSchedulerContext().getOrCreateConsumerManager().listConsumers(); - int creatorRunningJobNum = 0; - - // APP_TEST_hadoop_hive or IDE_hadoop_hive - String groupNameStr = getGroup().getGroupName(); - String[] groupNames = groupNameStr.split("_"); - int length = groupNames.length; - if (length < 3) { - return true; - } - - // APP_TEST - int lastIndex = groupNameStr.lastIndexOf("_"); - int secondLastIndex = groupNameStr.lastIndexOf("_", lastIndex - 1); - String creatorName = groupNameStr.substring(0, secondLastIndex); - - // hive - String ecType = groupNames[length - 1]; - - for (Consumer consumer : consumers) { - String groupName = consumer.getGroup().getGroupName(); - if (groupName.startsWith(creatorName) && groupName.endsWith(ecType)) { - creatorRunningJobNum += consumer.getRunningEvents().length; - } - } - - int creatorECTypeMaxRunningJobs = - CreatorECTypeDefaultConf.getCreatorECTypeMaxRunningJobs(creatorName, ecType); - - if (logger.isDebugEnabled()) { - logger.debug( - "Creator: {} EC: {} there are currently:{} jobs running and maximum limit: {}", - creatorName, - ecType, - creatorRunningJobNum, - creatorECTypeMaxRunningJobs); - } - - if (creatorRunningJobNum > creatorECTypeMaxRunningJobs) { - logger.error( - "Creator: {} EC: {} there are currently:{} jobs running that exceed the maximum limit: {}", - creatorName, - ecType, - creatorRunningJobNum, - creatorECTypeMaxRunningJobs); - return false; - } else { - return true; - } - } -} diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.java deleted file mode 100644 index 98f0929ee9..0000000000 --- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.java +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.entrance.scheduler; - -import org.apache.linkis.scheduler.queue.Group; -import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer; -import org.apache.linkis.scheduler.queue.parallelqueue.ParallelConsumerManager; - -public class EntranceParallelConsumerManager extends ParallelConsumerManager { - - public EntranceParallelConsumerManager(int maxParallelismUsers, String schedulerName) { - super(maxParallelismUsers, schedulerName); - } - - @Override - public FIFOUserConsumer createConsumer(String groupName) { - Group group = getSchedulerContext().getOrCreateGroupFactory().getGroup(groupName); - return new EntranceFIFOUserConsumer(getSchedulerContext(), getOrCreateExecutorService(), group); - } -} diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala index e2f0ab1d5a..48f375ee4a 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala @@ -22,7 +22,7 @@ import org.apache.linkis.entrance.conf.EntranceConfiguration import org.apache.linkis.entrance.job.EntranceExecutionJob import org.apache.linkis.entrance.utils.JobHistoryHelper import org.apache.linkis.scheduler.SchedulerContext -import org.apache.linkis.scheduler.queue.Group +import org.apache.linkis.scheduler.queue.{Consumer, Group} import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer import java.util @@ -67,4 +67,45 @@ class EntranceFIFOUserConsumer( } + override def runScheduleIntercept: Boolean = { + val consumers = getSchedulerContext.getOrCreateConsumerManager.listConsumers + var creatorRunningJobNum = 0 + // APP_TEST_hadoop_hive or IDE_hadoop_hive + val groupNameStr = getGroup.getGroupName + val groupNames = groupNameStr.split("_") + val length = groupNames.length + if (length < 3) return true + // APP_TEST + val lastIndex = groupNameStr.lastIndexOf("_") + val secondLastIndex = groupNameStr.lastIndexOf("_", lastIndex - 1) + val creatorName = groupNameStr.substring(0, secondLastIndex) + // hive + val ecType = groupNames(length - 1) + for (consumer <- consumers) { + val groupName = consumer.getGroup.getGroupName + if (groupName.startsWith(creatorName) && groupName.endsWith(ecType)) + creatorRunningJobNum += consumer.getRunningEvents.length + } + val creatorECTypeMaxRunningJobs = + CreatorECTypeDefaultConf.getCreatorECTypeMaxRunningJobs(creatorName, ecType) + if (logger.isDebugEnabled) + logger.debug( + "Creator: {} EC: {} there are currently:{} jobs running and maximum limit: {}", + creatorName, + ecType, + creatorRunningJobNum, + creatorECTypeMaxRunningJobs + ) + if (creatorRunningJobNum > creatorECTypeMaxRunningJobs) { + logger.error( + "Creator: {} EC: {} there are currently:{} jobs running that exceed the maximum limit: {}", + creatorName, + ecType, + creatorRunningJobNum, + creatorECTypeMaxRunningJobs + ) + false + } else true + } + } diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala index 2cdee97cc7..728fc332f1 100644 --- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala +++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala @@ -20,6 +20,7 @@ package org.apache.linkis.entrance.scheduler import org.apache.linkis.common.ServiceInstance import org.apache.linkis.common.utils.Utils import org.apache.linkis.entrance.conf.EntranceConfiguration +import org.apache.linkis.entrance.utils.EntranceUtils import org.apache.linkis.instance.label.client.InstanceLabelClient import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext import org.apache.linkis.manager.label.constant.{LabelKeyConstant, LabelValueConstant} @@ -44,30 +45,11 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S if (EntranceConfiguration.ENTRANCE_GROUP_SCAN_ENABLED.getValue) { Utils.defaultScheduler.scheduleAtFixedRate( - new Runnable { - override def run(): Unit = { - Utils.tryAndError { - logger.info("start refresh consumer group maxAllowRunningJobs") - // get all entrance server from eureka - val serviceInstances = - Sender.getInstances(Sender.getThisServiceInstance.getApplicationName) - if (null == serviceInstances || serviceInstances.isEmpty) return - - // get all offline label server - val routeLabel = LabelBuilderFactoryContext.getLabelBuilderFactory - .createLabel[RouteLabel](LabelKeyConstant.ROUTE_KEY, LabelValueConstant.OFFLINE_VALUE) - val labels = new util.ArrayList[Label[_]] - labels.add(routeLabel) - val labelInstances = InstanceLabelClient.getInstance.getInstanceFromLabel(labels) - - // get active entrance server - val allInstances = new util.ArrayList[ServiceInstance]() - allInstances.addAll(serviceInstances.toList.asJava) - allInstances.removeAll(labelInstances) - // refresh all group maxAllowRunningJobs - refreshAllGroupMaxAllowRunningJobs(allInstances.size()) - logger.info("Finished to refresh consumer group maxAllowRunningJobs") - } + () => { + Utils.tryAndError { + // refresh all group maxAllowRunningJobs + refreshAllGroupMaxAllowRunningJobs(EntranceUtils.getRunningEntranceNumber()) + logger.info("Finished to refresh consumer group maxAllowRunningJobs") } }, EntranceConfiguration.ENTRANCE_GROUP_SCAN_INIT_TIME.getValue,