diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala index 226a8c19e7..bba6febfe8 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/BaseRPCSender.scala @@ -37,8 +37,6 @@ import java.util import scala.concurrent.duration.Duration import scala.runtime.BoxedUnit -import feign.{Feign, Retryer} - private[rpc] class BaseRPCSender extends Sender with Logging { private var name: String = _ private var rpc: RPCReceiveRemote = _ @@ -76,9 +74,6 @@ private[rpc] class BaseRPCSender extends Sender with Logging { private[rpc] def getApplicationName = name - protected def doBuilder(builder: Feign.Builder): Unit = - builder.retryer(Retryer.NEVER_RETRY) - protected def newRPC: RPCReceiveRemote = { getDynamicFeignClient.getFeignClient(classOf[RPCReceiveRemote], name) } diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancer.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancer.scala deleted file mode 100644 index 0c869093f9..0000000000 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancer.scala +++ /dev/null @@ -1,79 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.sender - -import org.springframework.beans.factory.ObjectProvider -import org.springframework.cloud.client.ServiceInstance -import org.springframework.cloud.client.loadbalancer.{ - DefaultResponse, - EmptyResponse, - Request, - Response -} -import org.springframework.cloud.loadbalancer.core.{ - ReactorServiceInstanceLoadBalancer, - ServiceInstanceListSupplier -} - -import java.util - -import scala.collection.JavaConverters._ - -import reactor.core.publisher.Mono - -private class LinkisLoadBalancer( - serviceInstance: ServiceInstance, - serviceInstanceListSupplierProvider: ObjectProvider[ServiceInstanceListSupplier] -) extends ReactorServiceInstanceLoadBalancer { - - override def choose(request: Request[_]): Mono[Response[ServiceInstance]] = { - val supplier: ServiceInstanceListSupplier = serviceInstanceListSupplierProvider.getIfAvailable() - supplier.get(request).next.map(this.getInstanceResponse) - } - - private def getInstanceResponse( - instances: util.List[ServiceInstance] - ): Response[ServiceInstance] = { - if (instances.isEmpty) { - new EmptyResponse - } - var instanceResult: ServiceInstance = null - - instances.asScala.find(instance => - serviceInstance.equals(instance.getInstanceId) && - serviceInstance.getHost.equals(instance.getHost) && - serviceInstance.getPort == instance.getPort - ) match { - case Some(instance) => instanceResult = instance - case _ => - } - -// if (instanceResult == null) instanceResult = instances.get(0) - if (instanceResult == null) { - // 则重试 - -// getOrRefresh( -// refreshAllServers(), -// getServiceInstances(serviceInstance.getApplicationName).contains(serviceInstance), -// serviceInstance -// ) - } - new DefaultResponse(instanceResult) - } - -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancerClientFactory.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancerClientFactory.scala deleted file mode 100644 index 53cdc08b4b..0000000000 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/LinkisLoadBalancerClientFactory.scala +++ /dev/null @@ -1,66 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.rpc.sender - -import org.apache.linkis.common.ServiceInstance - -import org.apache.commons.lang3.StringUtils - -import org.springframework.beans.factory.annotation.Autowired -import org.springframework.cloud.client.DefaultServiceInstance -import org.springframework.cloud.client.loadbalancer.LoadBalancerClientsProperties -import org.springframework.cloud.client.loadbalancer.reactive.ReactiveLoadBalancer -import org.springframework.cloud.loadbalancer.core.ServiceInstanceListSupplier -import org.springframework.cloud.loadbalancer.support.LoadBalancerClientFactory -import org.springframework.core.env.Environment - -private class LinkisLoadBalancerClientFactory( - serviceInstance: ServiceInstance, - loadBalancerClientsProperties: LoadBalancerClientsProperties -) extends LoadBalancerClientFactory(loadBalancerClientsProperties: LoadBalancerClientsProperties) { - - @Autowired - private var env: Environment = _ - - @Autowired - private var loadBalancerClientFactory: LoadBalancerClientFactory = _ - - override def getInstance( - serviceId: String - ): ReactiveLoadBalancer[org.springframework.cloud.client.ServiceInstance] = { - if (null != serviceInstance && StringUtils.isNotBlank(serviceInstance.getInstance)) { - val hostAndPort: Array[String] = serviceInstance.getInstance.split(":") - val defaultServiceInstance: DefaultServiceInstance = new DefaultServiceInstance( - serviceInstance.getApplicationName, - serviceId, - hostAndPort.head, - hostAndPort.last.toInt, - true - ) - val name: String = env.getProperty(LoadBalancerClientFactory.PROPERTY_NAME) - new LinkisLoadBalancer( - defaultServiceInstance, - super.getLazyProvider(name, classOf[ServiceInstanceListSupplier]) - ) - } else { - super.getInstance(serviceId) - } - - } - -} diff --git a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala index 490aa73569..a638560e11 100644 --- a/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala +++ b/linkis-commons/linkis-rpc/src/main/scala/org/apache/linkis/rpc/sender/SpringMVCRPCSender.scala @@ -18,26 +18,18 @@ package org.apache.linkis.rpc.sender import org.apache.linkis.common.ServiceInstance -import org.apache.linkis.common.conf.{Configuration => DWCConfiguration} import org.apache.linkis.rpc.{BaseRPCSender, RPCMessageEvent, RPCSpringBeanCache} import org.apache.linkis.rpc.interceptor.{RPCInterceptor, ServiceInstanceRPCInterceptorChain} import org.apache.commons.lang3.StringUtils import org.springframework.beans.factory.annotation.Autowired -import org.springframework.cloud.client.loadbalancer.LoadBalancerClientsProperties -import org.springframework.cloud.loadbalancer.blocking.client.BlockingLoadBalancerClient -import org.springframework.cloud.openfeign.loadbalancer.FeignBlockingLoadBalancerClient import org.springframework.core.env.Environment -import feign._ - private[rpc] class SpringMVCRPCSender private[rpc] ( private[rpc] val serviceInstance: ServiceInstance ) extends BaseRPCSender(serviceInstance.getApplicationName) { - import SpringCloudFeignConfigurationCache._ - override protected def getRPCInterceptors: Array[RPCInterceptor] = RPCSpringBeanCache.getRPCInterceptors @@ -47,31 +39,6 @@ private[rpc] class SpringMVCRPCSender private[rpc] ( @Autowired private var env: Environment = _ - override protected def doBuilder(builder: Feign.Builder): Unit = { - val client = getClient.asInstanceOf[FeignBlockingLoadBalancerClient] - val loadBalancerClientFactory = - new LinkisLoadBalancerClientFactory(serviceInstance, new LoadBalancerClientsProperties) - - val blockingLoadBalancerClient: BlockingLoadBalancerClient = new BlockingLoadBalancerClient( - loadBalancerClientFactory - ) - - val newClient = new FeignBlockingLoadBalancerClient( - client.getDelegate, - blockingLoadBalancerClient, - // getLoadBalancedRetryFactory, - loadBalancerClientFactory - ) - - super.doBuilder(builder) - builder - .contract(getContract) - .encoder(getEncoder) - .decoder(getDecoder) - .client(newClient) - .requestInterceptor(getRPCTicketIdRequestInterceptor) - } - /** * Deliver is an asynchronous method that requests the target microservice asynchronously, * ensuring that the target microservice is requested once, but does not guarantee that the target