ids =
+ jobRequests.stream().map(JobRequest::getId).collect(Collectors.toList());
logger.info("success query failover jobs , job size: {}, ids: {}", ids.size(), ids);
// failover to local server
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index efd5e76a45..8ef5c268b5 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -28,6 +28,7 @@ import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary
import org.apache.linkis.entrance.errorcode.EntranceErrorCodeSummary._
import org.apache.linkis.entrance.exception.{EntranceErrorException, SubmitFailedException}
import org.apache.linkis.entrance.execute.EntranceJob
+import org.apache.linkis.entrance.job.EntranceExecutionJob
import org.apache.linkis.entrance.log.{Cache, CacheLogWriter, HDFSCacheLogWriter, LogReader}
import org.apache.linkis.entrance.parser.ParserUtils
import org.apache.linkis.entrance.timeout.JobTimeoutManager
@@ -43,16 +44,16 @@ import org.apache.linkis.rpc.conf.RPCConfiguration
import org.apache.linkis.scheduler.queue.{Job, SchedulerEventState}
import org.apache.linkis.server.conf.ServerConfiguration
import org.apache.linkis.storage.utils.StorageUtils
+
import org.apache.commons.lang3.StringUtils
import org.apache.commons.lang3.exception.ExceptionUtils
-import org.apache.linkis.common.log.LogUtils
import org.springframework.beans.BeanUtils
-import org.apache.linkis.entrance.job.EntranceExecutionJob
import java.{lang, util}
import java.text.{MessageFormat, SimpleDateFormat}
import java.util.Date
+
import scala.collection.JavaConverters._
abstract class EntranceServer extends Logging {
@@ -280,8 +281,10 @@ abstract class EntranceServer extends Logging {
consumeQueueTasks.foreach(job => {
taskIds.add(job.getJobRequest.getId.asInstanceOf[Long])
job match {
- case entranceExecutionJob : EntranceExecutionJob =>
- val msg = LogUtils.generateWarn(s"job ${job.getJobRequest.getId} clean from ConsumeQueue, wait for failover")
+ case entranceExecutionJob: EntranceExecutionJob =>
+ val msg = LogUtils.generateWarn(
+ s"job ${job.getJobRequest.getId} clean from ConsumeQueue, wait for failover"
+ )
entranceExecutionJob.getLogListener.foreach(_.onLogUpdate(entranceExecutionJob, msg))
entranceExecutionJob.getLogWriter.foreach(_.close())
case _ =>
@@ -308,7 +311,9 @@ abstract class EntranceServer extends Logging {
}
val logAppender = new java.lang.StringBuilder()
- logAppender.append("*************************************FAILOVER**************************************")
+ logAppender.append(
+ "*************************************FAILOVER**************************************"
+ )
// try to kill ec
killOldEC(jobRequest, logAppender);
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 959d8c68bc..13db69700f 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -233,12 +233,15 @@ object EntranceConfiguration {
val ENTRANCE_FAILOVER_SCAN_INTERVAL =
CommonVars("linkis.entrance.failover.scan.interval", 30 * 1000).getValue
- val ENTRANCE_FAILOVER_DATA_NUM_LIMIT = CommonVars("linkis.entrance.failover.data.num.limit", 10).getValue
+ val ENTRANCE_FAILOVER_DATA_NUM_LIMIT =
+ CommonVars("linkis.entrance.failover.data.num.limit", 10).getValue
- val ENTRANCE_FAILOVER_DATA_INTERVAL_TIME = CommonVars("linkis.entrance.failover.data.interval.time", new TimeType("7d").toLong).getValue
+ val ENTRANCE_FAILOVER_DATA_INTERVAL_TIME =
+ CommonVars("linkis.entrance.failover.data.interval.time", new TimeType("7d").toLong).getValue
// if true, the waitForRetry job in runningJobs can be failover
- val ENTRANCE_FAILOVER_RETRY_JOB_ENABLED = CommonVars("linkis.entrance.failover.retry.job.enable", true)
+ val ENTRANCE_FAILOVER_RETRY_JOB_ENABLED =
+ CommonVars("linkis.entrance.failover.retry.job.enable", true)
val ENTRANCE_UPDATE_BATCH_SIZE = CommonVars("linkis.entrance.update.batch.size", 100)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala
index 1977fa68ac..faee683fbf 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceFIFOUserConsumer.scala
@@ -27,6 +27,7 @@ import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer
import java.util
import java.util.concurrent.ExecutorService
+
import scala.collection.JavaConverters.collectionAsScalaIterableConverter
class EntranceFIFOUserConsumer(
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
index 4bd0caca1b..0f31351b48 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceGroupFactory.scala
@@ -27,18 +27,12 @@ import org.apache.linkis.governance.common.protocol.conf.{
RequestQueryEngineConfigWithGlobalConfig,
ResponseQueryConfig
}
-import org.apache.linkis.instance.label.client.InstanceLabelClient
-import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
-import org.apache.linkis.manager.label.constant.{LabelKeyConstant, LabelValueConstant}
-import org.apache.linkis.governance.common.protocol.conf.{RequestQueryEngineConfigWithGlobalConfig, ResponseQueryConfig}
import org.apache.linkis.manager.label.entity.Label
import org.apache.linkis.manager.label.entity.engine.{
ConcurrentEngineConnLabel,
EngineTypeLabel,
UserCreatorLabel
}
-import org.apache.linkis.manager.label.entity.route.RouteLabel
-import org.apache.linkis.manager.label.entity.engine.{ConcurrentEngineConnLabel, EngineTypeLabel, UserCreatorLabel}
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.utils.TaskUtils
@@ -51,13 +45,10 @@ import org.apache.commons.lang3.StringUtils
import java.util
import java.util.concurrent.TimeUnit
import java.util.regex.Pattern
+
import scala.collection.JavaConverters._
+
import com.google.common.cache.{Cache, CacheBuilder}
-import org.apache.linkis.common.ServiceInstance
-import org.apache.linkis.instance.label.client.InstanceLabelClient
-import org.apache.linkis.manager.label.builder.factory.LabelBuilderFactoryContext
-import org.apache.linkis.manager.label.constant.{LabelConstant, LabelKeyConstant}
-import org.apache.linkis.manager.label.entity.route.RouteLabel
class EntranceGroupFactory extends GroupFactory with Logging {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index f114981c5c..a067d65829 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -18,7 +18,6 @@
package org.apache.linkis.entrance.scheduler
import org.apache.linkis.common.ServiceInstance
-import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.entrance.conf.EntranceConfiguration
import org.apache.linkis.instance.label.client.InstanceLabelClient
@@ -32,10 +31,11 @@ import org.apache.linkis.scheduler.queue.parallelqueue.{ParallelConsumerManager,
import java.util
import java.util.concurrent.TimeUnit
+
import scala.collection.JavaConverters._
class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
- extends ParallelConsumerManager(maxParallelismUsers, schedulerName){
+ extends ParallelConsumerManager(maxParallelismUsers, schedulerName) {
override protected def createConsumer(groupName: String): FIFOUserConsumer = {
val group = getSchedulerContext.getOrCreateGroupFactory.getGroup(groupName)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
index df7b846a7d..3fed0f78be 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/JobHistoryHelper.scala
@@ -30,13 +30,15 @@ import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.protocol.query.cache.{CacheTaskResult, RequestReadCache}
import org.apache.linkis.rpc.Sender
import org.apache.linkis.scheduler.queue.SchedulerEventState
+
import org.apache.commons.lang3.StringUtils
import javax.servlet.http.HttpServletRequest
+
import java.util
import java.util.Date
+
import scala.collection.JavaConverters._
-import sun.net.util.IPAddressUtil
import com.google.common.net.InetAddresses
@@ -316,15 +318,15 @@ object JobHistoryHelper extends Logging {
val ecResourceMap =
if (resourceInfo == null) new util.HashMap[String, ResourceWithStatus] else resourceInfo
if (resourceMap != null) {
- resourceMap.asInstanceOf[util.HashMap[String, ResourceWithStatus]].putAll(ecResourceMap)
+ resourceMap.asInstanceOf[util.Map[String, ResourceWithStatus]].putAll(ecResourceMap)
} else {
metricsMap.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, ecResourceMap)
}
- var engineInstanceMap: util.HashMap[String, AnyRef] = null
+ var engineInstanceMap: util.Map[String, AnyRef] = null
if (metricsMap.containsKey(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)) {
engineInstanceMap = metricsMap
.get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
- .asInstanceOf[util.HashMap[String, AnyRef]]
+ .asInstanceOf[util.Map[String, AnyRef]]
} else {
engineInstanceMap = new util.HashMap[String, AnyRef]()
metricsMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP, engineInstanceMap)
@@ -334,7 +336,7 @@ object JobHistoryHelper extends Logging {
val ticketId = infoMap.get(TaskConstant.TICKET_ID).asInstanceOf[String]
val engineExtraInfoMap = engineInstanceMap
.getOrDefault(ticketId, new util.HashMap[String, AnyRef])
- .asInstanceOf[util.HashMap[String, AnyRef]]
+ .asInstanceOf[util.Map[String, AnyRef]]
engineExtraInfoMap.putAll(infoMap)
engineInstanceMap.put(ticketId, engineExtraInfoMap)
} else {
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelConstant.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelConstant.java
index b43501ed9e..4db4bfca40 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelConstant.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/constant/LabelConstant.java
@@ -22,6 +22,4 @@ public class LabelConstant {
public static final int LABEL_BUILDER_ERROR_CODE = 40001;
public static final int LABEL_UTIL_CONVERT_ERROR_CODE = 40002;
-
- public static final String OFFLINE = "offline";
}
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
index 6568fb838b..88267e4800 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
@@ -110,25 +110,13 @@ Integer countUndoneTaskWithCreatorOnly(
/**
* query wait for failover job
*
- * Sql example:
- * SELECT a.* FROM linkis_ps_job_history_group_history a
- * where (a.instances = ''
- * or a.instances is null
- * or a.instances not in ('192.168.1.123:9104','192.168.1.124:9104')
- * or EXISTS (
- * select 1 from
- * (
- * select '192.168.1.123:9104' as instances, 1697775054098 as registryTime
- * union all
- * select '192.168.1.124:9104' as instances, 1666239054098 as registryTime
- * ) b
- * where a.instances = b.instances and UNIX_TIMESTAMP(a.created_time) * 1000 < b.registryTime
- * )
- * )
- * and
- * status in ('Inited','Running','Scheduled','WaitForRetry')
- * and UNIX_TIMESTAMP(a.created_time) * 1000 >= 1666239054098
- * limit 10
+ * Sql example: SELECT a.* FROM linkis_ps_job_history_group_history a where (a.instances = ''
+ * or a.instances is null or a.instances not in ('192.168.1.123:9104','192.168.1.124:9104') or
+ * EXISTS ( select 1 from ( select '192.168.1.123:9104' as instances, 1697775054098 as
+ * registryTime union all select '192.168.1.124:9104' as instances, 1666239054098 as registryTime
+ * ) b where a.instances = b.instances and UNIX_TIMESTAMP(a.created_time) * 1000 < b.registryTime
+ * ) ) and status in ('Inited','Running','Scheduled','WaitForRetry') and
+ * UNIX_TIMESTAMP(a.created_time) * 1000 >= 1666239054098 limit 10
*
* @param instancesMap
* @param statusList
@@ -136,8 +124,9 @@ Integer countUndoneTaskWithCreatorOnly(
* @param limit
* @return
*/
- List selectFailoverJobHistory(@Param("instancesMap") Map instancesMap,
- @Param("statusList") List statusList,
- @Param("startTimestamp") Long startTimestamp,
- @Param("limit") Integer limit);
+ List selectFailoverJobHistory(
+ @Param("instancesMap") Map instancesMap,
+ @Param("statusList") List statusList,
+ @Param("startTimestamp") Long startTimestamp,
+ @Param("limit") Integer limit);
}
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
index 22084f88a6..a44cd0e262 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/scala/org/apache/linkis/jobhistory/service/impl/JobHistoryQueryServiceImpl.scala
@@ -252,7 +252,8 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
logger.info(s"query failover jobs, start timestamp:${startTimestamp}, limit:${limit}")
val jobResp = new JobRespProtocol
Utils.tryCatch {
- val jobList = jobHistoryMapper.selectFailoverJobHistory(reqMap, statusList, startTimestamp, limit)
+ val jobList =
+ jobHistoryMapper.selectFailoverJobHistory(reqMap, statusList, startTimestamp, limit)
val jobReqList = jobList.asScala.map(jobHistory2JobRequest).toList
val map = new util.HashMap[String, Object]()
map.put(JobRequestConstants.JOB_HISTORY_LIST, jobReqList)
@@ -266,14 +267,6 @@ class JobHistoryQueryServiceImpl extends JobHistoryQueryService with Logging {
jobResp
}
- /* private def queryTaskList2RequestPersistTaskList(queryTask: java.util.List[QueryTask]): java.util.List[RequestPersistTask] = {
- import scala.collection.JavaConversions._
- val tasks = new util.ArrayList[RequestPersistTask]
- import org.apache.linkis.jobhistory.conversions.TaskConversions.queryTask2RequestPersistTask
- queryTask.foreach(f => tasks.add(f))
- tasks
- } */
-
override def getJobHistoryByIdAndName(jobId: java.lang.Long, userName: String): JobHistory = {
val jobReq = new JobHistory
jobReq.setId(jobId)
diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala
index 883f252d70..930bfac73a 100644
--- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala
+++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala
@@ -17,10 +17,8 @@
package org.apache.linkis.gateway.ujes.parser
-import org.apache.commons.lang3.StringUtils
import org.apache.linkis.common.ServiceInstance
import org.apache.linkis.common.entity.JobInstance
-import org.apache.linkis.common.utils.JsonUtils
import org.apache.linkis.gateway.config.GatewayConfiguration
import org.apache.linkis.gateway.http.GatewayContext
import org.apache.linkis.gateway.parser.AbstractGatewayParser
@@ -30,6 +28,9 @@ import org.apache.linkis.protocol.utils.ZuulEntranceUtils
import org.apache.linkis.rpc.interceptor.ServiceInstanceUtils
import org.apache.linkis.server.BDPJettyServerHelper
import org.apache.linkis.server.conf.ServerConfiguration
+
+import org.apache.commons.lang3.StringUtils
+
import org.springframework.stereotype.Component
import javax.annotation.Resource
@@ -37,7 +38,6 @@ import javax.annotation.Resource
@Component
class EntranceRequestGatewayParser extends AbstractGatewayParser {
-
@Resource
private var jobHistoryQueryService: JobHistoryQueryService = _
@@ -49,9 +49,9 @@ class EntranceRequestGatewayParser extends AbstractGatewayParser {
if (sendResponseWhenNotMatchVersion(gatewayContext, version)) return
val serviceInstance = if (execId.startsWith(EntranceRequestGatewayParser.API_REQUEST)) {
if (
- gatewayContext.getRequest.getQueryParams.containsKey(
- EntranceRequestGatewayParser.INSTANCE
- )
+ gatewayContext.getRequest.getQueryParams.containsKey(
+ EntranceRequestGatewayParser.INSTANCE
+ )
) {
val instances =
gatewayContext.getRequest.getQueryParams.get(EntranceRequestGatewayParser.INSTANCE)
@@ -83,7 +83,8 @@ class EntranceRequestGatewayParser extends AbstractGatewayParser {
}
def buildJobInstance(taskId: Long, gatewayContext: GatewayContext): JobInstance = {
- val histories = jobHistoryQueryService.search(taskId, null, null, null, null, null, null, null)
+ val histories =
+ jobHistoryQueryService.search(taskId, null, null, null, null, null, null, null, null)
if (histories.isEmpty) {
sendErrorResponse(s"taskId $taskId is not exists.", gatewayContext)
return null
From 9e27aec833d3d20e43ff2f6c6c3d1d107860364d Mon Sep 17 00:00:00 2001
From: guoshupei <719126Liyuelynn>
Date: Tue, 28 Feb 2023 14:44:05 +0800
Subject: [PATCH 032/145] set default value
---
.../server/DefaultEntranceServer.java | 13 +-
.../server/EntranceFailoverJobServer.java | 178 +++++++++---------
.../entrance/conf/EntranceConfiguration.scala | 8 +-
3 files changed, 95 insertions(+), 104 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
index 54b855ffbd..b077ab37bb 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
@@ -17,6 +17,7 @@
package org.apache.linkis.entrance.server;
+import org.apache.commons.io.IOUtils;
import org.apache.linkis.entrance.EntranceContext;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
@@ -25,9 +26,8 @@
import org.apache.linkis.entrance.job.EntranceExecutionJob;
import org.apache.linkis.entrance.log.LogReader;
import org.apache.linkis.rpc.Sender;
-
-import org.apache.commons.io.IOUtils;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
@@ -35,9 +35,6 @@
import javax.annotation.PostConstruct;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
/** Description: */
@Component(ServiceNameConsts.ENTRANCE_SERVER)
public class DefaultEntranceServer extends EntranceServer {
@@ -91,9 +88,9 @@ private void shutdownEntrance(ContextClosedEvent event) {
logger.warn("Entrance exit to stop all job");
EntranceJob[] allUndoneTask = getAllUndoneTask(null);
if (null != allUndoneTask) {
- String msg = "Entrance exits the automatic cleanup task and can be rerun(服务退出自动清理任务,可以重跑)";
for (EntranceJob job : allUndoneTask) {
- job.onFailure(msg, null);
+ job.onFailure(
+ "Entrance exits the automatic cleanup task and can be rerun(服务退出自动清理任务,可以重跑)", null);
IOUtils.closeQuietly(((EntranceExecutionJob) job).getLogWriter().get());
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
index 77e85cba69..73c91f6a36 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
@@ -29,12 +29,12 @@
import org.apache.linkis.publicservice.common.lock.service.CommonLockService;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.queue.SchedulerEventState;
-
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
-
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -43,9 +43,6 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
@Component(ServiceNameConsts.ENTRANCE_FAILOVER_SERVER)
public class EntranceFailoverJobServer {
@@ -61,96 +58,93 @@ public class EntranceFailoverJobServer {
@PostConstruct
public void init() {
- this.scheduledExecutor =
- Executors.newSingleThreadScheduledExecutor(
- Utils.threadFactory("Linkis-Failover-Scheduler-Thread-", true));
- failoverTask();
+ if (EntranceConfiguration.ENTRANCE_FAILOVER_ENABLED()) {
+ this.scheduledExecutor =
+ Executors.newSingleThreadScheduledExecutor(
+ Utils.threadFactory("Linkis-Failover-Scheduler-Thread-", true));
+ failoverTask();
+ }
}
public void failoverTask() {
- if (EntranceConfiguration.ENTRANCE_FAILOVER_ENABLED()) {
- scheduledExecutor.scheduleWithFixedDelay(
- () -> {
- EntranceSchedulerContext schedulerContext =
- (EntranceSchedulerContext)
- entranceServer
- .getEntranceContext()
- .getOrCreateScheduler()
- .getSchedulerContext();
-
- // entrance do not failover job when it is offline
- if (schedulerContext.getOfflineFlag()) return;
-
- CommonLock commonLock = new CommonLock();
- commonLock.setLockObject(ENTRANCE_FAILOVER_LOCK);
- Boolean locked = false;
- try {
- locked = commonLockService.lock(commonLock, 10 * 1000L);
- if (!locked) return;
- logger.info("success locked {}", ENTRANCE_FAILOVER_LOCK);
-
- // get all entrance server from eureka
- ServiceInstance[] serviceInstances =
- Sender.getInstances(Sender.getThisServiceInstance().getApplicationName());
- if (serviceInstances == null || serviceInstances.length <= 0) return;
-
- // serverInstance to map
- Map serverInstanceMap =
- Arrays.stream(serviceInstances)
- .collect(
- Collectors.toMap(
- ServiceInstance::getInstance,
- ServiceInstance::getRegistryTimestamp,
- (k1, k2) -> k2));
-
- // It is very important to avoid repeated execute job
- // when failover self job, if self instance is empty, the job can be repeated execute
- if (!serverInstanceMap.containsKey(Sender.getThisInstance())) {
- logger.warn(
- "server has just started and has not get self info, it does not failover");
- return;
- }
-
- // get failover job expired time (获取任务故障转移过期时间,配置为0表示不过期, 过期则不处理)
- long expiredTimestamp = 0L;
- if (EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME() > 0) {
- expiredTimestamp =
- System.currentTimeMillis()
- - EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME();
- }
-
- // get uncompleted status
- List statusList =
- Arrays.stream(SchedulerEventState.uncompleteStatusArray())
- .map(Object::toString)
- .collect(Collectors.toList());
-
- List jobRequests =
- JobHistoryHelper.queryWaitForFailoverTask(
- serverInstanceMap,
- statusList,
- expiredTimestamp,
- EntranceConfiguration.ENTRANCE_FAILOVER_DATA_NUM_LIMIT());
- if (jobRequests.isEmpty()) return;
- List ids =
- jobRequests.stream().map(JobRequest::getId).collect(Collectors.toList());
- logger.info("success query failover jobs , job size: {}, ids: {}", ids.size(), ids);
-
- // failover to local server
- for (JobRequest jobRequest : jobRequests) {
- entranceServer.failoverExecute(jobRequest);
- }
- logger.info("finished execute failover jobs, job ids: {}", ids);
-
- } catch (Exception e) {
- logger.error("failover failed", e);
- } finally {
- if (locked) commonLockService.unlock(commonLock);
+ scheduledExecutor.scheduleWithFixedDelay(
+ () -> {
+ EntranceSchedulerContext schedulerContext =
+ (EntranceSchedulerContext)
+ entranceServer.getEntranceContext().getOrCreateScheduler().getSchedulerContext();
+
+ // entrance do not failover job when it is offline
+ if (schedulerContext.getOfflineFlag()) return;
+
+ CommonLock commonLock = new CommonLock();
+ commonLock.setLockObject(ENTRANCE_FAILOVER_LOCK);
+ Boolean locked = false;
+ try {
+ locked = commonLockService.lock(commonLock, 30 * 1000L);
+ if (!locked) return;
+ logger.info("success locked {}", ENTRANCE_FAILOVER_LOCK);
+
+ // get all entrance server from eureka
+ ServiceInstance[] serviceInstances =
+ Sender.getInstances(Sender.getThisServiceInstance().getApplicationName());
+ if (serviceInstances == null || serviceInstances.length <= 0) return;
+
+ // serverInstance to map
+ Map serverInstanceMap =
+ Arrays.stream(serviceInstances)
+ .collect(
+ Collectors.toMap(
+ ServiceInstance::getInstance,
+ ServiceInstance::getRegistryTimestamp,
+ (k1, k2) -> k2));
+
+ // It is very important to avoid repeated execute job
+ // when failover self job, if self instance is empty, the job can be repeated execute
+ if (!serverInstanceMap.containsKey(Sender.getThisInstance())) {
+ logger.warn(
+ "server has just started and has not get self info, it does not failover");
+ return;
}
- },
- EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INIT_TIME(),
- EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INTERVAL(),
- TimeUnit.MILLISECONDS);
- }
+
+ // get failover job expired time (获取任务故障转移过期时间,配置为0表示不过期, 过期则不处理)
+ long expiredTimestamp = 0L;
+ if (EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME() > 0) {
+ expiredTimestamp =
+ System.currentTimeMillis()
+ - EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME();
+ }
+
+ // get uncompleted status
+ List statusList =
+ Arrays.stream(SchedulerEventState.uncompleteStatusArray())
+ .map(Object::toString)
+ .collect(Collectors.toList());
+
+ List jobRequests =
+ JobHistoryHelper.queryWaitForFailoverTask(
+ serverInstanceMap,
+ statusList,
+ expiredTimestamp,
+ EntranceConfiguration.ENTRANCE_FAILOVER_DATA_NUM_LIMIT());
+ if (jobRequests.isEmpty()) return;
+ List ids =
+ jobRequests.stream().map(JobRequest::getId).collect(Collectors.toList());
+ logger.info("success query failover jobs , job size: {}, ids: {}", ids.size(), ids);
+
+ // failover to local server
+ for (JobRequest jobRequest : jobRequests) {
+ entranceServer.failoverExecute(jobRequest);
+ }
+ logger.info("finished execute failover jobs, job ids: {}", ids);
+
+ } catch (Exception e) {
+ logger.error("failover failed", e);
+ } finally {
+ if (locked) commonLockService.unlock(commonLock);
+ }
+ },
+ EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INIT_TIME(),
+ EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INTERVAL(),
+ TimeUnit.MILLISECONDS);
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 13db69700f..d8248620b7 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -241,7 +241,7 @@ object EntranceConfiguration {
// if true, the waitForRetry job in runningJobs can be failover
val ENTRANCE_FAILOVER_RETRY_JOB_ENABLED =
- CommonVars("linkis.entrance.failover.retry.job.enable", true)
+ CommonVars("linkis.entrance.failover.retry.job.enable", false)
val ENTRANCE_UPDATE_BATCH_SIZE = CommonVars("linkis.entrance.update.batch.size", 100)
@@ -255,12 +255,12 @@ object EntranceConfiguration {
val ENTRANCE_GROUP_SCAN_INTERVAL = CommonVars("linkis.entrance.group.scan.interval", 60 * 1000)
val ENTRANCE_FAILOVER_RETAIN_ENGINE_CONN_ENABLED =
- CommonVars("linkis.entrance.failover.retain.engine.conn.enable", true)
+ CommonVars("linkis.entrance.failover.retain.engine.conn.enable", false)
val ENTRANCE_FAILOVER_RETAIN_YARN_RESOURCE_ENABLED =
- CommonVars("linkis.entrance.failover.retain.yarn.resource.enable", true)
+ CommonVars("linkis.entrance.failover.retain.yarn.resource.enable", false)
val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED =
- CommonVars("linkis.entrance.failover.running.kill.enable", true)
+ CommonVars("linkis.entrance.failover.running.kill.enable", false)
}
From 3dda863d37aa94326b77c52a33251bf934e588d3 Mon Sep 17 00:00:00 2001
From: guoshupei <719126Liyuelynn>
Date: Tue, 28 Feb 2023 15:27:17 +0800
Subject: [PATCH 033/145] add config to properties
---
.../linkis/entrance/server/DefaultEntranceServer.java | 9 ++++++---
.../entrance/server/EntranceFailoverJobServer.java | 7 +++++--
.../org/apache/linkis/entrance/EntranceServer.scala | 2 +-
linkis-dist/package/conf/linkis-cg-entrance.properties | 7 ++++++-
linkis-dist/package/conf/linkis-mg-gateway.properties | 4 ++--
5 files changed, 20 insertions(+), 9 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
index b077ab37bb..14bea60435 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
@@ -17,7 +17,6 @@
package org.apache.linkis.entrance.server;
-import org.apache.commons.io.IOUtils;
import org.apache.linkis.entrance.EntranceContext;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
@@ -26,8 +25,9 @@
import org.apache.linkis.entrance.job.EntranceExecutionJob;
import org.apache.linkis.entrance.log.LogReader;
import org.apache.linkis.rpc.Sender;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
+import org.apache.commons.io.IOUtils;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.EventListener;
@@ -35,6 +35,9 @@
import javax.annotation.PostConstruct;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/** Description: */
@Component(ServiceNameConsts.ENTRANCE_SERVER)
public class DefaultEntranceServer extends EntranceServer {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
index 73c91f6a36..d7f5ce5951 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
@@ -29,12 +29,12 @@
import org.apache.linkis.publicservice.common.lock.service.CommonLockService;
import org.apache.linkis.rpc.Sender;
import org.apache.linkis.scheduler.queue.SchedulerEventState;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
+
import java.util.Arrays;
import java.util.List;
import java.util.Map;
@@ -43,6 +43,9 @@
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
@Component(ServiceNameConsts.ENTRANCE_FAILOVER_SERVER)
public class EntranceFailoverJobServer {
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 8ef5c268b5..8e9bbeeac0 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -390,7 +390,7 @@ abstract class EntranceServer extends Logging {
engineStopRequest.setServiceInstance(ecInstance)
// send to linkismanager kill ec
Sender
- .getSender(RPCConfiguration.LINKIS_MANAGER_APPLICATION_NAME.getValue)
+ .getSender(RPCConfiguration.LINKIS_MANAGER_SERVICE_NAME.getValue)
.send(engineStopRequest)
val msg =
s"job ${jobRequest.getId} send EngineStopRequest to linkismanager, kill EC instance $ecInstance"
diff --git a/linkis-dist/package/conf/linkis-cg-entrance.properties b/linkis-dist/package/conf/linkis-cg-entrance.properties
index e89ced2159..639256d5cf 100644
--- a/linkis-dist/package/conf/linkis-cg-entrance.properties
+++ b/linkis-dist/package/conf/linkis-cg-entrance.properties
@@ -33,4 +33,9 @@ spring.server.port=9104
wds.linkis.entrance.user.creator.ip.interceptor.switch=false
## you may set service version if you want to distinguish different configuration version
-spring.eureka.instance.metadata-map.linkis.conf.version=v1
\ No newline at end of file
+spring.eureka.instance.metadata-map.linkis.conf.version=v1
+
+
+wds.linkis.server.mybatis.mapperLocations=classpath*:mapper/common/*.xml,classpath*:mapper/mysql/*.xml
+wds.linkis.server.mybatis.BasePackage=org.apache.linkis.publicservice.common.lock.dao
+wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.publicservice.common.lock.entity
\ No newline at end of file
diff --git a/linkis-dist/package/conf/linkis-mg-gateway.properties b/linkis-dist/package/conf/linkis-mg-gateway.properties
index 84be3d897d..27656f7f31 100644
--- a/linkis-dist/package/conf/linkis-mg-gateway.properties
+++ b/linkis-dist/package/conf/linkis-mg-gateway.properties
@@ -21,8 +21,8 @@ wds.linkis.gateway.conf.url.pass.auth=/dss/
wds.linkis.gateway.conf.enable.token.auth=true
wds.linkis.is.gateway=true
wds.linkis.server.mybatis.mapperLocations=classpath*:mapper/common/*.xml,classpath*:mapper/mysql/*.xml
-wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.instance.label.entity
-wds.linkis.server.mybatis.BasePackage=org.apache.linkis.instance.label.dao,org.apache.linkis.gateway.authentication.dao
+wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.instance.label.entity,org.apache.linkis.jobhistory.entity
+wds.linkis.server.mybatis.BasePackage=org.apache.linkis.instance.label.dao,org.apache.linkis.gateway.authentication.dao,org.apache.linkis.jobhistory.dao
wds.linkis.label.entity.packages=org.apache.linkis.gateway.ujes.route.label
wds.linkis.login_encrypt.enable=false
##LDAP
From ff2871919e7903877e175f14979d797e20706dd6 Mon Sep 17 00:00:00 2001
From: guoshupei <719126Liyuelynn>
Date: Tue, 28 Feb 2023 21:09:58 +0800
Subject: [PATCH 034/145] change HashMap to Map
---
.../main/scala/org/apache/linkis/entrance/EntranceServer.scala | 2 +-
.../linkis/entrance/interceptor/impl/CustomVariableUtils.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 8e9bbeeac0..55be20fd4d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -312,7 +312,7 @@ abstract class EntranceServer extends Logging {
val logAppender = new java.lang.StringBuilder()
logAppender.append(
- "*************************************FAILOVER**************************************"
+ "*************************************FAILOVER**************************************\n"
)
// try to kill ec
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala
index 7a7cb7463a..a40c3fa35d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/interceptor/impl/CustomVariableUtils.scala
@@ -63,7 +63,7 @@ object CustomVariableUtils extends Logging {
}
val variableMap = TaskUtils
.getVariableMap(jobRequest.getParams)
- .asInstanceOf[util.HashMap[String, String]]
+ .asInstanceOf[util.Map[String, String]]
variables.putAll(variableMap)
if (!variables.containsKey("user")) {
variables.put("user", jobRequest.getExecuteUser)
From 39d45d3b40d27427a2aa28d334f2bc88189b55e5 Mon Sep 17 00:00:00 2001
From: guoshupei <719126Liyuelynn>
Date: Wed, 1 Mar 2023 11:03:56 +0800
Subject: [PATCH 035/145] update default value
---
.../main/scala/org/apache/linkis/entrance/EntranceServer.scala | 2 +-
.../org/apache/linkis/entrance/conf/EntranceConfiguration.scala | 2 +-
2 files changed, 2 insertions(+), 2 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 55be20fd4d..5560cc716d 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -312,7 +312,7 @@ abstract class EntranceServer extends Logging {
val logAppender = new java.lang.StringBuilder()
logAppender.append(
- "*************************************FAILOVER**************************************\n"
+ "*************************************FAILOVER************************************** \n"
)
// try to kill ec
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index d8248620b7..17f2dffd9c 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -237,7 +237,7 @@ object EntranceConfiguration {
CommonVars("linkis.entrance.failover.data.num.limit", 10).getValue
val ENTRANCE_FAILOVER_DATA_INTERVAL_TIME =
- CommonVars("linkis.entrance.failover.data.interval.time", new TimeType("7d").toLong).getValue
+ CommonVars("linkis.entrance.failover.data.interval.time", new TimeType("1d").toLong).getValue
// if true, the waitForRetry job in runningJobs can be failover
val ENTRANCE_FAILOVER_RETRY_JOB_ENABLED =
From fdc54d45cb2b9f6f88a1eeef9e06ba3424a24fa5 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Thu, 2 Mar 2023 12:08:08 +0800
Subject: [PATCH 036/145] Optimal refresh consumer group maxAllowRunningJobs
logic
---
.../entrance/scheduler/EntranceParallelConsumerManager.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index a067d65829..a6e24388a6 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -75,7 +75,7 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S
}
def refreshAllGroupMaxAllowRunningJobs(validInsCount: Int): Unit = {
- if (validInsCount <= 0) return
+ if (validInsCount <= 1) return
listConsumers()
.foreach(item => {
item.getGroup match {
From 0b0ef7917eeccc2707754b88eb74421c1d3e3913 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Sun, 5 Mar 2023 18:44:38 +0800
Subject: [PATCH 037/145] rename config key
---
.../org/apache/linkis/server/conf/ServerConfiguration.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/conf/ServerConfiguration.scala b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/conf/ServerConfiguration.scala
index 8d9f9d65ad..3c6a25a343 100644
--- a/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/conf/ServerConfiguration.scala
+++ b/linkis-commons/linkis-module/src/main/scala/org/apache/linkis/server/conf/ServerConfiguration.scala
@@ -208,6 +208,6 @@ object ServerConfiguration extends Logging {
CommonVars("wds.linkis.session.proxy.user.ticket.key", "linkis_user_session_proxy_ticket_id_v1")
val LINKIS_SERVER_ENTRANCE_HEADER_KEY =
- CommonVars("wds.linkis.server.entrance.header.key", "jobInstanceKey")
+ CommonVars("linkis.server.entrance.header.key", "jobInstanceKey")
}
From 75eddde7591a59c2b721a92afd8b5dc88b1143e3 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Mon, 6 Mar 2023 11:46:01 +0800
Subject: [PATCH 038/145] rename metric config key
---
.../scala/org/apache/linkis/entrance/EntranceServer.scala | 4 ++--
.../linkis/entrance/conf/EntranceConfiguration.scala | 8 ++++----
2 files changed, 6 insertions(+), 6 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 5560cc716d..e9c3da2cda 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -625,7 +625,7 @@ abstract class EntranceServer extends Logging {
)
val metricMap = new util.HashMap[String, Object]()
- if (EntranceConfiguration.ENTRANCE_FAILOVER_RETAIN_ENGINE_CONN_ENABLED.getValue) {
+ if (EntranceConfiguration.ENTRANCE_FAILOVER_RETAIN_METRIC_ENGINE_CONN_ENABLED.getValue) {
if (
jobRequest.getMetrics != null && jobRequest.getMetrics.containsKey(
TaskConstant.ENTRANCEJOB_ENGINECONN_MAP
@@ -638,7 +638,7 @@ abstract class EntranceServer extends Logging {
}
}
- if (EntranceConfiguration.ENTRANCE_FAILOVER_RETAIN_YARN_RESOURCE_ENABLED.getValue) {
+ if (EntranceConfiguration.ENTRANCE_FAILOVER_RETAIN_METRIC_YARN_RESOURCE_ENABLED.getValue) {
if (
jobRequest.getMetrics != null && jobRequest.getMetrics.containsKey(
TaskConstant.ENTRANCEJOB_YARNRESOURCE
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 17f2dffd9c..617584f278 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -254,11 +254,11 @@ object EntranceConfiguration {
val ENTRANCE_GROUP_SCAN_INTERVAL = CommonVars("linkis.entrance.group.scan.interval", 60 * 1000)
- val ENTRANCE_FAILOVER_RETAIN_ENGINE_CONN_ENABLED =
- CommonVars("linkis.entrance.failover.retain.engine.conn.enable", false)
+ val ENTRANCE_FAILOVER_RETAIN_METRIC_ENGINE_CONN_ENABLED =
+ CommonVars("linkis.entrance.failover.retain.metric.engine.conn.enable", false)
- val ENTRANCE_FAILOVER_RETAIN_YARN_RESOURCE_ENABLED =
- CommonVars("linkis.entrance.failover.retain.yarn.resource.enable", false)
+ val ENTRANCE_FAILOVER_RETAIN_METRIC_YARN_RESOURCE_ENABLED =
+ CommonVars("linkis.entrance.failover.retain.metric.yarn.resource.enable", false)
val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED =
CommonVars("linkis.entrance.failover.running.kill.enable", false)
From 6fee59f67d637dad555fc96715164e327c1eee08 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Wed, 8 Mar 2023 11:02:34 +0800
Subject: [PATCH 039/145] - failover server close - use logger template
---
.../server/EntranceFailoverJobServer.java | 182 ++++++++++--------
.../EntranceParallelConsumerManager.scala | 10 +-
2 files changed, 108 insertions(+), 84 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
index d7f5ce5951..d162be0820 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
@@ -31,6 +31,8 @@
import org.apache.linkis.scheduler.queue.SchedulerEventState;
import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.event.ContextClosedEvent;
+import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import javax.annotation.PostConstruct;
@@ -38,9 +40,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.*;
import java.util.stream.Collectors;
import org.slf4j.Logger;
@@ -59,6 +59,8 @@ public class EntranceFailoverJobServer {
private ScheduledExecutorService scheduledExecutor;
+ private Future future;
+
@PostConstruct
public void init() {
if (EntranceConfiguration.ENTRANCE_FAILOVER_ENABLED()) {
@@ -69,85 +71,101 @@ public void init() {
}
}
+ @EventListener
+ private void shutdownFailover(ContextClosedEvent event) {
+ if (future != null && !future.isDone()) {
+ future.cancel(true);
+ }
+ if (scheduledExecutor != null) {
+ scheduledExecutor.shutdown();
+ logger.info("Entrance Failover Server exit!");
+ }
+ }
+
public void failoverTask() {
- scheduledExecutor.scheduleWithFixedDelay(
- () -> {
- EntranceSchedulerContext schedulerContext =
- (EntranceSchedulerContext)
- entranceServer.getEntranceContext().getOrCreateScheduler().getSchedulerContext();
-
- // entrance do not failover job when it is offline
- if (schedulerContext.getOfflineFlag()) return;
-
- CommonLock commonLock = new CommonLock();
- commonLock.setLockObject(ENTRANCE_FAILOVER_LOCK);
- Boolean locked = false;
- try {
- locked = commonLockService.lock(commonLock, 30 * 1000L);
- if (!locked) return;
- logger.info("success locked {}", ENTRANCE_FAILOVER_LOCK);
-
- // get all entrance server from eureka
- ServiceInstance[] serviceInstances =
- Sender.getInstances(Sender.getThisServiceInstance().getApplicationName());
- if (serviceInstances == null || serviceInstances.length <= 0) return;
-
- // serverInstance to map
- Map serverInstanceMap =
- Arrays.stream(serviceInstances)
- .collect(
- Collectors.toMap(
- ServiceInstance::getInstance,
- ServiceInstance::getRegistryTimestamp,
- (k1, k2) -> k2));
-
- // It is very important to avoid repeated execute job
- // when failover self job, if self instance is empty, the job can be repeated execute
- if (!serverInstanceMap.containsKey(Sender.getThisInstance())) {
- logger.warn(
- "server has just started and has not get self info, it does not failover");
- return;
- }
-
- // get failover job expired time (获取任务故障转移过期时间,配置为0表示不过期, 过期则不处理)
- long expiredTimestamp = 0L;
- if (EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME() > 0) {
- expiredTimestamp =
- System.currentTimeMillis()
- - EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME();
- }
-
- // get uncompleted status
- List statusList =
- Arrays.stream(SchedulerEventState.uncompleteStatusArray())
- .map(Object::toString)
- .collect(Collectors.toList());
-
- List jobRequests =
- JobHistoryHelper.queryWaitForFailoverTask(
- serverInstanceMap,
- statusList,
- expiredTimestamp,
- EntranceConfiguration.ENTRANCE_FAILOVER_DATA_NUM_LIMIT());
- if (jobRequests.isEmpty()) return;
- List ids =
- jobRequests.stream().map(JobRequest::getId).collect(Collectors.toList());
- logger.info("success query failover jobs , job size: {}, ids: {}", ids.size(), ids);
-
- // failover to local server
- for (JobRequest jobRequest : jobRequests) {
- entranceServer.failoverExecute(jobRequest);
- }
- logger.info("finished execute failover jobs, job ids: {}", ids);
-
- } catch (Exception e) {
- logger.error("failover failed", e);
- } finally {
- if (locked) commonLockService.unlock(commonLock);
- }
- },
- EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INIT_TIME(),
- EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INTERVAL(),
- TimeUnit.MILLISECONDS);
+ future =
+ scheduledExecutor.scheduleWithFixedDelay(
+ () -> {
+ EntranceSchedulerContext schedulerContext =
+ (EntranceSchedulerContext)
+ entranceServer
+ .getEntranceContext()
+ .getOrCreateScheduler()
+ .getSchedulerContext();
+
+ // entrance do not failover job when it is offline
+ if (schedulerContext.getOfflineFlag()) return;
+
+ CommonLock commonLock = new CommonLock();
+ commonLock.setLockObject(ENTRANCE_FAILOVER_LOCK);
+ Boolean locked = false;
+ try {
+ locked = commonLockService.lock(commonLock, 30 * 1000L);
+ if (!locked) return;
+ logger.info("success locked {}", ENTRANCE_FAILOVER_LOCK);
+
+ // get all entrance server from eureka
+ ServiceInstance[] serviceInstances =
+ Sender.getInstances(Sender.getThisServiceInstance().getApplicationName());
+ if (serviceInstances == null || serviceInstances.length <= 0) return;
+
+ // serverInstance to map
+ Map serverInstanceMap =
+ Arrays.stream(serviceInstances)
+ .collect(
+ Collectors.toMap(
+ ServiceInstance::getInstance,
+ ServiceInstance::getRegistryTimestamp,
+ (k1, k2) -> k2));
+
+ // It is very important to avoid repeated execute job
+ // when failover self job, if self instance is empty, the job can be repeated
+ // execute
+ if (!serverInstanceMap.containsKey(Sender.getThisInstance())) {
+ logger.warn(
+ "server has just started and has not get self info, it does not failover");
+ return;
+ }
+
+ // get failover job expired time (获取任务故障转移过期时间,配置为0表示不过期, 过期则不处理)
+ long expiredTimestamp = 0L;
+ if (EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME() > 0) {
+ expiredTimestamp =
+ System.currentTimeMillis()
+ - EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME();
+ }
+
+ // get uncompleted status
+ List statusList =
+ Arrays.stream(SchedulerEventState.uncompleteStatusArray())
+ .map(Object::toString)
+ .collect(Collectors.toList());
+
+ List jobRequests =
+ JobHistoryHelper.queryWaitForFailoverTask(
+ serverInstanceMap,
+ statusList,
+ expiredTimestamp,
+ EntranceConfiguration.ENTRANCE_FAILOVER_DATA_NUM_LIMIT());
+ if (jobRequests.isEmpty()) return;
+ List ids =
+ jobRequests.stream().map(JobRequest::getId).collect(Collectors.toList());
+ logger.info("success query failover jobs , job size: {}, ids: {}", ids.size(), ids);
+
+ // failover to local server
+ for (JobRequest jobRequest : jobRequests) {
+ entranceServer.failoverExecute(jobRequest);
+ }
+ logger.info("finished execute failover jobs, job ids: {}", ids);
+
+ } catch (Exception e) {
+ logger.error("failover failed", e);
+ } finally {
+ if (locked) commonLockService.unlock(commonLock);
+ }
+ },
+ EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INIT_TIME(),
+ EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INTERVAL(),
+ TimeUnit.MILLISECONDS);
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index a6e24388a6..060fcbdd65 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -30,10 +30,12 @@ import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer
import org.apache.linkis.scheduler.queue.parallelqueue.{ParallelConsumerManager, ParallelGroup}
import java.util
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{ScheduledFuture, TimeUnit}
import scala.collection.JavaConverters._
+import com.sun.javafx.util.Logging
+
class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
extends ParallelConsumerManager(maxParallelismUsers, schedulerName) {
@@ -84,7 +86,11 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S
group.setMaxAllowRunningJobs(maxAllowRunningJobs)
logger
.info(
- s"group ${group.getGroupName} refresh maxAllowRunningJobs => ${group.getMaxRunningJobs}/$validInsCount=$maxAllowRunningJobs"
+ "group {} refresh maxAllowRunningJobs => {}/{}={}",
+ group.getGroupName,
+ group.getMaxRunningJobs,
+ validInsCount,
+ maxAllowRunningJobs
)
case _ =>
}
From 37567a86bbdac42952727497737fb6fc5f596843 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Wed, 8 Mar 2023 11:41:03 +0800
Subject: [PATCH 040/145] Remove useless references
---
.../entrance/scheduler/EntranceParallelConsumerManager.scala | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index 060fcbdd65..d30f53a8f5 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -30,12 +30,10 @@ import org.apache.linkis.scheduler.queue.fifoqueue.FIFOUserConsumer
import org.apache.linkis.scheduler.queue.parallelqueue.{ParallelConsumerManager, ParallelGroup}
import java.util
-import java.util.concurrent.{ScheduledFuture, TimeUnit}
+import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._
-import com.sun.javafx.util.Logging
-
class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: String)
extends ParallelConsumerManager(maxParallelismUsers, schedulerName) {
From 71d3e089dcd455027bc36026711e26a4abeb8f40 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Wed, 8 Mar 2023 12:36:25 +0800
Subject: [PATCH 041/145] cast string when use logger template
---
.../scheduler/EntranceParallelConsumerManager.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index d30f53a8f5..726d93c500 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -86,9 +86,9 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S
.info(
"group {} refresh maxAllowRunningJobs => {}/{}={}",
group.getGroupName,
- group.getMaxRunningJobs,
- validInsCount,
- maxAllowRunningJobs
+ group.getMaxRunningJobs.toString,
+ validInsCount.toString,
+ maxAllowRunningJobs.toString
)
case _ =>
}
From 8ae8a3de4b3d2741459adcd63d309ff4e54dfbc5 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Wed, 8 Mar 2023 15:29:07 +0800
Subject: [PATCH 042/145] use logger template
---
.../scheduler/EntranceParallelConsumerManager.scala | 10 ++++++----
1 file changed, 6 insertions(+), 4 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index 726d93c500..afaf6b16e7 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -85,10 +85,12 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S
logger
.info(
"group {} refresh maxAllowRunningJobs => {}/{}={}",
- group.getGroupName,
- group.getMaxRunningJobs.toString,
- validInsCount.toString,
- maxAllowRunningJobs.toString
+ Array(
+ group.getGroupName,
+ group.getMaxRunningJobs,
+ validInsCount,
+ maxAllowRunningJobs
+ )
)
case _ =>
}
From cb048534aa2a7180365b3c1f15b3cacc053c461e Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Wed, 8 Mar 2023 16:47:07 +0800
Subject: [PATCH 043/145] use logger template
---
.../entrance/scheduler/EntranceParallelConsumerManager.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index afaf6b16e7..5e74d48939 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -90,7 +90,7 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S
group.getMaxRunningJobs,
validInsCount,
maxAllowRunningJobs
- )
+ ): _*
)
case _ =>
}
From 800074e400c834735fb04299f4422d35803cc3bf Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Wed, 8 Mar 2023 18:01:33 +0800
Subject: [PATCH 044/145] use logger template
---
.../scheduler/EntranceParallelConsumerManager.scala | 6 +++---
1 file changed, 3 insertions(+), 3 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index 5e74d48939..6d756ad1a8 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -87,9 +87,9 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S
"group {} refresh maxAllowRunningJobs => {}/{}={}",
Array(
group.getGroupName,
- group.getMaxRunningJobs,
- validInsCount,
- maxAllowRunningJobs
+ group.getMaxRunningJobs.toString,
+ validInsCount.toString,
+ maxAllowRunningJobs.toString
): _*
)
case _ =>
From 2d4f7848754f5a439f1d870350863d3f19883bce Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Tue, 28 Mar 2023 15:26:07 +0800
Subject: [PATCH 045/145] Update the maximum concurrency of orchestrator from
200 to 1000
---
.../linkis/orchestrator/conf/OrchestratorConfiguration.scala | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
index 50dbef632c..10f3a64d13 100644
--- a/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
+++ b/linkis-orchestrator/linkis-orchestrator-core/src/main/scala/org/apache/linkis/orchestrator/conf/OrchestratorConfiguration.scala
@@ -48,7 +48,7 @@ object OrchestratorConfiguration {
CommonVars("wds.linkis.orchestrator.execution.task.max.parallelism", 5)
val TASK_RUNNER_MAX_SIZE =
- CommonVars("wds.linkis.orchestrator.execution.task.runner.max.size", 200)
+ CommonVars("wds.linkis.orchestrator.execution.task.runner.max.size", 1000)
val EXEC_RUNNER_FACTORY_CLASS =
CommonVars("wds.linkis.orchestrator.exec.task.runner.factory.class", "")
From 8dafb2cdc421bba440fae45abad83c733dc3cf2b Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Tue, 28 Mar 2023 17:21:19 +0800
Subject: [PATCH 046/145] - moved JobInstance from linkis-common to
linkis-protocol - add isInitedStr,isRunningStr method and remove
uncompleteStatusArray method in SchedulerEventState
---
.../linkis/protocol/engine}/JobInstance.scala | 2 +-
.../scheduler/queue/SchedulerEventState.scala | 6 ++---
.../common/entity/job/JobRequest.java | 1 +
.../entrance/restful/EntranceRestfulApi.java | 2 +-
.../server/EntranceFailoverJobServer.java | 23 +++++++++++++------
.../linkis/entrance/EntranceServer.scala | 4 ++--
.../parser/EntranceRequestGatewayParser.scala | 2 +-
7 files changed, 25 insertions(+), 15 deletions(-)
rename linkis-commons/{linkis-common/src/main/scala/org/apache/linkis/common/entity => linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine}/JobInstance.scala (95%)
diff --git a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/entity/JobInstance.scala b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/JobInstance.scala
similarity index 95%
rename from linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/entity/JobInstance.scala
rename to linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/JobInstance.scala
index aa9db730ee..5e2eb10a59 100644
--- a/linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/entity/JobInstance.scala
+++ b/linkis-commons/linkis-protocol/src/main/scala/org/apache/linkis/protocol/engine/JobInstance.scala
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package org.apache.linkis.common.entity
+package org.apache.linkis.protocol.engine
case class JobInstance(
status: String,
diff --git a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
index a64103628c..26087d99f0 100644
--- a/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
+++ b/linkis-commons/linkis-scheduler/src/main/scala/org/apache/linkis/scheduler/queue/SchedulerEventState.scala
@@ -38,8 +38,8 @@ object SchedulerEventState extends Enumeration {
SchedulerEventState.withName(jobState)
)
- def uncompleteStatusArray(): Array[SchedulerEventState] = {
- SchedulerEventState.values.filterNot(isCompleted).toArray
- }
+ def isInitedByStr(jobState: String): Boolean = SchedulerEventState.withName(jobState) == Inited
+
+ def isRunningByStr(jobState: String): Boolean = isRunning(SchedulerEventState.withName(jobState))
}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/entity/job/JobRequest.java b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/entity/job/JobRequest.java
index 75134bd84a..46fa8a69ef 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/entity/job/JobRequest.java
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/java/org/apache/linkis/governance/common/entity/job/JobRequest.java
@@ -49,6 +49,7 @@ public class JobRequest {
/** result location */
private String resultLocation;
+ /** Task status updates is ordered, if false, not checked */
private Boolean updateOrderFlag = true;
private String observeInfo;
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index 71b0df4250..6dcfcdc4b7 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -17,7 +17,6 @@
package org.apache.linkis.entrance.restful;
-import org.apache.linkis.common.entity.JobInstance;
import org.apache.linkis.common.log.LogUtils;
import org.apache.linkis.entrance.EntranceServer;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
@@ -29,6 +28,7 @@
import org.apache.linkis.governance.common.entity.job.JobRequest;
import org.apache.linkis.manager.common.protocol.resource.ResourceWithStatus;
import org.apache.linkis.protocol.constants.TaskConstant;
+import org.apache.linkis.protocol.engine.JobInstance;
import org.apache.linkis.protocol.engine.JobProgressInfo;
import org.apache.linkis.protocol.utils.ZuulEntranceUtils;
import org.apache.linkis.rpc.Sender;
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
index d162be0820..4e66da5cc3 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/EntranceFailoverJobServer.java
@@ -37,12 +37,16 @@
import javax.annotation.PostConstruct;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;
import java.util.stream.Collectors;
+import scala.Enumeration;
+import scala.collection.Iterator;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -135,16 +139,10 @@ public void failoverTask() {
- EntranceConfiguration.ENTRANCE_FAILOVER_DATA_INTERVAL_TIME();
}
- // get uncompleted status
- List statusList =
- Arrays.stream(SchedulerEventState.uncompleteStatusArray())
- .map(Object::toString)
- .collect(Collectors.toList());
-
List jobRequests =
JobHistoryHelper.queryWaitForFailoverTask(
serverInstanceMap,
- statusList,
+ getUnCompleteStatus(),
expiredTimestamp,
EntranceConfiguration.ENTRANCE_FAILOVER_DATA_NUM_LIMIT());
if (jobRequests.isEmpty()) return;
@@ -168,4 +166,15 @@ public void failoverTask() {
EntranceConfiguration.ENTRANCE_FAILOVER_SCAN_INTERVAL(),
TimeUnit.MILLISECONDS);
}
+
+ private List getUnCompleteStatus() {
+ List status = new ArrayList<>();
+ Enumeration.ValueSet values = SchedulerEventState.values();
+ Iterator iterator = values.iterator();
+ while (iterator.hasNext()) {
+ Enumeration.Value next = iterator.next();
+ if (!SchedulerEventState.isCompleted(next)) status.add(next.toString());
+ }
+ return status;
+ }
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index e9c3da2cda..45be36287b 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -319,13 +319,13 @@ abstract class EntranceServer extends Logging {
killOldEC(jobRequest, logAppender);
// deal Inited jobRequest, if status is Inited, need to deal by all Interceptors, such as set log_path
- if (jobRequest.getStatus.equals(SchedulerEventState.Inited.toString)) {
+ if (SchedulerEventState.isInitedByStr(jobRequest.getStatus)) {
dealInitedJobRequest(jobRequest, logAppender)
}
if (
EntranceConfiguration.ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED.getValue &&
- jobRequest.getStatus.equals(SchedulerEventState.Running.toString)
+ SchedulerEventState.isRunningByStr(jobRequest.getStatus)
) {
// deal Running jobRequest, if enabled, status changed from Running to Cancelled
dealRunningJobRequest(jobRequest, logAppender)
diff --git a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala
index 930bfac73a..04f206d6f6 100644
--- a/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala
+++ b/linkis-spring-cloud-services/linkis-service-gateway/linkis-gateway-server-support/src/main/scala/org/apache/linkis/gateway/ujes/parser/EntranceRequestGatewayParser.scala
@@ -18,12 +18,12 @@
package org.apache.linkis.gateway.ujes.parser
import org.apache.linkis.common.ServiceInstance
-import org.apache.linkis.common.entity.JobInstance
import org.apache.linkis.gateway.config.GatewayConfiguration
import org.apache.linkis.gateway.http.GatewayContext
import org.apache.linkis.gateway.parser.AbstractGatewayParser
import org.apache.linkis.gateway.ujes.parser.EntranceExecutionGatewayParser._
import org.apache.linkis.jobhistory.service.JobHistoryQueryService
+import org.apache.linkis.protocol.engine.JobInstance
import org.apache.linkis.protocol.utils.ZuulEntranceUtils
import org.apache.linkis.rpc.interceptor.ServiceInstanceUtils
import org.apache.linkis.server.BDPJettyServerHelper
From e624b373359f9c35bdd27ad5a77ff49962f01e24 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Tue, 28 Mar 2023 17:57:46 +0800
Subject: [PATCH 047/145] Add description
---
.../org/apache/linkis/entrance/conf/EntranceConfiguration.scala | 2 ++
1 file changed, 2 insertions(+)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
index 617584f278..839b3123cc 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/conf/EntranceConfiguration.scala
@@ -245,6 +245,7 @@ object EntranceConfiguration {
val ENTRANCE_UPDATE_BATCH_SIZE = CommonVars("linkis.entrance.update.batch.size", 100)
+ // if true, the job in ConsumeQueue can be failover
val ENTRANCE_SHUTDOWN_FAILOVER_CONSUME_QUEUE_ENABLED =
CommonVars("linkis.entrance.shutdown.failover.consume.queue.enable", true).getValue
@@ -260,6 +261,7 @@ object EntranceConfiguration {
val ENTRANCE_FAILOVER_RETAIN_METRIC_YARN_RESOURCE_ENABLED =
CommonVars("linkis.entrance.failover.retain.metric.yarn.resource.enable", false)
+ // if true, job whose status is running will be set to Cancelled
val ENTRANCE_FAILOVER_RUNNING_KILL_ENABLED =
CommonVars("linkis.entrance.failover.running.kill.enable", false)
From 94f3ec15a1995ee00614058ef7fb867d4035c654 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Fri, 7 Apr 2023 21:49:37 +0800
Subject: [PATCH 048/145] replace constant
---
.../entrance/restful/EntranceRestfulApi.java | 23 +++++++++----------
.../server/DefaultEntranceServer.java | 2 +-
.../linkis/entrance/EntranceServer.scala | 18 +++++++--------
3 files changed, 21 insertions(+), 22 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index 3335eec90f..90a1bdbd2b 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -439,7 +439,7 @@ public Message progressWithResource(HttpServletRequest req, @PathVariable("id")
message = Message.ok();
message.setMethod("/api/entrance/" + id + "/progressWithResource");
message
- .data(TaskConstant.ENTRANCEJOB_YARNRESOURCE, null)
+ .data(TaskConstant.JOB_YARNRESOURCE, null)
.data("progress", 0)
.data("execID", "")
.data("taskID", id)
@@ -499,18 +499,17 @@ private void buildYarnResource(
JobRequest jobRequest, Map metricsVo, Message message) {
try {
Map metrics = jobRequest.getMetrics();
- if (metrics.containsKey(TaskConstant.ENTRANCEJOB_YARNRESOURCE)) {
+ if (metrics.containsKey(TaskConstant.JOB_YARNRESOURCE)) {
HashMap resourceMap =
- (HashMap)
- metrics.get(TaskConstant.ENTRANCEJOB_YARNRESOURCE);
+ (HashMap) metrics.get(TaskConstant.JOB_YARNRESOURCE);
ArrayList resoureList = new ArrayList<>(12);
if (null != resourceMap && !resourceMap.isEmpty()) {
resourceMap.forEach(
(applicationId, resource) -> {
resoureList.add(new YarnResourceWithStatusVo(applicationId, resource));
});
- metricsVo.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, resoureList);
+ metricsVo.put(TaskConstant.JOB_YARNRESOURCE, resoureList);
Optional cores =
resourceMap.values().stream()
.map(resource -> resource.queueCores())
@@ -533,17 +532,17 @@ private void buildYarnResource(
}
String coreRGB = RGBUtils.getRGB(corePercent);
String memoryRGB = RGBUtils.getRGB(memoryPercent);
- metricsVo.put(TaskConstant.ENTRANCEJOB_CORE_PERCENT, corePercent);
- metricsVo.put(TaskConstant.ENTRANCEJOB_MEMORY_PERCENT, memoryPercent);
- metricsVo.put(TaskConstant.ENTRANCEJOB_CORE_RGB, coreRGB);
- metricsVo.put(TaskConstant.ENTRANCEJOB_MEMORY_RGB, memoryRGB);
+ metricsVo.put(TaskConstant.JOB_CORE_PERCENT, corePercent);
+ metricsVo.put(TaskConstant.JOB_MEMORY_PERCENT, memoryPercent);
+ metricsVo.put(TaskConstant.JOB_CORE_RGB, coreRGB);
+ metricsVo.put(TaskConstant.JOB_MEMORY_RGB, memoryRGB);
- message.data(TaskConstant.ENTRANCEJOB_YARN_METRICS, metricsVo);
+ message.data(TaskConstant.JOB_YARN_METRICS, metricsVo);
} else {
- message.data(TaskConstant.ENTRANCEJOB_YARNRESOURCE, null);
+ message.data(TaskConstant.JOB_YARNRESOURCE, null);
}
} else {
- message.data(TaskConstant.ENTRANCEJOB_YARNRESOURCE, null);
+ message.data(TaskConstant.JOB_YARNRESOURCE, null);
}
} catch (Exception e) {
logger.error("build yarnResource error", e);
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
index 24d077068f..66a241026c 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
@@ -20,8 +20,8 @@
import org.apache.linkis.common.ServiceInstance;
import org.apache.linkis.entrance.EntranceContext;
import org.apache.linkis.entrance.EntranceServer;
-import org.apache.linkis.entrance.conf.EntranceConfiguration$;
import org.apache.linkis.entrance.conf.EntranceConfiguration;
+import org.apache.linkis.entrance.conf.EntranceConfiguration$;
import org.apache.linkis.entrance.constant.ServiceNameConsts;
import org.apache.linkis.entrance.execute.EntranceJob;
import org.apache.linkis.entrance.job.EntranceExecutionJob;
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 45be36287b..c44eb07922 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -353,7 +353,7 @@ abstract class EntranceServer extends Logging {
if (
jobRequest.getMetrics == null
- || !jobRequest.getMetrics.containsKey(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
+ || !jobRequest.getMetrics.containsKey(TaskConstant.JOB_ENGINECONN_MAP)
) {
val msg = s"job ${jobRequest.getId} not have EC info, ignore it"
logger.info(msg)
@@ -362,7 +362,7 @@ abstract class EntranceServer extends Logging {
}
val engineMap = jobRequest.getMetrics
- .get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
+ .get(TaskConstant.JOB_ENGINECONN_MAP)
.asInstanceOf[util.Map[String, Object]]
val engineInstance =
@@ -628,26 +628,26 @@ abstract class EntranceServer extends Logging {
if (EntranceConfiguration.ENTRANCE_FAILOVER_RETAIN_METRIC_ENGINE_CONN_ENABLED.getValue) {
if (
jobRequest.getMetrics != null && jobRequest.getMetrics.containsKey(
- TaskConstant.ENTRANCEJOB_ENGINECONN_MAP
+ TaskConstant.JOB_ENGINECONN_MAP
)
) {
val oldEngineconnMap = jobRequest.getMetrics
- .get(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP)
+ .get(TaskConstant.JOB_ENGINECONN_MAP)
.asInstanceOf[util.Map[String, Object]]
- metricMap.put(TaskConstant.ENTRANCEJOB_ENGINECONN_MAP, oldEngineconnMap)
+ metricMap.put(TaskConstant.JOB_ENGINECONN_MAP, oldEngineconnMap)
}
}
if (EntranceConfiguration.ENTRANCE_FAILOVER_RETAIN_METRIC_YARN_RESOURCE_ENABLED.getValue) {
if (
jobRequest.getMetrics != null && jobRequest.getMetrics.containsKey(
- TaskConstant.ENTRANCEJOB_YARNRESOURCE
+ TaskConstant.JOB_YARNRESOURCE
)
) {
val oldResourceMap = jobRequest.getMetrics
- .get(TaskConstant.ENTRANCEJOB_YARNRESOURCE)
+ .get(TaskConstant.JOB_YARNRESOURCE)
.asInstanceOf[util.Map[String, Object]]
- metricMap.put(TaskConstant.ENTRANCEJOB_YARNRESOURCE, oldResourceMap)
+ metricMap.put(TaskConstant.JOB_YARNRESOURCE, oldResourceMap)
}
}
@@ -659,7 +659,7 @@ abstract class EntranceServer extends Logging {
jobRequest.setErrorCode(0)
jobRequest.setErrorDesc("")
jobRequest.setMetrics(metricMap)
- jobRequest.getMetrics.put(TaskConstant.ENTRANCEJOB_SUBMIT_TIME, initDate)
+ jobRequest.getMetrics.put(TaskConstant.JOB_SUBMIT_TIME, initDate)
jobRequest.setUpdateOrderFlag(false)
logAppender.append(
From acc91db7d6dac9132faf0287d43761807df31e3c Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Mon, 10 Apr 2023 09:56:11 +0800
Subject: [PATCH 049/145] replace Option.apply to null
---
.../entrance/restful/EntranceRestfulApi.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
index 90a1bdbd2b..873f5fd8a4 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/restful/EntranceRestfulApi.java
@@ -263,7 +263,7 @@ public Message status(
}
}
- Option job = Option.apply(null);
+ Option job = null;
try {
job = entranceServer.getJob(realId);
} catch (Exception e) {
@@ -281,7 +281,7 @@ public Message status(
message.data("status", status).data("execID", execID);
return message;
}
- if (job.isDefined()) {
+ if (job != null && job.isDefined()) {
if (job.get() instanceof EntranceJob) {
((EntranceJob) job.get()).updateNewestAccessByClientTimestamp();
}
@@ -638,7 +638,7 @@ public Message log(HttpServletRequest req, @PathVariable("id") String id) {
}
}
- Option job = Option.apply(null);
+ Option job = null;
try {
job = entranceServer.getJob(realId);
} catch (final Throwable t) {
@@ -648,7 +648,7 @@ public Message log(HttpServletRequest req, @PathVariable("id") String id) {
message.setMethod("/api/entrance/" + id + "/log");
return message;
}
- if (job.isDefined()) {
+ if (job != null && job.isDefined()) {
logger.debug("begin to get log for {}(开始获取 {} 的日志)", job.get().getId(), job.get().getId());
LogReader logReader =
entranceServer.getEntranceContext().getOrCreateLogManager().getLogReader(realId);
@@ -741,7 +741,7 @@ public Message killJobs(
String id = idNode.get(i).asText();
Long taskID = taskIDNode.get(i).asLong();
String realId = ZuulEntranceUtils.parseExecID(id)[3];
- Option job = Option.apply(null);
+ Option job = null;
try {
job = entranceServer.getJob(realId);
} catch (Exception e) {
@@ -755,7 +755,7 @@ public Message killJobs(
continue;
}
Message message = null;
- if (job.isEmpty()) {
+ if (job == null || job.isEmpty()) {
logger.warn("can not find a job in entranceServer, will force to kill it");
waitToForceKill.add(taskID);
message = Message.ok("Forced Kill task (强制杀死任务)");
@@ -877,7 +877,7 @@ public Message kill(
}
}
- Option job = Option.apply(null);
+ Option job = null;
try {
job = entranceServer.getJob(realId);
} catch (Exception e) {
@@ -894,7 +894,7 @@ public Message kill(
return message;
}
- if (job.isEmpty()) {
+ if (job == null || job.isEmpty()) {
logger.warn("can not find a job in entranceServer, will force to kill it");
// 如果在内存中找不到该任务,那么该任务可能已经完成了,或者就是重启导致的
JobHistoryHelper.forceKill(taskID);
From caeeab9fc13489adad51a06cf86bd0733a761e05 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Mon, 14 Aug 2023 11:23:30 +0800
Subject: [PATCH 050/145] sql optimize and bug fix
---
.../linkis/entrance/EntranceServer.scala | 3 ++
.../EntranceParallelConsumerManager.scala | 38 ++++++++++---------
.../jobhistory/dao/JobHistoryMapper.java | 6 +--
.../mapper/common/JobHistoryMapper.xml | 4 +-
4 files changed, 28 insertions(+), 23 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index c44eb07922..fc73887534 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -531,6 +531,9 @@ abstract class EntranceServer extends Logging {
.createPersistenceEngine()
.updateIfNeeded(jobRequest)
+ // reset `UpdateOrderFlag`
+ jobRequest.setUpdateOrderFlag(true)
+
logger.info(s"job ${jobRequest.getId} update JobRequest success")
val job = getEntranceContext.getOrCreateEntranceParser().parseToJob(jobRequest)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
index 6d756ad1a8..0f86c2a335 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/scheduler/EntranceParallelConsumerManager.scala
@@ -46,26 +46,28 @@ class EntranceParallelConsumerManager(maxParallelismUsers: Int, schedulerName: S
Utils.defaultScheduler.scheduleAtFixedRate(
new Runnable {
override def run(): Unit = {
- logger.info("start refresh consumer group maxAllowRunningJobs")
- // get all entrance server from eureka
- val serviceInstances =
- Sender.getInstances(Sender.getThisServiceInstance.getApplicationName)
- if (null == serviceInstances || serviceInstances.isEmpty) return
+ Utils.tryAndError {
+ logger.info("start refresh consumer group maxAllowRunningJobs")
+ // get all entrance server from eureka
+ val serviceInstances =
+ Sender.getInstances(Sender.getThisServiceInstance.getApplicationName)
+ if (null == serviceInstances || serviceInstances.isEmpty) return
- // get all offline label server
- val routeLabel = LabelBuilderFactoryContext.getLabelBuilderFactory
- .createLabel[RouteLabel](LabelKeyConstant.ROUTE_KEY, LabelValueConstant.OFFLINE_VALUE)
- val labels = new util.ArrayList[Label[_]]
- labels.add(routeLabel)
- val labelInstances = InstanceLabelClient.getInstance.getInstanceFromLabel(labels)
+ // get all offline label server
+ val routeLabel = LabelBuilderFactoryContext.getLabelBuilderFactory
+ .createLabel[RouteLabel](LabelKeyConstant.ROUTE_KEY, LabelValueConstant.OFFLINE_VALUE)
+ val labels = new util.ArrayList[Label[_]]
+ labels.add(routeLabel)
+ val labelInstances = InstanceLabelClient.getInstance.getInstanceFromLabel(labels)
- // get active entrance server
- val allInstances = new util.ArrayList[ServiceInstance]()
- allInstances.addAll(serviceInstances.toList.asJava)
- allInstances.removeAll(labelInstances)
- // refresh all group maxAllowRunningJobs
- refreshAllGroupMaxAllowRunningJobs(allInstances.size())
- logger.info("Finished to refresh consumer group maxAllowRunningJobs")
+ // get active entrance server
+ val allInstances = new util.ArrayList[ServiceInstance]()
+ allInstances.addAll(serviceInstances.toList.asJava)
+ allInstances.removeAll(labelInstances)
+ // refresh all group maxAllowRunningJobs
+ refreshAllGroupMaxAllowRunningJobs(allInstances.size())
+ logger.info("Finished to refresh consumer group maxAllowRunningJobs")
+ }
}
},
EntranceConfiguration.ENTRANCE_GROUP_SCAN_INIT_TIME.getValue,
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
index 64e76de0f0..806d8ec70c 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/java/org/apache/linkis/jobhistory/dao/JobHistoryMapper.java
@@ -117,9 +117,9 @@ void updateJobHistoryCancelById(
* or a.instances is null or a.instances not in ('192.168.1.123:9104','192.168.1.124:9104') or
* EXISTS ( select 1 from ( select '192.168.1.123:9104' as instances, 1697775054098 as
* registryTime union all select '192.168.1.124:9104' as instances, 1666239054098 as registryTime
- * ) b where a.instances = b.instances and UNIX_TIMESTAMP(a.created_time) * 1000 < b.registryTime
- * ) ) and status in ('Inited','Running','Scheduled','WaitForRetry') and
- * UNIX_TIMESTAMP(a.created_time) * 1000 >= 1666239054098 limit 10
+ * ) b where a.instances = b.instances and a.created_time < FROM_UNIXTIME(b.registryTime/1000) ) )
+ * and status in ('Inited','Running','Scheduled','WaitForRetry') and a.created_time >=
+ * FROM_UNIXTIME(1666239054098/1000) limit 10
*
* @param instancesMap
* @param statusList
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
index a99dbf3c87..9d76d27ddf 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/common/JobHistoryMapper.xml
@@ -244,12 +244,12 @@
select #{key} as instances, #{val} as registryTime
) b
- where a.instances = b.instances and UNIX_TIMESTAMP(a.created_time) * 1000 b.registryTime
+ where a.instances = b.instances and a.created_time FROM_UNIXTIME(b.registryTime/1000)
)
)
and
status in #{status}
- and UNIX_TIMESTAMP(a.created_time) * 1000 >= #{startTimestamp}
+ and a.created_time >= FROM_UNIXTIME(#{startTimestamp}/1000)
limit #{limit}
From 3d681fb8a74142417c510687527438deb2e31577 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Mon, 14 Aug 2023 15:05:45 +0800
Subject: [PATCH 051/145] merge master
---
.../server/DefaultEntranceServer.java | 3 ++-
.../linkis/entrance/EntranceServer.scala | 2 +-
.../conf/linkis-cg-entrance.properties | 1 -
.../package/conf/linkis-mg-gateway.properties | 1 -
.../mapper/postgresql/JobHistoryMapper.xml | 22 +++++++++++++++++++
5 files changed, 25 insertions(+), 4 deletions(-)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
index af2bbaf19c..94531cd5fe 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
+++ b/linkis-computation-governance/linkis-entrance/src/main/java/org/apache/linkis/entrance/server/DefaultEntranceServer.java
@@ -108,7 +108,8 @@ private void shutdownEntrance(ContextClosedEvent event) {
if (null != allUndoneTask) {
for (EntranceJob job : allUndoneTask) {
job.onFailure(
- "Your job will be marked as canceled because the Entrance service restarted(因为Entrance服务重启,您的任务将被标记为取消)", null);
+ "Your job will be marked as canceled because the Entrance service restarted(因为Entrance服务重启,您的任务将被标记为取消)",
+ null);
IOUtils.closeQuietly(((EntranceExecutionJob) job).getLogWriter().get());
}
}
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 53cf0256e6..2fa5ff23c4 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -36,9 +36,9 @@ import org.apache.linkis.entrance.utils.JobHistoryHelper
import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.governance.common.protocol.task.RequestTaskKill
+import org.apache.linkis.governance.common.utils.LoggerUtils
import org.apache.linkis.manager.common.protocol.engine.EngineStopRequest
import org.apache.linkis.manager.label.entity.entrance.ExecuteOnceLabel
-import org.apache.linkis.governance.common.utils.LoggerUtils
import org.apache.linkis.protocol.constants.TaskConstant
import org.apache.linkis.rpc.Sender
import org.apache.linkis.rpc.conf.RPCConfiguration
diff --git a/linkis-dist/package/conf/linkis-cg-entrance.properties b/linkis-dist/package/conf/linkis-cg-entrance.properties
index 579ac25a18..c0568288a5 100644
--- a/linkis-dist/package/conf/linkis-cg-entrance.properties
+++ b/linkis-dist/package/conf/linkis-cg-entrance.properties
@@ -39,6 +39,5 @@ spring.eureka.instance.metadata-map.linkis.conf.version=v1
linkis.entrance.auto.clean.dirty.data.enable=true
-wds.linkis.server.mybatis.mapperLocations=classpath*:mapper/common/*.xml,classpath*:mapper/mysql/*.xml
wds.linkis.server.mybatis.BasePackage=org.apache.linkis.publicservice.common.lock.dao
wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.publicservice.common.lock.entity
\ No newline at end of file
diff --git a/linkis-dist/package/conf/linkis-mg-gateway.properties b/linkis-dist/package/conf/linkis-mg-gateway.properties
index 27656f7f31..1f1d2416b4 100644
--- a/linkis-dist/package/conf/linkis-mg-gateway.properties
+++ b/linkis-dist/package/conf/linkis-mg-gateway.properties
@@ -20,7 +20,6 @@ wds.linkis.gateway.conf.enable.proxy.user=false
wds.linkis.gateway.conf.url.pass.auth=/dss/
wds.linkis.gateway.conf.enable.token.auth=true
wds.linkis.is.gateway=true
-wds.linkis.server.mybatis.mapperLocations=classpath*:mapper/common/*.xml,classpath*:mapper/mysql/*.xml
wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.instance.label.entity,org.apache.linkis.jobhistory.entity
wds.linkis.server.mybatis.BasePackage=org.apache.linkis.instance.label.dao,org.apache.linkis.gateway.authentication.dao,org.apache.linkis.jobhistory.dao
wds.linkis.label.entity.packages=org.apache.linkis.gateway.ujes.route.label
diff --git a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/postgresql/JobHistoryMapper.xml b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/postgresql/JobHistoryMapper.xml
index 30e4e85b34..e194a2e4cd 100644
--- a/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/postgresql/JobHistoryMapper.xml
+++ b/linkis-public-enhancements/linkis-jobhistory/src/main/resources/mapper/postgresql/JobHistoryMapper.xml
@@ -229,4 +229,26 @@
#{id}
+
From bdc3e0eb72ce543db841bb937bb5d96fcbd7a005 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Tue, 22 Aug 2023 16:08:52 +0800
Subject: [PATCH 052/145] add comment
---
.../main/scala/org/apache/linkis/entrance/EntranceServer.scala | 1 +
1 file changed, 1 insertion(+)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
index 2fa5ff23c4..4931659742 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/EntranceServer.scala
@@ -669,6 +669,7 @@ abstract class EntranceServer extends Logging {
jobRequest.setErrorDesc("")
jobRequest.setMetrics(metricMap)
jobRequest.getMetrics.put(TaskConstant.JOB_SUBMIT_TIME, initDate)
+ // Allow task status updates to be unordered
jobRequest.setUpdateOrderFlag(false)
logAppender.append(
From 5fa73f2576cc159dfd394c7b6bdc3ad57f777380 Mon Sep 17 00:00:00 2001
From: guoshupei <15764973965@163.com>
Date: Tue, 22 Aug 2023 21:57:10 +0800
Subject: [PATCH 053/145] add mybatis config
---
linkis-dist/package/conf/linkis-cg-entrance.properties | 2 +-
linkis-dist/package/conf/linkis-mg-gateway.properties | 1 +
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/linkis-dist/package/conf/linkis-cg-entrance.properties b/linkis-dist/package/conf/linkis-cg-entrance.properties
index c0568288a5..62b1de5d5e 100644
--- a/linkis-dist/package/conf/linkis-cg-entrance.properties
+++ b/linkis-dist/package/conf/linkis-cg-entrance.properties
@@ -38,6 +38,6 @@ spring.eureka.instance.metadata-map.linkis.conf.version=v1
## clean dirty data when the entrance start
linkis.entrance.auto.clean.dirty.data.enable=true
-
+wds.linkis.server.mybatis.mapperLocations=classpath*:mapper/common/*.xml,classpath*:mapper/mysql/*.xml
wds.linkis.server.mybatis.BasePackage=org.apache.linkis.publicservice.common.lock.dao
wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.publicservice.common.lock.entity
\ No newline at end of file
diff --git a/linkis-dist/package/conf/linkis-mg-gateway.properties b/linkis-dist/package/conf/linkis-mg-gateway.properties
index 1f1d2416b4..27656f7f31 100644
--- a/linkis-dist/package/conf/linkis-mg-gateway.properties
+++ b/linkis-dist/package/conf/linkis-mg-gateway.properties
@@ -20,6 +20,7 @@ wds.linkis.gateway.conf.enable.proxy.user=false
wds.linkis.gateway.conf.url.pass.auth=/dss/
wds.linkis.gateway.conf.enable.token.auth=true
wds.linkis.is.gateway=true
+wds.linkis.server.mybatis.mapperLocations=classpath*:mapper/common/*.xml,classpath*:mapper/mysql/*.xml
wds.linkis.server.mybatis.typeAliasesPackage=org.apache.linkis.instance.label.entity,org.apache.linkis.jobhistory.entity
wds.linkis.server.mybatis.BasePackage=org.apache.linkis.instance.label.dao,org.apache.linkis.gateway.authentication.dao,org.apache.linkis.jobhistory.dao
wds.linkis.label.entity.packages=org.apache.linkis.gateway.ujes.route.label
From 4ecb22e373d7407226acc7ac7c63d698f11b394f Mon Sep 17 00:00:00 2001
From: ChengJie1053 <18033291053@163.com>
Date: Wed, 20 Sep 2023 20:32:46 +0800
Subject: [PATCH 054/145] [Feature] Add nebula engine to linkis (#4903)
* Add nebula engine to linkis
* Reuse nebula session
* Code optimization and remove wds prefix
---
.../ujes/jdbc/LinkisSQLConnection.scala | 1 +
.../manager/am/conf/AMConfiguration.java | 7 +-
.../manager/label/conf/LabelCommonConfig.java | 3 +
.../label/entity/engine/EngineType.scala | 3 +
.../manager/label/entity/engine/RunType.scala | 1 +
.../label/utils/EngineTypeLabelCreator.java | 2 +
linkis-engineconn-plugins/nebula/pom.xml | 110 +++++
.../nebula/src/main/assembly/distribution.xml | 71 ++++
.../nebula/NebulaEngineConnPlugin.java | 72 ++++
.../NebulaProcessEngineConnLaunchBuilder.java | 22 +
.../nebula/conf/NebulaConfiguration.java | 50 +++
.../nebula/conf/NebulaEngineConf.java | 53 +++
.../errorcode/NebulaErrorCodeSummary.java | 47 +++
.../exception/NebulaClientException.java | 27 ++
.../nebula/exception/NebulaExecuteError.java | 27 ++
.../NebulaStateInvalidException.java | 27 ++
.../executor/NebulaEngineConnExecutor.java | 388 ++++++++++++++++++
.../resources/linkis-engineconn.properties | 23 ++
.../nebula/src/main/resources/log4j2.xml | 91 ++++
.../factory/NebulaEngineConnFactory.scala | 44 ++
pom.xml | 1 +
21 files changed, 1067 insertions(+), 3 deletions(-)
create mode 100644 linkis-engineconn-plugins/nebula/pom.xml
create mode 100644 linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/builder/NebulaProcessEngineConnLaunchBuilder.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaConfiguration.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/conf/NebulaEngineConf.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/errorcode/NebulaErrorCodeSummary.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaClientException.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaExecuteError.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/exception/NebulaStateInvalidException.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/executor/NebulaEngineConnExecutor.java
create mode 100644 linkis-engineconn-plugins/nebula/src/main/resources/linkis-engineconn.properties
create mode 100644 linkis-engineconn-plugins/nebula/src/main/resources/log4j2.xml
create mode 100644 linkis-engineconn-plugins/nebula/src/main/scala/org/apache/linkis/engineplugin/nebula/factory/NebulaEngineConnFactory.scala
diff --git a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
index b800698766..e111615cee 100644
--- a/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
+++ b/linkis-computation-governance/linkis-jdbc-driver/src/main/scala/org/apache/linkis/ujes/jdbc/LinkisSQLConnection.scala
@@ -431,6 +431,7 @@ class LinkisSQLConnection(private[jdbc] val ujesClient: UJESClient, props: Prope
case EngineType.HIVE => RunType.HIVE
case EngineType.TRINO => RunType.TRINO_SQL
case EngineType.PRESTO => RunType.PRESTO_SQL
+ case EngineType.NEBULA => RunType.NEBULA_SQL
case EngineType.ELASTICSEARCH => RunType.ES_SQL
case EngineType.JDBC => RunType.JDBC
case EngineType.PYTHON => RunType.SHELL
diff --git a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
index d916387d29..8aba142670 100644
--- a/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
+++ b/linkis-computation-governance/linkis-manager/linkis-application-manager/src/main/java/org/apache/linkis/manager/am/conf/AMConfiguration.java
@@ -68,7 +68,8 @@ public class AMConfiguration {
public static final CommonVars MULTI_USER_ENGINE_TYPES =
CommonVars.apply(
- "wds.linkis.multi.user.engine.types", "jdbc,es,presto,io_file,appconn,openlookeng,trino");
+ "wds.linkis.multi.user.engine.types",
+ "jdbc,es,presto,io_file,appconn,openlookeng,trino,nebula");
public static final CommonVars ALLOW_BATCH_KILL_ENGINE_TYPES =
CommonVars.apply("wds.linkis.allow.batch.kill.engine.types", "spark,hive,python");
@@ -104,8 +105,8 @@ public class AMConfiguration {
public static String getDefaultMultiEngineUser() {
String jvmUser = Utils.getJvmUser();
return String.format(
- "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", io_file:\"root\"}",
- jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser);
+ "{jdbc:\"%s\", es: \"%s\", presto:\"%s\", appconn:\"%s\", openlookeng:\"%s\", trino:\"%s\", nebula:\"%s\",io_file:\"root\"}",
+ jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser, jvmUser);
}
public static boolean isMultiUserEngine(String engineType) {
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
index d0854186a5..f4b52a156b 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/conf/LabelCommonConfig.java
@@ -69,6 +69,9 @@ public class LabelCommonConfig {
public static final CommonVars DATAX_ENGINE_VERSION =
CommonVars.apply("wds.linkis.datax.engine.version", "3.0.0");
+ public static final CommonVars NEBULA_ENGINE_VERSION =
+ CommonVars.apply("wds.linkis.nebula.engine.version", "3.0.0");
+
public static final CommonVars PRESTO_ENGINE_VERSION =
CommonVars.apply("wds.linkis.presto.engine.version", "0.234");
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
index d47bb8ec39..77e7204a73 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/EngineType.scala
@@ -45,6 +45,8 @@ object EngineType extends Enumeration with Logging {
val PRESTO = Value("presto")
+ val NEBULA = Value("nebula")
+
val FLINK = Value("flink")
val APPCONN = Value("appconn")
@@ -89,6 +91,7 @@ object EngineType extends Enumeration with Logging {
case _ if IO_ENGINE_HDFS.toString.equalsIgnoreCase(str) => IO_ENGINE_HDFS
case _ if PIPELINE.toString.equalsIgnoreCase(str) => PIPELINE
case _ if PRESTO.toString.equalsIgnoreCase(str) => PRESTO
+ case _ if NEBULA.toString.equalsIgnoreCase(str) => NEBULA
case _ if FLINK.toString.equalsIgnoreCase(str) => FLINK
case _ if APPCONN.toString.equals(str) => APPCONN
case _ if SQOOP.toString.equalsIgnoreCase(str) => SQOOP
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
index 21a067ed45..abb3e010f8 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/entity/engine/RunType.scala
@@ -35,6 +35,7 @@ object RunType extends Enumeration {
val PIPELINE = Value("pipeline")
val JDBC = Value("jdbc")
val PRESTO_SQL = Value("psql")
+ val NEBULA_SQL = Value("ngql")
val JAR = Value("jar")
val APPCONN = Value("appconn")
val FUNCTION_MDQ_TYPE = Value("function.mdq")
diff --git a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
index 0d6ae3c5c0..e90f282aaf 100644
--- a/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
+++ b/linkis-computation-governance/linkis-manager/linkis-label-common/src/main/java/org/apache/linkis/manager/label/utils/EngineTypeLabelCreator.java
@@ -69,6 +69,8 @@ private static void init() {
EngineType.FLINK().toString(), LabelCommonConfig.FLINK_ENGINE_VERSION.getValue());
defaultVersion.put(
EngineType.PRESTO().toString(), LabelCommonConfig.PRESTO_ENGINE_VERSION.getValue());
+ defaultVersion.put(
+ EngineType.NEBULA().toString(), LabelCommonConfig.NEBULA_ENGINE_VERSION.getValue());
defaultVersion.put(
EngineType.SQOOP().toString(), LabelCommonConfig.SQOOP_ENGINE_VERSION.getValue());
defaultVersion.put(
diff --git a/linkis-engineconn-plugins/nebula/pom.xml b/linkis-engineconn-plugins/nebula/pom.xml
new file mode 100644
index 0000000000..bfe9714569
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/pom.xml
@@ -0,0 +1,110 @@
+
+
+
+ 4.0.0
+
+ org.apache.linkis
+ linkis
+ ${revision}
+ ../../pom.xml
+
+
+ linkis-engineplugin-nebula
+
+
+
+ org.apache.linkis
+ linkis-engineconn-plugin-core
+ ${project.version}
+
+
+
+ org.apache.linkis
+ linkis-computation-engineconn
+ ${project.version}
+
+
+
+ org.apache.linkis
+ linkis-storage
+ ${project.version}
+ provided
+
+
+
+ org.apache.linkis
+ linkis-rpc
+ ${project.version}
+ provided
+
+
+
+ org.apache.linkis
+ linkis-common
+ ${project.version}
+ provided
+
+
+
+
+ com.vesoft
+ client
+ ${nebula.version}
+
+
+
+
+
+
+
+ net.alchim31.maven
+ scala-maven-plugin
+
+
+
+ org.apache.maven.plugins
+ maven-assembly-plugin
+ false
+
+ false
+ out
+ false
+ false
+
+ src/main/assembly/distribution.xml
+
+
+
+
+ make-assembly
+
+ single
+
+ package
+
+
+ src/main/assembly/distribution.xml
+
+
+
+
+
+
+
+
+
diff --git a/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml
new file mode 100644
index 0000000000..eaa9c296f1
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/src/main/assembly/distribution.xml
@@ -0,0 +1,71 @@
+
+
+
+
+ linkis-engineplugin-nebula
+
+ dir
+ zip
+
+ true
+ nebula
+
+
+
+
+
+ /dist/${nebula.version}/lib
+ true
+ true
+ false
+ false
+ true
+
+
+
+
+
+
+
+ ${basedir}/src/main/resources
+
+ linkis-engineconn.properties
+ log4j2.xml
+
+ 0777
+ dist/${nebula.version}/conf
+ unix
+
+
+
+ ${basedir}/target
+
+ *.jar
+
+
+ *doc.jar
+
+ 0777
+ plugin/${nebula.version}
+
+
+
+
+
+
diff --git a/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
new file mode 100644
index 0000000000..a22d2c8a84
--- /dev/null
+++ b/linkis-engineconn-plugins/nebula/src/main/java/org/apache/linkis/engineplugin/nebula/NebulaEngineConnPlugin.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.engineplugin.nebula;
+
+import org.apache.linkis.engineplugin.nebula.builder.NebulaProcessEngineConnLaunchBuilder;
+import org.apache.linkis.engineplugin.nebula.factory.NebulaEngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.EngineConnPlugin;
+import org.apache.linkis.manager.engineplugin.common.creation.EngineConnFactory;
+import org.apache.linkis.manager.engineplugin.common.launch.EngineConnLaunchBuilder;
+import org.apache.linkis.manager.engineplugin.common.resource.EngineResourceFactory;
+import org.apache.linkis.manager.engineplugin.common.resource.GenericEngineResourceFactory;
+import org.apache.linkis.manager.label.entity.Label;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+public class NebulaEngineConnPlugin implements EngineConnPlugin {
+ private Object resourceLocker = new Object();
+ private Object engineFactoryLocker = new Object();
+ private volatile EngineResourceFactory engineResourceFactory;
+ private volatile EngineConnFactory engineFactory;
+ private List