Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a containerized mode to the ECM service #5201

Merged
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ class ServiceInstance {
private var applicationName: String = _
private var instance: String = _
private var registryTimestamp: Long = _
private var mappingPorts: String = _
private var mappingHost: String = _
def setApplicationName(applicationName: String): Unit = this.applicationName = applicationName
def getApplicationName: String = applicationName
def setInstance(instance: String): Unit = this.instance = instance
def getInstance: String = instance

def setMappingPorts(mappingPorts: String): Unit = this.mappingPorts = mappingPorts
def getMappingPorts: String = mappingPorts

def setMappingHost(mappingHost: String): Unit = this.mappingHost = mappingHost
def getMappingHost: String = mappingHost

def setRegistryTimestamp(registryTimestamp: Long): Unit = this.registryTimestamp =
registryTimestamp

Expand Down Expand Up @@ -62,6 +70,18 @@ object ServiceInstance {
serviceInstance
}

def apply(
applicationName: String,
instance: String,
mappingPorts: String,
mappingHost: String
): ServiceInstance = {
val serviceInstance = apply(applicationName, instance)
serviceInstance.setMappingPorts(mappingPorts)
serviceInstance.setMappingHost(mappingHost)
serviceInstance
}

def apply(applicationName: String, instance: String, registryTimestamp: Long): ServiceInstance = {
val serviceInstance = new ServiceInstance
serviceInstance.setApplicationName(applicationName)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.containerization.enums;

public enum MappingPortStrategyName {
STATIC("static");
private String name;

MappingPortStrategyName(String name) {
this.name = name;
}

public String getName() {
return name;
}

public void setName(String name) {
this.name = name;
}

public static MappingPortStrategyName toEnum(String name) {
return MappingPortStrategyName.valueOf(name.toUpperCase());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.containerization.strategy;

import org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName;

public class MappingPortContext {

public static MappingPortStrategy getInstance(MappingPortStrategyName strategyName) {
switch (strategyName) {
case STATIC:
return new StaticMappingPortStrategy();
default:
return new StaticMappingPortStrategy();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.containerization.strategy;

import java.io.IOException;

public interface MappingPortStrategy {
int availablePort() throws IOException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.containerization.strategy;

import org.apache.linkis.ecm.core.conf.ContainerizationConf;

import org.apache.commons.io.IOUtils;

import java.io.IOException;
import java.net.ServerSocket;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

public class StaticMappingPortStrategy implements MappingPortStrategy {

private static final AtomicInteger currentIndex = new AtomicInteger(0);

@Override
public int availablePort() throws IOException {
return getNewPort(10);
}

public int getNewPort(int retryNum) throws IOException {
int[] portRange = getPortRange();
if (retryNum == 0) {
throw new IOException(
"No available port in the portRange: "
+ ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE()
.getValue());
}
moveIndex();
int minPort = portRange[0];
int newPort = minPort + currentIndex.get() - 1;
ServerSocket socket = null;
try {
socket = new ServerSocket(newPort);
} catch (Exception e) {
return getNewPort(--retryNum);
} finally {
IOUtils.close(socket);
}
return newPort;
}

private synchronized void moveIndex() {
int poolSize = getPoolSize();
currentIndex.set(currentIndex.get() % poolSize + 1);
}

private int[] getPortRange() {
String portRange =
ContainerizationConf.ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE().getValue();

return Arrays.stream(portRange.split("-")).mapToInt(Integer::parseInt).toArray();
}

private int getPoolSize() {
int[] portRange = getPortRange();
int minPort = portRange[0];
int maxPort = portRange[1];

return maxPort - minPort + 1;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.linkis.ecm.core.conf

import org.apache.linkis.common.conf.CommonVars

object ContainerizationConf {

val ENGINE_CONN_CONTAINERIZATION_MAPPING_STATTIC_PORT_RANGE =
CommonVars("linkis.engine.containerization.static.port.range", "1-65535")

val ENGINE_CONN_CONTAINERIZATION_ENABLE =
CommonVars("linkis.engine.containerization.enable", false).getValue

val ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST =
CommonVars("linkis.engine.containerization.mapping.host", "")

val ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS =
CommonVars("linkis.engine.containerization.mapping.ports", "")

val ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY =
CommonVars("linkis.engine.containerization.mapping.strategy", "static")

// 引擎类型-需要开启的端口数量
// Engine Type - Number of Ports Required to Be Opened
val ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST =
CommonVars("linkis.engine.containerization.engine.list", "spark-2,")

}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ package org.apache.linkis.ecm.core.launch
import org.apache.linkis.common.conf.{CommonVars, Configuration}
import org.apache.linkis.common.exception.ErrorException
import org.apache.linkis.common.utils.{Logging, Utils}
import org.apache.linkis.ecm.core.conf.ContainerizationConf.{
ENGINE_CONN_CONTAINERIZATION_ENABLE,
ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST,
ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST,
ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS,
ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY
}
import org.apache.linkis.ecm.core.containerization.enums.MappingPortStrategyName
import org.apache.linkis.ecm.core.containerization.strategy.MappingPortContext
import org.apache.linkis.ecm.core.errorcode.LinkisECMErrorCodeSummary._
import org.apache.linkis.ecm.core.exception.ECMCoreException
import org.apache.linkis.ecm.core.utils.PortUtils
Expand All @@ -35,6 +44,7 @@ import org.apache.linkis.manager.engineplugin.common.launch.process.{
}
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment._
import org.apache.linkis.manager.engineplugin.common.launch.process.LaunchConstants._
import org.apache.linkis.manager.label.utils.LabelUtil

import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.StringUtils
Expand All @@ -54,6 +64,9 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {

private var engineConnPort: String = _

private var mappingPorts: String = ""
private var mappingHost: String = _

protected def newProcessEngineConnCommandBuilder(): ProcessEngineCommandBuilder =
new UnixProcessEngineCommandBuilder

Expand Down Expand Up @@ -142,6 +155,10 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {

def getEngineConnPort: String = engineConnPort

def getMappingPorts: String = mappingPorts

def getMappingHost: String = mappingHost

protected def getProcess(): Process = this.process

/**
Expand All @@ -166,6 +183,20 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {
.findAvailPortByRange(GovernanceCommonConf.ENGINE_CONN_PORT_RANGE.getValue)
.toString

val engineType = LabelUtil.getEngineType(request.labels)
var engineMappingPortSize = getEngineMappingPortSize(engineType)
if (ENGINE_CONN_CONTAINERIZATION_ENABLE && engineMappingPortSize > 0) {
val strategyName = ENGINE_CONN_CONTAINERIZATION_MAPPING_STRATEGY.getValue
val mappingPortStrategy =
MappingPortContext.getInstance(MappingPortStrategyName.toEnum(strategyName))

while (engineMappingPortSize > 0) {
mappingPorts += mappingPortStrategy.availablePort() + ","
engineMappingPortSize = engineMappingPortSize - 1
}
mappingHost = ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.getValue
}

var springConf =
Map[String, String]("server.port" -> engineConnPort, "spring.profiles.active" -> "engineconn")
val properties =
Expand All @@ -188,10 +219,24 @@ trait ProcessEngineConnLaunch extends EngineConnLaunch with Logging {
engineConnConf = engineConnConf ++: request.creationDesc.properties.asScala
.filterNot(_._1.startsWith("spring."))
.toMap

engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_PORTS.key -> mappingPorts)
engineConnConf += (ENGINE_CONN_CONTAINERIZATION_MAPPING_HOST.key -> mappingHost)

arguments.addEngineConnConf(engineConnConf)
EngineConnArgumentsParser.getEngineConnArgumentsParser.parseToArgs(arguments.build())
}

def getEngineMappingPortSize(engineType: String): Int = {
val engineList = ENGINE_CONN_CONTAINERIZATION_ENGINE_LIST.getValue
val infoList = engineList.trim
.split(",")
.map(_.split("-"))
.filter(engine => engine(0).equals(engineType))
if (infoList.length > 0) infoList(0)(1).toInt
else 0
}

override def kill(): Unit = {
if (process != null) {
process.destroy()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,9 @@ abstract class AbstractEngineConnLaunchService extends EngineConnLaunchService w
case pro: ProcessEngineConnLaunch =>
val serviceInstance = ServiceInstance(
GovernanceCommonConf.ENGINE_CONN_SPRING_NAME.getValue,
ECMUtils.getInstanceByPort(pro.getEngineConnPort)
ECMUtils.getInstanceByPort(pro.getEngineConnPort),
pro.getMappingPorts,
pro.getMappingHost
)
conn.setServiceInstance(serviceInstance)
case _ =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,10 @@ public class AMEngineNodeVo {

private String engineType;

private String mappingPorts;

private String mappingHost;

public String getEmInstance() {
return emInstance;
}
Expand Down Expand Up @@ -288,4 +292,20 @@ public String getEngineType() {
public void setEngineType(String engineType) {
this.engineType = engineType;
}

public String getMappingPorts() {
return mappingPorts;
}

public void setMappingPorts(String mappingPorts) {
this.mappingPorts = mappingPorts;
}

public String getMappingHost() {
return mappingHost;
}

public void setMappingHost(String mappingHost) {
this.mappingHost = mappingHost;
}
}
Loading
Loading