From 8225fde05e04c28cffa635da73f05868b7f47b0c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=85=B4?= Date: Mon, 18 Feb 2019 21:15:41 +0800 Subject: [PATCH] [SCB-1139] update vertx version from 3.5.3 to 3.6.2 (#1084) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * [SCB-1139] update vertx version from 3.5.3 to 3.6.2 1、update netty to 4.1.28 ncnative to 2.0.14.Final jackson 2.9.8 2、vertximpl (just for test) does not have an public construtor, so add the VertxImpl to resolve first, later will be removed 3、DefaultVertxMetrics does not have the vertx parameter, so add an setter method 4、delete the #961, jackson has fixed the promblem by https://github.com/FasterXML/jackson-core/commit/b6c54cf06167b7ef8b45a67e8ddfa427e63c6c20 * remove pseudo :status header of http2 response * change the three file's license to vertx --- LICENSE | 4 +- .../fasterxml/jackson/core/base/DoSFix.java | 151 --- .../jackson/core/base/DoSParserFixed.java | 76 -- .../rest/codec/AbstractRestObjectMapper.java | 5 - .../common/rest/codec/RestObjectMapper.java | 5 - .../param/TestRestClientRequestImpl.java | 2 +- .../core/transport/TransportVertxFactory.java | 1 + .../transport/TestTransportVertxFactory.java | 1 - .../config/client/ConfigCenterClient.java | 2 + .../config/client/TestConfigCenterClient.java | 1 + .../java/io/vertx/core/impl/SyncContext.java | 6 +- .../java/io/vertx/core/impl/SyncVertx.java | 5 +- .../java/io/vertx/core/impl/VertxImpl.java | 1137 +++++++++++++++++ .../java/io/vertx/core/impl/VertxImpl.java | 1118 ++++++++++++++++ .../java/io/vertx/core/impl/VertxImplEx.java | 5 +- .../foundation/vertx/VertxUtils.java | 2 +- .../metrics/DefaultHttpServerMetrics.java | 7 +- .../vertx/metrics/DefaultVertxMetrics.java | 27 +- .../metrics/DefaultVertxMetricsFactory.java | 11 +- .../DefaultClientEndpointMetricManager.java | 11 +- .../vertx/stream/InputStreamToReadStream.java | 7 +- .../http/TestHttpClientPoolFactory.java | 4 +- .../tcp/TestTcpClientConnectionPool.java | 4 +- .../vertx/http/TestReadStreamPart.java | 4 +- .../metrics/TestDefaultHttpClientMetrics.java | 7 +- .../metrics/TestDefaultHttpServerMetrics.java | 13 +- .../metrics/TestDefaultTcpClientMetrics.java | 7 +- .../metrics/TestDefaultTcpServerMetrics.java | 10 +- .../TestDefaultVertxMetricsFactory.java | 3 +- java-chassis-dependencies/pom.xml | 8 +- java-chassis-distribution/src/release/LICENSE | 46 +- .../core/TestVertxMetersInitializer.java | 3 +- pom.xml | 1 + .../client/http/RestUtils.java | 1 + .../client/http/DefaultHttpClientFilter.java | 3 + .../client/http/RestClientInvocation.java | 1 + .../http/impl/Http1xConnectionBaseEx.java | 17 +- .../core/http/impl/VertxImplTestUtils.java | 25 - .../client/http/TestRestClientInvocation.java | 52 +- .../transport/rest/vertx/RestBodyHandler.java | 100 +- .../rest/vertx/RestServerVerticle.java | 2 +- 41 files changed, 2501 insertions(+), 394 deletions(-) delete mode 100644 common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSFix.java delete mode 100644 common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSParserFixed.java create mode 100644 foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java create mode 100644 foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java rename foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java => transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/http/impl/Http1xConnectionBaseEx.java (67%) delete mode 100644 transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/http/impl/VertxImplTestUtils.java diff --git a/LICENSE b/LICENSE index a1e780bc829..916a1046239 100644 --- a/LICENSE +++ b/LICENSE @@ -217,7 +217,9 @@ This product bundles files from vertx which is licensed under the Apache License For details, see https://github.com/vert-x3/vertx-web ================================================================ -For foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImplEx.java +For foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java + foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java + foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/PumpImplEx.java ================================================================ This product bundles files from vertx which is licensed under the Apache License v2. For details, see https://github.com/eclipse-vertx/vert.x diff --git a/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSFix.java b/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSFix.java deleted file mode 100644 index 6c46eca15c7..00000000000 --- a/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSFix.java +++ /dev/null @@ -1,151 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.fasterxml.jackson.core.base; - -import org.apache.servicecomb.foundation.common.utils.JvmUtils; - -import com.fasterxml.jackson.core.JsonFactory; -import com.fasterxml.jackson.core.json.ByteSourceJsonBootstrapper; -import com.fasterxml.jackson.core.json.ReaderBasedJsonParser; -import com.fasterxml.jackson.core.json.UTF8StreamJsonParser; -import com.fasterxml.jackson.databind.MappingJsonFactory; -import com.netflix.config.DynamicPropertyFactory; - -import javassist.CannotCompileException; -import javassist.ClassPool; -import javassist.CtClass; -import javassist.CtMethod; -import javassist.LoaderClassPath; -import javassist.NotFoundException; - -/** - * will be deleted after jackson fix the DoS problem: - * https://github.com/FasterXML/jackson-databind/issues/2157 - */ -public class DoSFix { - private static final String SUFFIX = "Fixed"; - - private static boolean enabled = DynamicPropertyFactory.getInstance() - .getBooleanProperty("servicecomb.jackson.fix.DoS.enabled", true).get(); - - private static boolean fixed; - - private static Class mappingJsonFactoryClass; - - public static synchronized void init() { - if (fixed || !enabled) { - return; - } - - fix(); - } - - public static JsonFactory createJsonFactory() { - try { - return (JsonFactory) mappingJsonFactoryClass.newInstance(); - } catch (Throwable e) { - throw new IllegalStateException("Failed to create JsonFactory.", e); - } - } - - private static void fix() { - try { - ClassLoader classLoader = JvmUtils.correctClassLoader(DoSFix.class.getClassLoader()); - ClassPool pool = new ClassPool(ClassPool.getDefault()); - pool.appendClassPath(new LoaderClassPath(classLoader)); - - fixParserBase(classLoader, pool); - fixReaderParser(classLoader, pool); - fixStreamParser(classLoader, pool); - fixByteSourceJsonBootstrapper(classLoader, pool); - - CtClass ctJsonFactoryFixedClass = fixJsonFactory(classLoader, pool); - fixMappingJsonFactoryClass(classLoader, pool, ctJsonFactoryFixedClass); - - fixed = true; - } catch (Throwable e) { - throw new IllegalStateException( - "Failed to fix jackson DoS bug.", - e); - } - } - - private static void fixMappingJsonFactoryClass(ClassLoader classLoader, ClassPool pool, - CtClass ctJsonFactoryFixedClass) throws NotFoundException, CannotCompileException { - CtClass ctMappingJsonFactoryClass = pool - .getAndRename(MappingJsonFactory.class.getName(), MappingJsonFactory.class.getName() + SUFFIX); - ctMappingJsonFactoryClass.setSuperclass(ctJsonFactoryFixedClass); - mappingJsonFactoryClass = ctMappingJsonFactoryClass.toClass(classLoader, null); - } - - private static CtClass fixJsonFactory(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtClass ctJsonFactoryClass = pool.getCtClass(JsonFactory.class.getName()); - CtClass ctJsonFactoryFixedClass = pool.makeClass(JsonFactory.class.getName() + SUFFIX); - ctJsonFactoryFixedClass.setSuperclass(ctJsonFactoryClass); - for (CtMethod ctMethod : ctJsonFactoryClass.getDeclaredMethods()) { - if (ctMethod.getName().equals("_createParser")) { - ctJsonFactoryFixedClass.addMethod(new CtMethod(ctMethod, ctJsonFactoryFixedClass, null)); - } - } - ctJsonFactoryFixedClass - .replaceClassName(ReaderBasedJsonParser.class.getName(), ReaderBasedJsonParser.class.getName() + SUFFIX); - ctJsonFactoryFixedClass - .replaceClassName(UTF8StreamJsonParser.class.getName(), UTF8StreamJsonParser.class.getName() + SUFFIX); - ctJsonFactoryFixedClass.replaceClassName(ByteSourceJsonBootstrapper.class.getName(), - ByteSourceJsonBootstrapper.class.getName() + SUFFIX); - ctJsonFactoryFixedClass.toClass(classLoader, null); - - return ctJsonFactoryFixedClass; - } - - private static void fixByteSourceJsonBootstrapper(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtClass ctByteSourceJsonBootstrapper = pool - .getAndRename(ByteSourceJsonBootstrapper.class.getName(), ByteSourceJsonBootstrapper.class.getName() + SUFFIX); - ctByteSourceJsonBootstrapper - .replaceClassName(UTF8StreamJsonParser.class.getName(), UTF8StreamJsonParser.class.getName() + SUFFIX); - ctByteSourceJsonBootstrapper - .replaceClassName(ReaderBasedJsonParser.class.getName(), ReaderBasedJsonParser.class.getName() + SUFFIX); - ctByteSourceJsonBootstrapper.toClass(classLoader, null); - } - - private static void fixStreamParser(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtClass ctStreamClass = pool - .getAndRename(UTF8StreamJsonParser.class.getName(), UTF8StreamJsonParser.class.getName() + SUFFIX); - ctStreamClass.replaceClassName(ParserBase.class.getName(), ParserBase.class.getName() + SUFFIX); - ctStreamClass.toClass(classLoader, null); - } - - private static void fixReaderParser(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtClass ctReaderClass = pool - .getAndRename(ReaderBasedJsonParser.class.getName(), ReaderBasedJsonParser.class.getName() + SUFFIX); - ctReaderClass.replaceClassName(ParserBase.class.getName(), ParserBase.class.getName() + SUFFIX); - ctReaderClass.toClass(classLoader, null); - } - - private static void fixParserBase(ClassLoader classLoader, ClassPool pool) - throws NotFoundException, CannotCompileException { - CtMethod ctMethodFixed = pool.get(DoSParserFixed.class.getName()).getDeclaredMethod("_parseSlowInt"); - CtClass baseClass = pool.getAndRename(ParserBase.class.getName(), ParserBase.class.getName() + SUFFIX); - baseClass.removeMethod(baseClass.getDeclaredMethod("_parseSlowInt")); - baseClass.addMethod(new CtMethod(ctMethodFixed, baseClass, null)); - baseClass.toClass(classLoader, null); - } -} diff --git a/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSParserFixed.java b/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSParserFixed.java deleted file mode 100644 index 71fbdc675b0..00000000000 --- a/common/common-rest/src/main/java/com/fasterxml/jackson/core/base/DoSParserFixed.java +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.fasterxml.jackson.core.base; - -import java.io.IOException; -import java.io.Reader; -import java.math.BigInteger; - -import com.fasterxml.jackson.core.ObjectCodec; -import com.fasterxml.jackson.core.io.IOContext; -import com.fasterxml.jackson.core.io.NumberInput; -import com.fasterxml.jackson.core.json.ReaderBasedJsonParser; -import com.fasterxml.jackson.core.sym.CharsToNameCanonicalizer; - -/** - * will not be use directly - * just get _parseSlowInt/_parseSlowFloat bytecode and replace to ParserBase - */ -public abstract class DoSParserFixed extends ReaderBasedJsonParser { - public DoSParserFixed(IOContext ctxt, int features, Reader r, - ObjectCodec codec, CharsToNameCanonicalizer st, - char[] inputBuffer, int start, int end, boolean bufferRecyclable) { - super(ctxt, features, r, codec, st, inputBuffer, start, end, bufferRecyclable); - } - - private void _parseSlowInt(int expType) throws IOException { - String numStr = _textBuffer.contentsAsString(); - try { - int len = _intLength; - char[] buf = _textBuffer.getTextBuffer(); - int offset = _textBuffer.getTextOffset(); - if (_numberNegative) { - ++offset; - } - // Some long cases still... - if (NumberInput.inLongRange(buf, offset, len, _numberNegative)) { - // Probably faster to construct a String, call parse, than to use BigInteger - _numberLong = Long.parseLong(numStr); - _numTypesValid = NR_LONG; - } else { - // nope, need the heavy guns... (rare case) - - // *** fix DoS attack begin *** - if (NR_DOUBLE == expType || NR_FLOAT == expType) { - _numberDouble = Double.parseDouble(numStr); - _numTypesValid = NR_DOUBLE; - return; - } - if (NR_BIGINT != expType) { - throw new NumberFormatException("invalid numeric value '" + numStr + "'"); - } - // *** fix DoS attack end *** - - _numberBigInt = new BigInteger(numStr); - _numTypesValid = NR_BIGINT; - } - } catch (NumberFormatException nex) { - // Can this ever occur? Due to overflow, maybe? - _wrapError("Malformed numeric value '" + numStr + "'", nex); - } - } -} diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/AbstractRestObjectMapper.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/AbstractRestObjectMapper.java index 1f64d3d0543..0ca5fa6a019 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/AbstractRestObjectMapper.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/AbstractRestObjectMapper.java @@ -17,15 +17,10 @@ package org.apache.servicecomb.common.rest.codec; -import com.fasterxml.jackson.core.JsonFactory; import com.fasterxml.jackson.databind.ObjectMapper; public abstract class AbstractRestObjectMapper extends ObjectMapper { private static final long serialVersionUID = 189026839992490564L; - public AbstractRestObjectMapper(JsonFactory jsonFactory) { - super(jsonFactory); - } - abstract public String convertToString(Object value) throws Exception; } diff --git a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/RestObjectMapper.java b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/RestObjectMapper.java index ae1a45f7fcf..8c315c52aa2 100644 --- a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/RestObjectMapper.java +++ b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/codec/RestObjectMapper.java @@ -23,7 +23,6 @@ import com.fasterxml.jackson.core.JsonGenerator; import com.fasterxml.jackson.core.JsonParser.Feature; -import com.fasterxml.jackson.core.base.DoSFix; import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.JavaType; import com.fasterxml.jackson.databind.JsonSerializer; @@ -35,9 +34,6 @@ import io.vertx.core.json.JsonObject; public class RestObjectMapper extends AbstractRestObjectMapper { - static { - DoSFix.init(); - } private static class JsonObjectSerializer extends JsonSerializer { @Override @@ -52,7 +48,6 @@ public void serialize(JsonObject value, JsonGenerator jgen, SerializerProvider p @SuppressWarnings("deprecation") public RestObjectMapper() { - super(DoSFix.createJsonFactory()); // swagger中要求date使用ISO8601格式传递,这里与之做了功能绑定,这在cse中是没有问题的 setDateFormat(new com.fasterxml.jackson.databind.util.ISO8601DateFormat() { diff --git a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/param/TestRestClientRequestImpl.java b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/param/TestRestClientRequestImpl.java index 6bcb3f9ec63..3e25612cf09 100644 --- a/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/param/TestRestClientRequestImpl.java +++ b/common/common-rest/src/test/java/org/apache/servicecomb/common/rest/codec/param/TestRestClientRequestImpl.java @@ -22,6 +22,7 @@ import java.util.UUID; import javax.servlet.http.Part; +import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; import org.hamcrest.Matchers; @@ -35,7 +36,6 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.CaseInsensitiveHeaders; import io.vertx.core.http.HttpClientRequest; -import io.vertx.core.http.HttpHeaders; import mockit.Deencapsulation; import mockit.Expectations; import mockit.Mock; diff --git a/core/src/main/java/org/apache/servicecomb/core/transport/TransportVertxFactory.java b/core/src/main/java/org/apache/servicecomb/core/transport/TransportVertxFactory.java index 3aa8ab1bb2d..d4e96d380a3 100644 --- a/core/src/main/java/org/apache/servicecomb/core/transport/TransportVertxFactory.java +++ b/core/src/main/java/org/apache/servicecomb/core/transport/TransportVertxFactory.java @@ -35,6 +35,7 @@ public class TransportVertxFactory { public TransportVertxFactory() { vertxOptions.setMetricsOptions(metricsOptionsEx); transportVertx = VertxUtils.getOrCreateVertxByName("transport", vertxOptions); + metricsFactory.setVertx(transportVertx, vertxOptions); } public DefaultVertxMetricsFactory getMetricsFactory() { diff --git a/core/src/test/java/org/apache/servicecomb/core/transport/TestTransportVertxFactory.java b/core/src/test/java/org/apache/servicecomb/core/transport/TestTransportVertxFactory.java index aec2492b2e2..fee477f474e 100644 --- a/core/src/test/java/org/apache/servicecomb/core/transport/TestTransportVertxFactory.java +++ b/core/src/test/java/org/apache/servicecomb/core/transport/TestTransportVertxFactory.java @@ -26,7 +26,6 @@ public void getTransportVertx() { Assert.assertNotNull(vertxFactory.getTransportVertx()); Assert.assertSame(vertxFactory.getTransportVertx(), vertxFactory.getTransportVertx()); - Assert.assertSame(vertxFactory.getTransportVertx(), vertxFactory.getMetricsFactory().getVertxMetrics().getVertx()); vertxFactory.getTransportVertx().close(); } diff --git a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java index 56cb27e8fab..cc0a5df9405 100644 --- a/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java +++ b/dynamic-config/config-cc/src/main/java/org/apache/servicecomb/config/client/ConfigCenterClient.java @@ -163,6 +163,7 @@ private void refreshMembers(MemberDiscovery memberDiscovery) { String configCenter = memberDiscovery.getConfigServer(); IpPort ipPort = NetUtils.parseIpPortFromURI(configCenter); clientMgr.findThreadBindClientPool().runOnContext(client -> { + @SuppressWarnings("deprecation") HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), uriConst.MEMBERS, rsp -> { if (rsp.statusCode() == HttpResponseStatus.OK.code()) { @@ -374,6 +375,7 @@ public void refreshConfig(String configcenter, boolean wait) { + ParseConfigUtils.getInstance().getCurrentVersionInfo(); clientMgr.findThreadBindClientPool().runOnContext(client -> { IpPort ipPort = NetUtils.parseIpPortFromURI(configcenter); + @SuppressWarnings("deprecation") HttpClientRequest request = client.get(ipPort.getPort(), ipPort.getHostOrIp(), path, rsp -> { if (rsp.statusCode() == HttpResponseStatus.OK.code()) { rsp.bodyHandler(buf -> { diff --git a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java index 5c70ac4a280..d6fad3af143 100644 --- a/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java +++ b/dynamic-config/config-cc/src/test/java/org/apache/servicecomb/config/client/TestConfigCenterClient.java @@ -53,6 +53,7 @@ import mockit.MockUp; import mockit.Mocked; +@SuppressWarnings("deprecation") public class TestConfigCenterClient { @BeforeClass public static void setUpClass() { diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java index eb039afd66b..4fd33690b0a 100644 --- a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java +++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncContext.java @@ -55,10 +55,10 @@ public static void syncExecuteBlocking(Handler> blockingCodeHandle } @Override - public void executeBlocking(Action action, Handler> resultHandler) { + public void executeBlockingInternal(Handler> action, Handler> resultHandler) { syncExecuteBlocking((future) -> { try { - future.complete(action.perform()); + action.handle(future); } catch (Throwable e) { future.fail(e); } @@ -72,7 +72,7 @@ public void executeBlocking(Handler> blockingCodeHandler, boolean } @Override - void executeBlocking(Action action, Handler> blockingCodeHandler, + void executeBlocking(Handler> blockingCodeHandler, Handler> resultHandler, Executor exec, TaskQueue queue, @SuppressWarnings("rawtypes") PoolMetrics metrics) { syncExecuteBlocking(blockingCodeHandler, resultHandler); diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java index 5fada598c84..c200ed73cb9 100644 --- a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java +++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/SyncVertx.java @@ -20,6 +20,7 @@ import io.vertx.core.Handler; import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; +import io.vertx.core.net.impl.transport.Transport; /** * after test finished, need to invoke vertx.close @@ -28,10 +29,12 @@ public class SyncVertx extends VertxImpl { private ContextImpl context = new SyncContext(this); public SyncVertx() { - this(null, null); + this(new VertxOptions(), null); } protected SyncVertx(VertxOptions options, Handler> resultHandler) { + super(options, Transport.transport(options.getPreferNativeTransport())); + init(); } @Override diff --git a/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java new file mode 100644 index 00000000000..92dd1d08706 --- /dev/null +++ b/foundations/foundation-test-scaffolding/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -0,0 +1,1137 @@ +/* + * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.impl; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolverGroup; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.concurrent.GenericFutureListener; +import io.vertx.core.AsyncResult; +import io.vertx.core.Closeable; +import io.vertx.core.Context; +import io.vertx.core.DeploymentOptions; +import io.vertx.core.Future; +import io.vertx.core.Handler; +import io.vertx.core.ServiceHelper; +import io.vertx.core.TimeoutStream; +import io.vertx.core.Verticle; +import io.vertx.core.Vertx; +import io.vertx.core.VertxOptions; +import io.vertx.core.datagram.DatagramSocket; +import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.datagram.impl.DatagramSocketImpl; +import io.vertx.core.dns.AddressResolverOptions; +import io.vertx.core.dns.DnsClient; +import io.vertx.core.dns.DnsClientOptions; +import io.vertx.core.dns.impl.DnsClientImpl; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusImpl; +import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus; +import io.vertx.core.file.FileSystem; +import io.vertx.core.file.impl.FileResolver; +import io.vertx.core.file.impl.FileSystemImpl; +import io.vertx.core.file.impl.WindowsFileSystem; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.impl.HttpClientImpl; +import io.vertx.core.http.impl.HttpServerImpl; +import io.vertx.core.impl.resolver.DnsResolverProvider; +import io.vertx.core.json.JsonObject; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.impl.NetClientImpl; +import io.vertx.core.net.impl.NetServerImpl; +import io.vertx.core.net.impl.ServerID; +import io.vertx.core.net.impl.transport.Transport; +import io.vertx.core.shareddata.SharedData; +import io.vertx.core.shareddata.impl.SharedDataImpl; +import io.vertx.core.spi.VerticleFactory; +import io.vertx.core.spi.VertxMetricsFactory; +import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.spi.metrics.Metrics; +import io.vertx.core.spi.metrics.MetricsProvider; +import io.vertx.core.spi.metrics.PoolMetrics; +import io.vertx.core.spi.metrics.VertxMetrics; + +/** + * @author Tim Fox + */ +public class VertxImpl implements VertxInternal, MetricsProvider { + + private static final Logger log = LoggerFactory.getLogger(VertxImpl.class); + + private static final String CLUSTER_MAP_NAME = "__vertx.haInfo"; + private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio"; + private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50); + + static { + // Netty resource leak detection has a performance overhead and we do not need it in Vert.x + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); + // Use the JDK deflater/inflater by default + System.setProperty("io.netty.noJdkZlibDecoder", "false"); + } + + static VertxImpl vertx(VertxOptions options) { + VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport())); + vertx.init(); + return vertx; + } + + static VertxImpl vertx(VertxOptions options, Transport transport) { + VertxImpl vertx = new VertxImpl(options, transport); + vertx.init(); + return vertx; + } + + static void clusteredVertx(VertxOptions options, Handler> resultHandler) { + VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport())); + vertx.joinCluster(options, resultHandler); + } + + private final FileSystem fileSystem = getFileSystem(); + private final SharedData sharedData; + private final VertxMetrics metrics; + private final ConcurrentMap timeouts = new ConcurrentHashMap<>(); + private final AtomicLong timeoutCounter = new AtomicLong(0); + private final ClusterManager clusterManager; + private final DeploymentManager deploymentManager; + private final FileResolver fileResolver; + private final Map sharedHttpServers = new HashMap<>(); + private final Map sharedNetServers = new HashMap<>(); + final WorkerPool workerPool; + final WorkerPool internalBlockingPool; + private final ThreadFactory eventLoopThreadFactory; + private final EventLoopGroup eventLoopGroup; + private final EventLoopGroup acceptorEventLoopGroup; + private final BlockedThreadChecker checker; + private final AddressResolver addressResolver; + private final AddressResolverOptions addressResolverOptions; + private final EventBus eventBus; + private volatile HAManager haManager; + private boolean closed; + private volatile Handler exceptionHandler; + private final Map namedWorkerPools; + private final int defaultWorkerPoolSize; + private final long defaultWorkerMaxExecTime; + private final CloseHooks closeHooks; + private final Transport transport; + + @SuppressWarnings("rawtypes") + public VertxImpl(VertxOptions options, Transport transport) { + // Sanity check + if (Vertx.currentContext() != null) { + log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?"); + } + closeHooks = new CloseHooks(log); + checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit()); + eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); + eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO); + ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); + // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections + // under a lot of load + acceptorEventLoopGroup = transport.eventLoopGroup(1, acceptorEventLoopThreadFactory, 100); + + metrics = initialiseMetrics(options); + + ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(), + new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); + PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; + ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), + new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); + PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null; + internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics); + namedWorkerPools = new HashMap<>(); + workerPool = new WorkerPool(workerExec, workerPoolMetrics); + defaultWorkerPoolSize = options.getWorkerPoolSize(); + defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime(); + + this.transport = transport; + this.fileResolver = new FileResolver(options.getFileSystemOptions()); + this.addressResolverOptions = options.getAddressResolverOptions(); + this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions()); + this.deploymentManager = new DeploymentManager(this); + if (options.isClustered()) { + this.clusterManager = getClusterManager(options); + this.eventBus = new ClusteredEventBus(this, options, clusterManager); + } else { + this.clusterManager = null; + this.eventBus = new EventBusImpl(this); + } + this.sharedData = new SharedDataImpl(this, clusterManager); + } + + public void init() { + eventBus.start(ar -> {}); + if (metrics != null) { + metrics.vertxCreated(this); + } + } + + private void joinCluster(VertxOptions options, Handler> resultHandler) { + clusterManager.setVertx(this); + clusterManager.join(ar -> { + if (ar.succeeded()) { + createHaManager(options, resultHandler); + } else { + log.error("Failed to join cluster", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void createHaManager(VertxOptions options, Handler> resultHandler) { + this.>executeBlocking(fut -> { + fut.complete(clusterManager.getSyncMap(CLUSTER_MAP_NAME)); + }, false, ar -> { + if (ar.succeeded()) { + Map clusterMap = ar.result(); + haManager = new HAManager(this, deploymentManager, clusterManager, clusterMap, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled()); + startEventBus(resultHandler); + } else { + log.error("Failed to start HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void startEventBus(Handler> resultHandler) { + eventBus.start(ar -> { + if (ar.succeeded()) { + initializeHaManager(resultHandler); + } else { + log.error("Failed to start event bus", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void initializeHaManager(Handler> resultHandler) { + this.executeBlocking(fut -> { + // Init the manager (i.e register listener and check the quorum) + // after the event bus has been fully started and updated its state + // it will have also set the clustered changed view handler on the ha manager + haManager.init(); + fut.complete(); + }, false, ar -> { + if (ar.succeeded()) { + if (metrics != null) { + metrics.vertxCreated(this); + } + resultHandler.handle(Future.succeededFuture(this)); + } else { + log.error("Failed to initialize HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + /** + * @return The FileSystem implementation for the OS + */ + protected FileSystem getFileSystem() { + return Utils.isWindows() ? new WindowsFileSystem(this) : new FileSystemImpl(this); + } + + @Override + public DatagramSocket createDatagramSocket(DatagramSocketOptions options) { + return DatagramSocketImpl.create(this, options); + } + + @Override + public DatagramSocket createDatagramSocket() { + return createDatagramSocket(new DatagramSocketOptions()); + } + + public NetServer createNetServer(NetServerOptions options) { + return new NetServerImpl(this, options); + } + + @Override + public NetServer createNetServer() { + return createNetServer(new NetServerOptions()); + } + + public NetClient createNetClient(NetClientOptions options) { + return new NetClientImpl(this, options); + } + + @Override + public NetClient createNetClient() { + return createNetClient(new NetClientOptions()); + } + + @Override + public Transport transport() { + return transport; + } + + @Override + public boolean isNativeTransportEnabled() { + return transport != Transport.JDK; + } + + public FileSystem fileSystem() { + return fileSystem; + } + + public SharedData sharedData() { + return sharedData; + } + + public HttpServer createHttpServer(HttpServerOptions serverOptions) { + return new HttpServerImpl(this, serverOptions); + } + + @Override + public HttpServer createHttpServer() { + return createHttpServer(new HttpServerOptions()); + } + + public HttpClient createHttpClient(HttpClientOptions options) { + return new HttpClientImpl(this, options); + } + + @Override + public HttpClient createHttpClient() { + return createHttpClient(new HttpClientOptions()); + } + + public EventBus eventBus() { + return eventBus; + } + + public long setPeriodic(long delay, Handler handler) { + return scheduleTimeout(getOrCreateContext(), handler, delay, true); + } + + @Override + public TimeoutStream periodicStream(long delay) { + return new TimeoutStreamImpl(delay, true); + } + + public long setTimer(long delay, Handler handler) { + return scheduleTimeout(getOrCreateContext(), handler, delay, false); + } + + @Override + public TimeoutStream timerStream(long delay) { + return new TimeoutStreamImpl(delay, false); + } + + public void runOnContext(Handler task) { + ContextImpl context = getOrCreateContext(); + context.runOnContext(task); + } + + // The background pool is used for making blocking calls to legacy synchronous APIs + public ExecutorService getWorkerPool() { + return workerPool.executor(); + } + + public EventLoopGroup getEventLoopGroup() { + return eventLoopGroup; + } + + public EventLoopGroup getAcceptorEventLoopGroup() { + return acceptorEventLoopGroup; + } + + public ContextImpl getOrCreateContext() { + ContextImpl ctx = getContext(); + if (ctx == null) { + // We are running embedded - Create a context + ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader()); + } + return ctx; + } + + public Map sharedHttpServers() { + return sharedHttpServers; + } + + public Map sharedNetServers() { + return sharedNetServers; + } + + @Override + public boolean isMetricsEnabled() { + return metrics != null; + } + + @Override + public Metrics getMetrics() { + return metrics; + } + + public boolean cancelTimer(long id) { + InternalTimerHandler handler = timeouts.remove(id); + if (handler != null) { + handler.context.removeCloseHook(handler); + return handler.cancel(); + } else { + return false; + } + } + + @Override + public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) { + return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl); + } + + @Override + public ContextImpl createWorkerContext(boolean multiThreaded, String deploymentID, WorkerPool workerPool, JsonObject config, + ClassLoader tccl) { + if (workerPool == null) { + workerPool = this.workerPool; + } + if (multiThreaded) { + return new MultiThreadedWorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl); + } else { + return new WorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl); + } + } + + @Override + public DnsClient createDnsClient(int port, String host) { + return createDnsClient(new DnsClientOptions().setHost(host).setPort(port)); + } + + @Override + public DnsClient createDnsClient() { + return createDnsClient(new DnsClientOptions()); + } + + @Override + public DnsClient createDnsClient(DnsClientOptions options) { + String host = options.getHost(); + int port = options.getPort(); + if (host == null || port < 0) { + DnsResolverProvider provider = new DnsResolverProvider(this, addressResolverOptions); + InetSocketAddress address = provider.nameServerAddresses().get(0); + // provide the host and port + options = new DnsClientOptions(options) + .setHost(address.getAddress().getHostAddress()) + .setPort(address.getPort()); + } + return new DnsClientImpl(this, options); + } + + private VertxMetrics initialiseMetrics(VertxOptions options) { + if (options.getMetricsOptions() != null && options.getMetricsOptions().isEnabled()) { + VertxMetricsFactory factory = options.getMetricsOptions().getFactory(); + if (factory == null) { + factory = ServiceHelper.loadFactoryOrNull(VertxMetricsFactory.class); + if (factory == null) { + log.warn("Metrics has been set to enabled but no VertxMetricsFactory found on classpath"); + } + } + if (factory != null) { + VertxMetrics metrics = factory.metrics(options); + Objects.requireNonNull(metrics, "The metric instance created from " + factory + " cannot be null"); + return metrics; + } + } + return null; + } + + private ClusterManager getClusterManager(VertxOptions options) { + ClusterManager mgr = options.getClusterManager(); + if (mgr == null) { + String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass"); + if (clusterManagerClassName != null) { + // We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader + try { + Class clazz = Class.forName(clusterManagerClassName); + mgr = (ClusterManager) clazz.newInstance(); + } catch (Exception e) { + throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e); + } + } else { + mgr = ServiceHelper.loadFactoryOrNull(ClusterManager.class); + if (mgr == null) { + throw new IllegalStateException("No ClusterManagerFactory instances found on classpath"); + } + } + } + return mgr; + } + + private long scheduleTimeout(ContextImpl context, Handler handler, long delay, boolean periodic) { + if (delay < 1) { + throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms"); + } + long timerId = timeoutCounter.getAndIncrement(); + InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context); + timeouts.put(timerId, task); + context.addCloseHook(task); + return timerId; + } + + public static Context context() { + Thread current = Thread.currentThread(); + if (current instanceof VertxThread) { + return ((VertxThread) current).getContext(); + } + return null; + } + + public ContextImpl getContext() { + ContextImpl context = (ContextImpl) context(); + if (context != null && context.owner == this) { + return context; + } + return null; + } + + public ClusterManager getClusterManager() { + return clusterManager; + } + + @Override + public void close() { + close(null); + } + + private void closeClusterManager(Handler> completionHandler) { + if (clusterManager != null) { + clusterManager.leave(ar -> { + if (ar.failed()) { + log.error("Failed to leave cluster", ar.cause()); + } + if (completionHandler != null) { + runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + }); + } else if (completionHandler != null) { + runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + } + + @Override + public synchronized void close(Handler> completionHandler) { + if (closed || eventBus == null) { + // Just call the handler directly since pools shutdown + if (completionHandler != null) { + completionHandler.handle(Future.succeededFuture()); + } + return; + } + closed = true; + + closeHooks.run(ar -> { + deploymentManager.undeployAll(ar1 -> { + HAManager haManager = haManager(); + Future haFuture = Future.future(); + if (haManager != null) { + this.executeBlocking(fut -> { + haManager.stop(); + fut.complete(); + }, false, haFuture); + } else { + haFuture.complete(); + } + haFuture.setHandler(ar2 -> { + addressResolver.close(ar3 -> { + eventBus.close(ar4 -> { + closeClusterManager(ar5 -> { + // Copy set to prevent ConcurrentModificationException + Set httpServers = new HashSet<>(sharedHttpServers.values()); + Set netServers = new HashSet<>(sharedNetServers.values()); + sharedHttpServers.clear(); + sharedNetServers.clear(); + + int serverCount = httpServers.size() + netServers.size(); + + AtomicInteger serverCloseCount = new AtomicInteger(); + + Handler> serverCloseHandler = res -> { + if (res.failed()) { + log.error("Failure in shutting down server", res.cause()); + } + if (serverCloseCount.incrementAndGet() == serverCount) { + deleteCacheDirAndShutdown(completionHandler); + } + }; + + for (HttpServerImpl server : httpServers) { + server.closeAll(serverCloseHandler); + } + for (NetServerImpl server : netServers) { + server.closeAll(serverCloseHandler); + } + if (serverCount == 0) { + deleteCacheDirAndShutdown(completionHandler); + } + }); + }); + }); + }); + }); + }); + } + + @Override + public void deployVerticle(Verticle verticle) { + deployVerticle(verticle, new DeploymentOptions(), null); + } + + @Override + public void deployVerticle(Verticle verticle, Handler> completionHandler) { + deployVerticle(verticle, new DeploymentOptions(), completionHandler); + } + + @Override + public void deployVerticle(String name, Handler> completionHandler) { + deployVerticle(name, new DeploymentOptions(), completionHandler); + } + + @Override + public void deployVerticle(Verticle verticle, DeploymentOptions options) { + deployVerticle(verticle, options, null); + } + + @Override + public void deployVerticle(Class verticleClass, DeploymentOptions options) { + deployVerticle(() -> { + try { + return verticleClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, options); + } + + @Override + public void deployVerticle(Supplier verticleSupplier, DeploymentOptions options) { + deployVerticle(verticleSupplier, options, null); + } + + @Override + public void deployVerticle(Verticle verticle, DeploymentOptions options, Handler> completionHandler) { + if (options.getInstances() != 1) { + throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle"); + } + deployVerticle(() -> verticle, options, completionHandler); + } + + @Override + public void deployVerticle(Class verticleClass, DeploymentOptions options, Handler> completionHandler) { + deployVerticle(() -> { + try { + return verticleClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, options, completionHandler); + } + + @Override + public void deployVerticle(Supplier verticleSupplier, DeploymentOptions options, Handler> completionHandler) { + boolean closed; + synchronized (this) { + closed = this.closed; + } + if (closed) { + if (completionHandler != null) { + completionHandler.handle(Future.failedFuture("Vert.x closed")); + } + } else { + deploymentManager.deployVerticle(verticleSupplier, options, completionHandler); + } + } + + @Override + public void deployVerticle(String name) { + deployVerticle(name, new DeploymentOptions(), null); + } + + @Override + public void deployVerticle(String name, DeploymentOptions options) { + deployVerticle(name, options, null); + } + + @Override + public void deployVerticle(String name, DeploymentOptions options, Handler> completionHandler) { + if (options.isHa() && haManager() != null && haManager().isEnabled()) { + haManager().deployVerticle(name, options, completionHandler); + } else { + deploymentManager.deployVerticle(name, options, completionHandler); + } + } + + @Override + public String getNodeID() { + return clusterManager.getNodeID(); + } + + @Override + public void undeploy(String deploymentID) { + undeploy(deploymentID, res -> { + }); + } + + @Override + public void undeploy(String deploymentID, Handler> completionHandler) { + HAManager haManager = haManager(); + Future haFuture = Future.future(); + if (haManager != null && haManager.isEnabled()) { + this.executeBlocking(fut -> { + haManager.removeFromHA(deploymentID); + fut.complete(); + }, false, haFuture); + } else { + haFuture.complete(); + } + haFuture.compose(v -> { + Future deploymentFuture = Future.future(); + deploymentManager.undeployVerticle(deploymentID, deploymentFuture); + return deploymentFuture; + }).setHandler(completionHandler); + } + + @Override + public Set deploymentIDs() { + return deploymentManager.deployments(); + } + + @Override + public void registerVerticleFactory(VerticleFactory factory) { + deploymentManager.registerVerticleFactory(factory); + } + + @Override + public void unregisterVerticleFactory(VerticleFactory factory) { + deploymentManager.unregisterVerticleFactory(factory); + } + + @Override + public Set verticleFactories() { + return deploymentManager.verticleFactories(); + } + + @Override + public void executeBlockingInternal(Handler> blockingCodeHandler, Handler> resultHandler) { + ContextImpl context = getOrCreateContext(); + + context.executeBlockingInternal(blockingCodeHandler, resultHandler); + } + + @Override + public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, + Handler> asyncResultHandler) { + ContextImpl context = getOrCreateContext(); + context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); + } + + @Override + public void executeBlocking(Handler> blockingCodeHandler, + Handler> asyncResultHandler) { + executeBlocking(blockingCodeHandler, true, asyncResultHandler); + } + + @Override + public boolean isClustered() { + return clusterManager != null; + } + + @Override + public EventLoopGroup nettyEventLoopGroup() { + return eventLoopGroup; + } + + // For testing + public void simulateKill() { + if (haManager() != null) { + haManager().simulateKill(); + } + } + + @Override + public Deployment getDeployment(String deploymentID) { + return deploymentManager.getDeployment(deploymentID); + } + + @Override + public synchronized void failoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) { + if (haManager() != null) { + haManager().setFailoverCompleteHandler(failoverCompleteHandler); + } + } + + @Override + public boolean isKilled() { + return haManager().isKilled(); + } + + @Override + public void failDuringFailover(boolean fail) { + if (haManager() != null) { + haManager().failDuringFailover(fail); + } + } + + @Override + public VertxMetrics metricsSPI() { + return metrics; + } + + @Override + public File resolveFile(String fileName) { + return fileResolver.resolveFile(fileName); + } + + @Override + public void resolveAddress(String hostname, Handler> resultHandler) { + addressResolver.resolveHostname(hostname, resultHandler); + } + + @Override + public AddressResolver addressResolver() { + return addressResolver; + } + + @Override + public AddressResolverGroup nettyAddressResolverGroup() { + return addressResolver.nettyAddressResolverGroup(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void deleteCacheDirAndShutdown(Handler> completionHandler) { + executeBlockingInternal(fut -> { + try { + fileResolver.close(); + fut.complete(); + } catch (IOException e) { + fut.tryFail(e); + } + }, ar -> { + + workerPool.close(); + internalBlockingPool.close(); + new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close); + + acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Failure in shutting down acceptor event loop group", future.cause()); + } + eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Failure in shutting down event loop group", future.cause()); + } + if (metrics != null) { + metrics.close(); + } + + checker.close(); + + if (completionHandler != null) { + eventLoopThreadFactory.newThread(() -> { + completionHandler.handle(Future.succeededFuture()); + }).start(); + } + } + }); + } + }); + }); + } + + public HAManager haManager() { + return haManager; + } + + private class InternalTimerHandler implements Handler, Closeable { + final Handler handler; + final boolean periodic; + final long timerID; + final ContextImpl context; + final java.util.concurrent.Future future; + final AtomicBoolean cancelled; + + boolean cancel() { + if (cancelled.compareAndSet(false, true)) { + if (metrics != null) { + metrics.timerEnded(timerID, true); + } + future.cancel(false); + return true; + } else { + return false; + } + } + + InternalTimerHandler(long timerID, Handler runnable, boolean periodic, long delay, ContextImpl context) { + this.context = context; + this.timerID = timerID; + this.handler = runnable; + this.periodic = periodic; + this.cancelled = new AtomicBoolean(); + EventLoop el = context.nettyEventLoop(); + Runnable toRun = () -> context.runOnContext(this); + if (periodic) { + future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS); + } else { + future = el.schedule(toRun, delay, TimeUnit.MILLISECONDS); + } + if (metrics != null) { + metrics.timerCreated(timerID); + } + } + + public void handle(Void v) { + if (!cancelled.get()) { + try { + handler.handle(timerID); + } finally { + if (!periodic) { + // Clean up after it's fired + cleanupNonPeriodic(); + } + } + } + } + + private void cleanupNonPeriodic() { + VertxImpl.this.timeouts.remove(timerID); + if (metrics != null) { + metrics.timerEnded(timerID, false); + } + ContextImpl context = getContext(); + if (context != null) { + context.removeCloseHook(this); + } + } + + // Called via Context close hook when Verticle is undeployed + public void close(Handler> completionHandler) { + VertxImpl.this.timeouts.remove(timerID); + cancel(); + completionHandler.handle(Future.succeededFuture()); + } + + } + + /* + * + * This class is optimised for performance when used on the same event loop that is was passed to the handler with. + * However it can be used safely from other threads. + * + * The internal state is protected using the synchronized keyword. If always used on the same event loop, then + * we benefit from biased locking which makes the overhead of synchronized near zero. + * + */ + private class TimeoutStreamImpl implements TimeoutStream, Handler { + + private final long delay; + private final boolean periodic; + + private Long id; + private Handler handler; + private Handler endHandler; + private long demand; + + public TimeoutStreamImpl(long delay, boolean periodic) { + this.delay = delay; + this.periodic = periodic; + this.demand = Long.MAX_VALUE; + } + + @Override + public synchronized void handle(Long event) { + try { + if (demand > 0) { + demand--; + handler.handle(event); + } + } finally { + if (!periodic && endHandler != null) { + endHandler.handle(null); + } + } + } + + @Override + public synchronized TimeoutStream fetch(long amount) { + demand += amount; + if (demand < 0) { + demand = Long.MAX_VALUE; + } + return this; + } + + @Override + public TimeoutStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public void cancel() { + if (id != null) { + VertxImpl.this.cancelTimer(id); + } + } + + @Override + public synchronized TimeoutStream handler(Handler handler) { + if (handler != null) { + if (id != null) { + throw new IllegalStateException(); + } + this.handler = handler; + id = scheduleTimeout(getOrCreateContext(), this, delay, periodic); + } else { + cancel(); + } + return this; + } + + @Override + public synchronized TimeoutStream pause() { + demand = 0; + return this; + } + + @Override + public synchronized TimeoutStream resume() { + demand = Long.MAX_VALUE; + return this; + } + + @Override + public synchronized TimeoutStream endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } + } + + class SharedWorkerPool extends WorkerPool { + + private final String name; + private int refCount = 1; + + @SuppressWarnings("rawtypes") + SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) { + super(workerExec, workerMetrics); + this.name = name; + } + + @Override + void close() { + synchronized (VertxImpl.this) { + if (refCount > 0) { + refCount = 0; + super.close(); + } + } + } + + void release() { + synchronized (VertxImpl.this) { + if (--refCount == 0) { + namedWorkerPools.remove(name); + super.close(); + } + } + } + } + + @Override + public WorkerExecutorImpl createSharedWorkerExecutor(String name) { + return createSharedWorkerExecutor(name, defaultWorkerPoolSize); + } + + @Override + public WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize) { + return createSharedWorkerExecutor(name, poolSize, defaultWorkerMaxExecTime); + } + + @Override + public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) { + return createSharedWorkerExecutor(name, poolSize, maxExecuteTime, TimeUnit.NANOSECONDS); + } + + @SuppressWarnings("rawtypes") + @Override + public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) { + if (poolSize < 1) { + throw new IllegalArgumentException("poolSize must be > 0"); + } + if (maxExecuteTime < 1) { + throw new IllegalArgumentException("maxExecuteTime must be > 0"); + } + SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name); + if (sharedWorkerPool == null) { + ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime, maxExecuteTimeUnit)); + PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null; + namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics)); + } else { + sharedWorkerPool.refCount++; + } + ContextImpl context = getOrCreateContext(); + WorkerExecutorImpl namedExec = new WorkerExecutorImpl(context, sharedWorkerPool); + context.addCloseHook(namedExec); + return namedExec; + } + + @Override + public Vertx exceptionHandler(Handler handler) { + exceptionHandler = handler; + return this; + } + + @Override + public Handler exceptionHandler() { + return exceptionHandler; + } + + @Override + public void addCloseHook(Closeable hook) { + closeHooks.add(hook); + } + + @Override + public void removeCloseHook(Closeable hook) { + closeHooks.remove(hook); + } +} diff --git a/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java new file mode 100644 index 00000000000..ef61ee200c2 --- /dev/null +++ b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImpl.java @@ -0,0 +1,1118 @@ +/* + * Copyright (c) 2011-2017 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ + +package io.vertx.core.impl; + +import io.netty.channel.EventLoop; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.AddressResolverGroup; +import io.netty.util.ResourceLeakDetector; +import io.netty.util.concurrent.GenericFutureListener; +import io.vertx.core.*; +import io.vertx.core.Future; +import io.vertx.core.datagram.DatagramSocket; +import io.vertx.core.datagram.DatagramSocketOptions; +import io.vertx.core.datagram.impl.DatagramSocketImpl; +import io.vertx.core.dns.AddressResolverOptions; +import io.vertx.core.dns.DnsClient; +import io.vertx.core.dns.DnsClientOptions; +import io.vertx.core.dns.impl.DnsClientImpl; +import io.vertx.core.eventbus.EventBus; +import io.vertx.core.eventbus.impl.EventBusImpl; +import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus; +import io.vertx.core.file.FileSystem; +import io.vertx.core.file.impl.FileResolver; +import io.vertx.core.file.impl.FileSystemImpl; +import io.vertx.core.file.impl.WindowsFileSystem; +import io.vertx.core.http.HttpClient; +import io.vertx.core.http.HttpClientOptions; +import io.vertx.core.http.HttpServer; +import io.vertx.core.http.HttpServerOptions; +import io.vertx.core.http.impl.HttpClientImpl; +import io.vertx.core.http.impl.HttpServerImpl; +import io.vertx.core.impl.resolver.DnsResolverProvider; +import io.vertx.core.json.JsonObject; +import io.vertx.core.logging.Logger; +import io.vertx.core.logging.LoggerFactory; +import io.vertx.core.net.NetClient; +import io.vertx.core.net.NetClientOptions; +import io.vertx.core.net.NetServer; +import io.vertx.core.net.NetServerOptions; +import io.vertx.core.net.impl.NetClientImpl; +import io.vertx.core.net.impl.NetServerImpl; +import io.vertx.core.net.impl.ServerID; +import io.vertx.core.net.impl.transport.Transport; +import io.vertx.core.shareddata.SharedData; +import io.vertx.core.shareddata.impl.SharedDataImpl; +import io.vertx.core.spi.VerticleFactory; +import io.vertx.core.spi.VertxMetricsFactory; +import io.vertx.core.spi.cluster.ClusterManager; +import io.vertx.core.spi.metrics.Metrics; +import io.vertx.core.spi.metrics.MetricsProvider; +import io.vertx.core.spi.metrics.PoolMetrics; +import io.vertx.core.spi.metrics.VertxMetrics; + +import java.io.File; +import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Supplier; + +/** + * @author Tim Fox + */ +public class VertxImpl implements VertxInternal, MetricsProvider { + + private static final Logger log = LoggerFactory.getLogger(VertxImpl.class); + + private static final String CLUSTER_MAP_NAME = "__vertx.haInfo"; + private static final String NETTY_IO_RATIO_PROPERTY_NAME = "vertx.nettyIORatio"; + private static final int NETTY_IO_RATIO = Integer.getInteger(NETTY_IO_RATIO_PROPERTY_NAME, 50); + + static { + // Netty resource leak detection has a performance overhead and we do not need it in Vert.x + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.DISABLED); + // Use the JDK deflater/inflater by default + System.setProperty("io.netty.noJdkZlibDecoder", "false"); + } + + static VertxImpl vertx(VertxOptions options) { + VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport())); + vertx.init(); + return vertx; + } + + static VertxImpl vertx(VertxOptions options, Transport transport) { + VertxImpl vertx = new VertxImpl(options, transport); + vertx.init(); + return vertx; + } + + static void clusteredVertx(VertxOptions options, Handler> resultHandler) { + VertxImpl vertx = new VertxImpl(options, Transport.transport(options.getPreferNativeTransport())); + vertx.joinCluster(options, resultHandler); + } + + private final FileSystem fileSystem = getFileSystem(); + private final SharedData sharedData; + private final VertxMetrics metrics; + private final ConcurrentMap timeouts = new ConcurrentHashMap<>(); + private final AtomicLong timeoutCounter = new AtomicLong(0); + private final ClusterManager clusterManager; + private final DeploymentManager deploymentManager; + private final FileResolver fileResolver; + private final Map sharedHttpServers = new HashMap<>(); + private final Map sharedNetServers = new HashMap<>(); + final WorkerPool workerPool; + final WorkerPool internalBlockingPool; + private final ThreadFactory eventLoopThreadFactory; + private final EventLoopGroup eventLoopGroup; + private final EventLoopGroup acceptorEventLoopGroup; + private final BlockedThreadChecker checker; + private final AddressResolver addressResolver; + private final AddressResolverOptions addressResolverOptions; + private final EventBus eventBus; + private volatile HAManager haManager; + private boolean closed; + private volatile Handler exceptionHandler; + private final Map namedWorkerPools; + private final int defaultWorkerPoolSize; + private final long defaultWorkerMaxExecTime; + private final CloseHooks closeHooks; + private final Transport transport; + + @SuppressWarnings("rawtypes") + public VertxImpl(VertxOptions options, Transport transport) { + // Sanity check + if (Vertx.currentContext() != null) { + log.warn("You're already on a Vert.x context, are you sure you want to create a new Vertx instance?"); + } + closeHooks = new CloseHooks(log); + checker = new BlockedThreadChecker(options.getBlockedThreadCheckInterval(), options.getBlockedThreadCheckIntervalUnit(), options.getWarningExceptionTime(), options.getWarningExceptionTimeUnit()); + eventLoopThreadFactory = new VertxThreadFactory("vert.x-eventloop-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); + eventLoopGroup = transport.eventLoopGroup(options.getEventLoopPoolSize(), eventLoopThreadFactory, NETTY_IO_RATIO); + ThreadFactory acceptorEventLoopThreadFactory = new VertxThreadFactory("vert.x-acceptor-thread-", checker, false, options.getMaxEventLoopExecuteTime(), options.getMaxEventLoopExecuteTimeUnit()); + // The acceptor event loop thread needs to be from a different pool otherwise can get lags in accepted connections + // under a lot of load + acceptorEventLoopGroup = transport.eventLoopGroup(1, acceptorEventLoopThreadFactory, 100); + + metrics = initialiseMetrics(options); + + ExecutorService workerExec = Executors.newFixedThreadPool(options.getWorkerPoolSize(), + new VertxThreadFactory("vert.x-worker-thread-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); + PoolMetrics workerPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-worker-thread", options.getWorkerPoolSize()) : null; + ExecutorService internalBlockingExec = Executors.newFixedThreadPool(options.getInternalBlockingPoolSize(), + new VertxThreadFactory("vert.x-internal-blocking-", checker, true, options.getMaxWorkerExecuteTime(), options.getMaxWorkerExecuteTimeUnit())); + PoolMetrics internalBlockingPoolMetrics = metrics != null ? metrics.createPoolMetrics("worker", "vert.x-internal-blocking", options.getInternalBlockingPoolSize()) : null; + internalBlockingPool = new WorkerPool(internalBlockingExec, internalBlockingPoolMetrics); + namedWorkerPools = new HashMap<>(); + workerPool = new WorkerPool(workerExec, workerPoolMetrics); + defaultWorkerPoolSize = options.getWorkerPoolSize(); + defaultWorkerMaxExecTime = options.getMaxWorkerExecuteTime(); + + this.transport = transport; + this.fileResolver = new FileResolver(options.getFileSystemOptions()); + this.addressResolverOptions = options.getAddressResolverOptions(); + this.addressResolver = new AddressResolver(this, options.getAddressResolverOptions()); + this.deploymentManager = new DeploymentManager(this); + if (options.isClustered()) { + this.clusterManager = getClusterManager(options); + this.eventBus = new ClusteredEventBus(this, options, clusterManager); + } else { + this.clusterManager = null; + this.eventBus = new EventBusImpl(this); + } + this.sharedData = new SharedDataImpl(this, clusterManager); + } + + public void init() { + eventBus.start(ar -> {}); + if (metrics != null) { + metrics.vertxCreated(this); + } + } + + private void joinCluster(VertxOptions options, Handler> resultHandler) { + clusterManager.setVertx(this); + clusterManager.join(ar -> { + if (ar.succeeded()) { + createHaManager(options, resultHandler); + } else { + log.error("Failed to join cluster", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void createHaManager(VertxOptions options, Handler> resultHandler) { + this.>executeBlocking(fut -> { + fut.complete(clusterManager.getSyncMap(CLUSTER_MAP_NAME)); + }, false, ar -> { + if (ar.succeeded()) { + Map clusterMap = ar.result(); + haManager = new HAManager(this, deploymentManager, clusterManager, clusterMap, options.getQuorumSize(), options.getHAGroup(), options.isHAEnabled()); + startEventBus(resultHandler); + } else { + log.error("Failed to start HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void startEventBus(Handler> resultHandler) { + eventBus.start(ar -> { + if (ar.succeeded()) { + initializeHaManager(resultHandler); + } else { + log.error("Failed to start event bus", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + private void initializeHaManager(Handler> resultHandler) { + this.executeBlocking(fut -> { + // Init the manager (i.e register listener and check the quorum) + // after the event bus has been fully started and updated its state + // it will have also set the clustered changed view handler on the ha manager + haManager.init(); + fut.complete(); + }, false, ar -> { + if (ar.succeeded()) { + if (metrics != null) { + metrics.vertxCreated(this); + } + resultHandler.handle(Future.succeededFuture(this)); + } else { + log.error("Failed to initialize HAManager", ar.cause()); + resultHandler.handle(Future.failedFuture(ar.cause())); + } + }); + } + + /** + * @return The FileSystem implementation for the OS + */ + protected FileSystem getFileSystem() { + return Utils.isWindows() ? new WindowsFileSystem(this) : new FileSystemImpl(this); + } + + @Override + public DatagramSocket createDatagramSocket(DatagramSocketOptions options) { + return DatagramSocketImpl.create(this, options); + } + + @Override + public DatagramSocket createDatagramSocket() { + return createDatagramSocket(new DatagramSocketOptions()); + } + + public NetServer createNetServer(NetServerOptions options) { + return new NetServerImpl(this, options); + } + + @Override + public NetServer createNetServer() { + return createNetServer(new NetServerOptions()); + } + + public NetClient createNetClient(NetClientOptions options) { + return new NetClientImpl(this, options); + } + + @Override + public NetClient createNetClient() { + return createNetClient(new NetClientOptions()); + } + + @Override + public Transport transport() { + return transport; + } + + @Override + public boolean isNativeTransportEnabled() { + return transport != Transport.JDK; + } + + public FileSystem fileSystem() { + return fileSystem; + } + + public SharedData sharedData() { + return sharedData; + } + + public HttpServer createHttpServer(HttpServerOptions serverOptions) { + return new HttpServerImpl(this, serverOptions); + } + + @Override + public HttpServer createHttpServer() { + return createHttpServer(new HttpServerOptions()); + } + + public HttpClient createHttpClient(HttpClientOptions options) { + return new HttpClientImpl(this, options); + } + + @Override + public HttpClient createHttpClient() { + return createHttpClient(new HttpClientOptions()); + } + + public EventBus eventBus() { + return eventBus; + } + + public long setPeriodic(long delay, Handler handler) { + return scheduleTimeout(getOrCreateContext(), handler, delay, true); + } + + @Override + public TimeoutStream periodicStream(long delay) { + return new TimeoutStreamImpl(delay, true); + } + + public long setTimer(long delay, Handler handler) { + return scheduleTimeout(getOrCreateContext(), handler, delay, false); + } + + @Override + public TimeoutStream timerStream(long delay) { + return new TimeoutStreamImpl(delay, false); + } + + public void runOnContext(Handler task) { + ContextImpl context = getOrCreateContext(); + context.runOnContext(task); + } + + // The background pool is used for making blocking calls to legacy synchronous APIs + public ExecutorService getWorkerPool() { + return workerPool.executor(); + } + + public EventLoopGroup getEventLoopGroup() { + return eventLoopGroup; + } + + public EventLoopGroup getAcceptorEventLoopGroup() { + return acceptorEventLoopGroup; + } + + public ContextImpl getOrCreateContext() { + ContextImpl ctx = getContext(); + if (ctx == null) { + // We are running embedded - Create a context + ctx = createEventLoopContext(null, null, new JsonObject(), Thread.currentThread().getContextClassLoader()); + } + return ctx; + } + + public Map sharedHttpServers() { + return sharedHttpServers; + } + + public Map sharedNetServers() { + return sharedNetServers; + } + + @Override + public boolean isMetricsEnabled() { + return metrics != null; + } + + @Override + public Metrics getMetrics() { + return metrics; + } + + public boolean cancelTimer(long id) { + InternalTimerHandler handler = timeouts.remove(id); + if (handler != null) { + handler.context.removeCloseHook(handler); + return handler.cancel(); + } else { + return false; + } + } + + @Override + public EventLoopContext createEventLoopContext(String deploymentID, WorkerPool workerPool, JsonObject config, ClassLoader tccl) { + return new EventLoopContext(this, internalBlockingPool, workerPool != null ? workerPool : this.workerPool, deploymentID, config, tccl); + } + + @Override + public ContextImpl createWorkerContext(boolean multiThreaded, String deploymentID, WorkerPool workerPool, JsonObject config, + ClassLoader tccl) { + if (workerPool == null) { + workerPool = this.workerPool; + } + if (multiThreaded) { + return new MultiThreadedWorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl); + } else { + return new WorkerContext(this, internalBlockingPool, workerPool, deploymentID, config, tccl); + } + } + + @Override + public DnsClient createDnsClient(int port, String host) { + return createDnsClient(new DnsClientOptions().setHost(host).setPort(port)); + } + + @Override + public DnsClient createDnsClient() { + return createDnsClient(new DnsClientOptions()); + } + + @Override + public DnsClient createDnsClient(DnsClientOptions options) { + String host = options.getHost(); + int port = options.getPort(); + if (host == null || port < 0) { + DnsResolverProvider provider = new DnsResolverProvider(this, addressResolverOptions); + InetSocketAddress address = provider.nameServerAddresses().get(0); + // provide the host and port + options = new DnsClientOptions(options) + .setHost(address.getAddress().getHostAddress()) + .setPort(address.getPort()); + } + return new DnsClientImpl(this, options); + } + + private VertxMetrics initialiseMetrics(VertxOptions options) { + if (options.getMetricsOptions() != null && options.getMetricsOptions().isEnabled()) { + VertxMetricsFactory factory = options.getMetricsOptions().getFactory(); + if (factory == null) { + factory = ServiceHelper.loadFactoryOrNull(VertxMetricsFactory.class); + if (factory == null) { + log.warn("Metrics has been set to enabled but no VertxMetricsFactory found on classpath"); + } + } + if (factory != null) { + VertxMetrics metrics = factory.metrics(options); + Objects.requireNonNull(metrics, "The metric instance created from " + factory + " cannot be null"); + return metrics; + } + } + return null; + } + + private ClusterManager getClusterManager(VertxOptions options) { + ClusterManager mgr = options.getClusterManager(); + if (mgr == null) { + String clusterManagerClassName = System.getProperty("vertx.cluster.managerClass"); + if (clusterManagerClassName != null) { + // We allow specify a sys prop for the cluster manager factory which overrides ServiceLoader + try { + Class clazz = Class.forName(clusterManagerClassName); + mgr = (ClusterManager) clazz.newInstance(); + } catch (Exception e) { + throw new IllegalStateException("Failed to instantiate " + clusterManagerClassName, e); + } + } else { + mgr = ServiceHelper.loadFactoryOrNull(ClusterManager.class); + if (mgr == null) { + throw new IllegalStateException("No ClusterManagerFactory instances found on classpath"); + } + } + } + return mgr; + } + + private long scheduleTimeout(ContextImpl context, Handler handler, long delay, boolean periodic) { + if (delay < 1) { + throw new IllegalArgumentException("Cannot schedule a timer with delay < 1 ms"); + } + long timerId = timeoutCounter.getAndIncrement(); + InternalTimerHandler task = new InternalTimerHandler(timerId, handler, periodic, delay, context); + timeouts.put(timerId, task); + context.addCloseHook(task); + return timerId; + } + + public static Context context() { + Thread current = Thread.currentThread(); + if (current instanceof VertxThread) { + return ((VertxThread) current).getContext(); + } + return null; + } + + public ContextImpl getContext() { + ContextImpl context = (ContextImpl) context(); + if (context != null && context.owner == this) { + return context; + } + return null; + } + + public ClusterManager getClusterManager() { + return clusterManager; + } + + @Override + public void close() { + close(null); + } + + private void closeClusterManager(Handler> completionHandler) { + if (clusterManager != null) { + clusterManager.leave(ar -> { + if (ar.failed()) { + log.error("Failed to leave cluster", ar.cause()); + } + if (completionHandler != null) { + runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + }); + } else if (completionHandler != null) { + runOnContext(v -> completionHandler.handle(Future.succeededFuture())); + } + } + + @Override + public synchronized void close(Handler> completionHandler) { + if (closed || eventBus == null) { + // Just call the handler directly since pools shutdown + if (completionHandler != null) { + completionHandler.handle(Future.succeededFuture()); + } + return; + } + closed = true; + + closeHooks.run(ar -> { + deploymentManager.undeployAll(ar1 -> { + HAManager haManager = haManager(); + Future haFuture = Future.future(); + if (haManager != null) { + this.executeBlocking(fut -> { + haManager.stop(); + fut.complete(); + }, false, haFuture); + } else { + haFuture.complete(); + } + haFuture.setHandler(ar2 -> { + addressResolver.close(ar3 -> { + eventBus.close(ar4 -> { + closeClusterManager(ar5 -> { + // Copy set to prevent ConcurrentModificationException + Set httpServers = new HashSet<>(sharedHttpServers.values()); + Set netServers = new HashSet<>(sharedNetServers.values()); + sharedHttpServers.clear(); + sharedNetServers.clear(); + + int serverCount = httpServers.size() + netServers.size(); + + AtomicInteger serverCloseCount = new AtomicInteger(); + + Handler> serverCloseHandler = res -> { + if (res.failed()) { + log.error("Failure in shutting down server", res.cause()); + } + if (serverCloseCount.incrementAndGet() == serverCount) { + deleteCacheDirAndShutdown(completionHandler); + } + }; + + for (HttpServerImpl server : httpServers) { + server.closeAll(serverCloseHandler); + } + for (NetServerImpl server : netServers) { + server.closeAll(serverCloseHandler); + } + if (serverCount == 0) { + deleteCacheDirAndShutdown(completionHandler); + } + }); + }); + }); + }); + }); + }); + } + + @Override + public void deployVerticle(Verticle verticle) { + deployVerticle(verticle, new DeploymentOptions(), null); + } + + @Override + public void deployVerticle(Verticle verticle, Handler> completionHandler) { + deployVerticle(verticle, new DeploymentOptions(), completionHandler); + } + + @Override + public void deployVerticle(String name, Handler> completionHandler) { + deployVerticle(name, new DeploymentOptions(), completionHandler); + } + + @Override + public void deployVerticle(Verticle verticle, DeploymentOptions options) { + deployVerticle(verticle, options, null); + } + + @Override + public void deployVerticle(Class verticleClass, DeploymentOptions options) { + deployVerticle(() -> { + try { + return verticleClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, options); + } + + @Override + public void deployVerticle(Supplier verticleSupplier, DeploymentOptions options) { + deployVerticle(verticleSupplier, options, null); + } + + @Override + public void deployVerticle(Verticle verticle, DeploymentOptions options, Handler> completionHandler) { + if (options.getInstances() != 1) { + throw new IllegalArgumentException("Can't specify > 1 instances for already created verticle"); + } + deployVerticle(() -> verticle, options, completionHandler); + } + + @Override + public void deployVerticle(Class verticleClass, DeploymentOptions options, Handler> completionHandler) { + deployVerticle(() -> { + try { + return verticleClass.newInstance(); + } catch (Exception e) { + throw new RuntimeException(e); + } + }, options, completionHandler); + } + + @Override + public void deployVerticle(Supplier verticleSupplier, DeploymentOptions options, Handler> completionHandler) { + boolean closed; + synchronized (this) { + closed = this.closed; + } + if (closed) { + if (completionHandler != null) { + completionHandler.handle(Future.failedFuture("Vert.x closed")); + } + } else { + deploymentManager.deployVerticle(verticleSupplier, options, completionHandler); + } + } + + @Override + public void deployVerticle(String name) { + deployVerticle(name, new DeploymentOptions(), null); + } + + @Override + public void deployVerticle(String name, DeploymentOptions options) { + deployVerticle(name, options, null); + } + + @Override + public void deployVerticle(String name, DeploymentOptions options, Handler> completionHandler) { + if (options.isHa() && haManager() != null && haManager().isEnabled()) { + haManager().deployVerticle(name, options, completionHandler); + } else { + deploymentManager.deployVerticle(name, options, completionHandler); + } + } + + @Override + public String getNodeID() { + return clusterManager.getNodeID(); + } + + @Override + public void undeploy(String deploymentID) { + undeploy(deploymentID, res -> { + }); + } + + @Override + public void undeploy(String deploymentID, Handler> completionHandler) { + HAManager haManager = haManager(); + Future haFuture = Future.future(); + if (haManager != null && haManager.isEnabled()) { + this.executeBlocking(fut -> { + haManager.removeFromHA(deploymentID); + fut.complete(); + }, false, haFuture); + } else { + haFuture.complete(); + } + haFuture.compose(v -> { + Future deploymentFuture = Future.future(); + deploymentManager.undeployVerticle(deploymentID, deploymentFuture); + return deploymentFuture; + }).setHandler(completionHandler); + } + + @Override + public Set deploymentIDs() { + return deploymentManager.deployments(); + } + + @Override + public void registerVerticleFactory(VerticleFactory factory) { + deploymentManager.registerVerticleFactory(factory); + } + + @Override + public void unregisterVerticleFactory(VerticleFactory factory) { + deploymentManager.unregisterVerticleFactory(factory); + } + + @Override + public Set verticleFactories() { + return deploymentManager.verticleFactories(); + } + + @Override + public void executeBlockingInternal(Handler> blockingCodeHandler, Handler> resultHandler) { + ContextImpl context = getOrCreateContext(); + + context.executeBlockingInternal(blockingCodeHandler, resultHandler); + } + + @Override + public void executeBlocking(Handler> blockingCodeHandler, boolean ordered, + Handler> asyncResultHandler) { + ContextImpl context = getOrCreateContext(); + context.executeBlocking(blockingCodeHandler, ordered, asyncResultHandler); + } + + @Override + public void executeBlocking(Handler> blockingCodeHandler, + Handler> asyncResultHandler) { + executeBlocking(blockingCodeHandler, true, asyncResultHandler); + } + + @Override + public boolean isClustered() { + return clusterManager != null; + } + + @Override + public EventLoopGroup nettyEventLoopGroup() { + return eventLoopGroup; + } + + // For testing + public void simulateKill() { + if (haManager() != null) { + haManager().simulateKill(); + } + } + + @Override + public Deployment getDeployment(String deploymentID) { + return deploymentManager.getDeployment(deploymentID); + } + + @Override + public synchronized void failoverCompleteHandler(FailoverCompleteHandler failoverCompleteHandler) { + if (haManager() != null) { + haManager().setFailoverCompleteHandler(failoverCompleteHandler); + } + } + + @Override + public boolean isKilled() { + return haManager().isKilled(); + } + + @Override + public void failDuringFailover(boolean fail) { + if (haManager() != null) { + haManager().failDuringFailover(fail); + } + } + + @Override + public VertxMetrics metricsSPI() { + return metrics; + } + + @Override + public File resolveFile(String fileName) { + return fileResolver.resolveFile(fileName); + } + + @Override + public void resolveAddress(String hostname, Handler> resultHandler) { + addressResolver.resolveHostname(hostname, resultHandler); + } + + @Override + public AddressResolver addressResolver() { + return addressResolver; + } + + @Override + public AddressResolverGroup nettyAddressResolverGroup() { + return addressResolver.nettyAddressResolverGroup(); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) + private void deleteCacheDirAndShutdown(Handler> completionHandler) { + executeBlockingInternal(fut -> { + try { + fileResolver.close(); + fut.complete(); + } catch (IOException e) { + fut.tryFail(e); + } + }, ar -> { + + workerPool.close(); + internalBlockingPool.close(); + new ArrayList<>(namedWorkerPools.values()).forEach(WorkerPool::close); + + acceptorEventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Failure in shutting down acceptor event loop group", future.cause()); + } + eventLoopGroup.shutdownGracefully(0, 10, TimeUnit.SECONDS).addListener(new GenericFutureListener() { + @Override + public void operationComplete(io.netty.util.concurrent.Future future) throws Exception { + if (!future.isSuccess()) { + log.warn("Failure in shutting down event loop group", future.cause()); + } + if (metrics != null) { + metrics.close(); + } + + checker.close(); + + if (completionHandler != null) { + eventLoopThreadFactory.newThread(() -> { + completionHandler.handle(Future.succeededFuture()); + }).start(); + } + } + }); + } + }); + }); + } + + public HAManager haManager() { + return haManager; + } + + private class InternalTimerHandler implements Handler, Closeable { + final Handler handler; + final boolean periodic; + final long timerID; + final ContextImpl context; + final java.util.concurrent.Future future; + final AtomicBoolean cancelled; + + boolean cancel() { + if (cancelled.compareAndSet(false, true)) { + if (metrics != null) { + metrics.timerEnded(timerID, true); + } + future.cancel(false); + return true; + } else { + return false; + } + } + + InternalTimerHandler(long timerID, Handler runnable, boolean periodic, long delay, ContextImpl context) { + this.context = context; + this.timerID = timerID; + this.handler = runnable; + this.periodic = periodic; + this.cancelled = new AtomicBoolean(); + EventLoop el = context.nettyEventLoop(); + Runnable toRun = () -> context.runOnContext(this); + if (periodic) { + future = el.scheduleAtFixedRate(toRun, delay, delay, TimeUnit.MILLISECONDS); + } else { + future = el.schedule(toRun, delay, TimeUnit.MILLISECONDS); + } + if (metrics != null) { + metrics.timerCreated(timerID); + } + } + + public void handle(Void v) { + if (!cancelled.get()) { + try { + handler.handle(timerID); + } finally { + if (!periodic) { + // Clean up after it's fired + cleanupNonPeriodic(); + } + } + } + } + + private void cleanupNonPeriodic() { + VertxImpl.this.timeouts.remove(timerID); + if (metrics != null) { + metrics.timerEnded(timerID, false); + } + ContextImpl context = getContext(); + if (context != null) { + context.removeCloseHook(this); + } + } + + // Called via Context close hook when Verticle is undeployed + public void close(Handler> completionHandler) { + VertxImpl.this.timeouts.remove(timerID); + cancel(); + completionHandler.handle(Future.succeededFuture()); + } + + } + + /* + * + * This class is optimised for performance when used on the same event loop that is was passed to the handler with. + * However it can be used safely from other threads. + * + * The internal state is protected using the synchronized keyword. If always used on the same event loop, then + * we benefit from biased locking which makes the overhead of synchronized near zero. + * + */ + private class TimeoutStreamImpl implements TimeoutStream, Handler { + + private final long delay; + private final boolean periodic; + + private Long id; + private Handler handler; + private Handler endHandler; + private long demand; + + public TimeoutStreamImpl(long delay, boolean periodic) { + this.delay = delay; + this.periodic = periodic; + this.demand = Long.MAX_VALUE; + } + + @Override + public synchronized void handle(Long event) { + try { + if (demand > 0) { + demand--; + handler.handle(event); + } + } finally { + if (!periodic && endHandler != null) { + endHandler.handle(null); + } + } + } + + @Override + public synchronized TimeoutStream fetch(long amount) { + demand += amount; + if (demand < 0) { + demand = Long.MAX_VALUE; + } + return this; + } + + @Override + public TimeoutStream exceptionHandler(Handler handler) { + return this; + } + + @Override + public void cancel() { + if (id != null) { + VertxImpl.this.cancelTimer(id); + } + } + + @Override + public synchronized TimeoutStream handler(Handler handler) { + if (handler != null) { + if (id != null) { + throw new IllegalStateException(); + } + this.handler = handler; + id = scheduleTimeout(getOrCreateContext(), this, delay, periodic); + } else { + cancel(); + } + return this; + } + + @Override + public synchronized TimeoutStream pause() { + demand = 0; + return this; + } + + @Override + public synchronized TimeoutStream resume() { + demand = Long.MAX_VALUE; + return this; + } + + @Override + public synchronized TimeoutStream endHandler(Handler endHandler) { + this.endHandler = endHandler; + return this; + } + } + + class SharedWorkerPool extends WorkerPool { + + private final String name; + private int refCount = 1; + + @SuppressWarnings("rawtypes") + SharedWorkerPool(String name, ExecutorService workerExec, PoolMetrics workerMetrics) { + super(workerExec, workerMetrics); + this.name = name; + } + + @Override + void close() { + synchronized (VertxImpl.this) { + if (refCount > 0) { + refCount = 0; + super.close(); + } + } + } + + void release() { + synchronized (VertxImpl.this) { + if (--refCount == 0) { + namedWorkerPools.remove(name); + super.close(); + } + } + } + } + + @Override + public WorkerExecutorImpl createSharedWorkerExecutor(String name) { + return createSharedWorkerExecutor(name, defaultWorkerPoolSize); + } + + @Override + public WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize) { + return createSharedWorkerExecutor(name, poolSize, defaultWorkerMaxExecTime); + } + + @Override + public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime) { + return createSharedWorkerExecutor(name, poolSize, maxExecuteTime, TimeUnit.NANOSECONDS); + } + + @SuppressWarnings("rawtypes") + @Override + public synchronized WorkerExecutorImpl createSharedWorkerExecutor(String name, int poolSize, long maxExecuteTime, TimeUnit maxExecuteTimeUnit) { + if (poolSize < 1) { + throw new IllegalArgumentException("poolSize must be > 0"); + } + if (maxExecuteTime < 1) { + throw new IllegalArgumentException("maxExecuteTime must be > 0"); + } + SharedWorkerPool sharedWorkerPool = namedWorkerPools.get(name); + if (sharedWorkerPool == null) { + ExecutorService workerExec = Executors.newFixedThreadPool(poolSize, new VertxThreadFactory(name + "-", checker, true, maxExecuteTime, maxExecuteTimeUnit)); + PoolMetrics workerMetrics = metrics != null ? metrics.createPoolMetrics("worker", name, poolSize) : null; + namedWorkerPools.put(name, sharedWorkerPool = new SharedWorkerPool(name, workerExec, workerMetrics)); + } else { + sharedWorkerPool.refCount++; + } + ContextImpl context = getOrCreateContext(); + WorkerExecutorImpl namedExec = new WorkerExecutorImpl(context, sharedWorkerPool); + context.addCloseHook(namedExec); + return namedExec; + } + + @Override + public Vertx exceptionHandler(Handler handler) { + exceptionHandler = handler; + return this; + } + + @Override + public Handler exceptionHandler() { + return exceptionHandler; + } + + @Override + public void addCloseHook(Closeable hook) { + closeHooks.add(hook); + } + + @Override + public void removeCloseHook(Closeable hook) { + closeHooks.remove(hook); + } +} diff --git a/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImplEx.java b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImplEx.java index 2020ff3b41d..2ea0182e92f 100644 --- a/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImplEx.java +++ b/foundations/foundation-vertx/src/main/java/io/vertx/core/impl/VertxImplEx.java @@ -25,13 +25,14 @@ import io.vertx.core.VertxOptions; import io.vertx.core.json.JsonObject; +import io.vertx.core.net.impl.transport.Transport; public class VertxImplEx extends VertxImpl { private AtomicLong eventLoopContextCreated = new AtomicLong(); public VertxImplEx(String name, VertxOptions vertxOptions) { - super(vertxOptions); - + super(vertxOptions, Transport.transport(vertxOptions.getPreferNativeTransport())); + init(); if (StringUtils.isEmpty(name)) { return; } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java index a3aabd69452..8861a793a2a 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/VertxUtils.java @@ -41,7 +41,7 @@ import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; import io.vertx.core.buffer.Buffer; -import io.vertx.core.impl.FileResolver; +import io.vertx.core.file.impl.FileResolver; import io.vertx.core.impl.VertxImplEx; import io.vertx.core.logging.SLF4JLogDelegateFactory; diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultHttpServerMetrics.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultHttpServerMetrics.java index b99e8f9246d..4d007ed9ecf 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultHttpServerMetrics.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultHttpServerMetrics.java @@ -62,12 +62,7 @@ public void responseEnd(Object requestMetric, HttpServerResponse response) { } @Override - public Object upgrade(Object requestMetric, ServerWebSocket serverWebSocket) { - return null; - } - - @Override - public Object connected(DefaultHttpSocketMetric socketMetric, ServerWebSocket serverWebSocket) { + public Object connected(DefaultHttpSocketMetric socketMetric, Object requestMetric, ServerWebSocket serverWebSocket) { return null; } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java index 4d59a296f99..f78b6b862e5 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetrics.java @@ -24,9 +24,7 @@ import io.vertx.core.Vertx; import io.vertx.core.VertxOptions; -import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.http.HttpServer; import io.vertx.core.http.HttpServerOptions; import io.vertx.core.metrics.impl.DummyVertxMetrics; import io.vertx.core.net.NetClientOptions; @@ -37,8 +35,6 @@ import io.vertx.core.spi.metrics.TCPMetrics; public class DefaultVertxMetrics extends DummyVertxMetrics { - private final Vertx vertx; - private VertxOptions vertxOptions; // to support listen multiple addresses, must use a map to manage the metric @@ -46,17 +42,12 @@ public class DefaultVertxMetrics extends DummyVertxMetrics { private volatile DefaultClientEndpointMetricManager clientEndpointMetricManager; - public DefaultVertxMetrics(Vertx vertx, VertxOptions vertxOptions) { - this.vertx = vertx; + public DefaultVertxMetrics(VertxOptions vertxOptions) { this.vertxOptions = vertxOptions; - this.clientEndpointMetricManager = new DefaultClientEndpointMetricManager(vertx, + this.clientEndpointMetricManager = new DefaultClientEndpointMetricManager( (MetricsOptionsEx) vertxOptions.getMetricsOptions()); } - public Vertx getVertx() { - return vertx; - } - public DefaultClientEndpointMetricManager getClientEndpointMetricManager() { return clientEndpointMetricManager; } @@ -66,27 +57,27 @@ public Map getServerEndpointMetricMa } @Override - public HttpServerMetrics createMetrics(HttpServer server, SocketAddress localAddress, - HttpServerOptions options) { + public HttpServerMetrics createHttpServerMetrics(HttpServerOptions options, SocketAddress localAddress + ) { DefaultServerEndpointMetric endpointMetric = serverEndpointMetricMap .computeIfAbsent(localAddress, DefaultServerEndpointMetric::new); return new DefaultHttpServerMetrics(endpointMetric); } @Override - public HttpClientMetrics createMetrics(HttpClient client, HttpClientOptions options) { + public HttpClientMetrics createHttpClientMetrics(HttpClientOptions options) { return new DefaultHttpClientMetrics(clientEndpointMetricManager); } @Override - public TCPMetrics createMetrics(SocketAddress localAddress, NetServerOptions options) { + public TCPMetrics createNetServerMetrics(NetServerOptions options, SocketAddress localAddress) { DefaultServerEndpointMetric endpointMetric = serverEndpointMetricMap .computeIfAbsent(localAddress, DefaultServerEndpointMetric::new); return new DefaultTcpServerMetrics(endpointMetric); } @Override - public TCPMetrics createMetrics(NetClientOptions options) { + public TCPMetrics createNetClientMetrics(NetClientOptions options) { return new DefaultTcpClientMetrics(clientEndpointMetricManager); } @@ -100,4 +91,8 @@ public boolean isMetricsEnabled() { public boolean isEnabled() { return true; } + + public void setVertx(Vertx vertx) { + clientEndpointMetricManager.setVertx(vertx); + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetricsFactory.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetricsFactory.java index 1ceb91684bd..1c558029f50 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetricsFactory.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/DefaultVertxMetricsFactory.java @@ -44,9 +44,9 @@ public DefaultVertxMetrics getVertxMetrics() { } @Override - public synchronized VertxMetrics metrics(Vertx vertx, VertxOptions options) { + public synchronized VertxMetrics metrics(VertxOptions options) { if (vertxMetrics == null) { - vertxMetrics = new DefaultVertxMetrics(vertx, options); + vertxMetrics = new DefaultVertxMetrics(options); } return vertxMetrics; } @@ -58,4 +58,11 @@ public MetricsOptions newOptions() { metricsOptions.setEnabled(true); return metricsOptions; } + + public void setVertx(Vertx vertx, VertxOptions options) { + if (vertxMetrics == null) { + vertxMetrics = new DefaultVertxMetrics(options); + } + vertxMetrics.setVertx(vertx); + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java index aa25d132fb9..e537b186549 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/metrics/metric/DefaultClientEndpointMetricManager.java @@ -30,20 +30,19 @@ import io.vertx.core.net.SocketAddress; public class DefaultClientEndpointMetricManager { - private final Vertx vertx; - private final MetricsOptionsEx metricsOptionsEx; // to avoid save too many endpoint that not exist any more // must check expired periodically private Map clientEndpointMetricMap = new ConcurrentHashMapEx<>(); + private Vertx vertx; + private final ReadWriteLock rwlock = new ReentrantReadWriteLock(); private AtomicBoolean inited = new AtomicBoolean(false); - public DefaultClientEndpointMetricManager(Vertx vertx, MetricsOptionsEx metricsOptionsEx) { - this.vertx = vertx; + public DefaultClientEndpointMetricManager(MetricsOptionsEx metricsOptionsEx) { this.metricsOptionsEx = metricsOptionsEx; } @@ -88,4 +87,8 @@ public void onCheckClientEndpointMetricExpired(long periodic) { } } } + + public void setVertx(Vertx vertx) { + this.vertx = vertx; + } } diff --git a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java index 6d66fb60f98..18ed2745815 100644 --- a/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java +++ b/foundations/foundation-vertx/src/main/java/org/apache/servicecomb/foundation/vertx/stream/InputStreamToReadStream.java @@ -191,7 +191,7 @@ private void closeInputStream() { if (closed) { return; } - + closed = true; if (!autoCloseInputStream) { return; @@ -210,4 +210,9 @@ public ReadStream endHandler(Handler handler) { this.endHandler = handler; return this; } + + @Override + public ReadStream fetch(long amount) { + return this; + } } diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/http/TestHttpClientPoolFactory.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/http/TestHttpClientPoolFactory.java index 669d1da6697..eb63c1db8d7 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/http/TestHttpClientPoolFactory.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/http/TestHttpClientPoolFactory.java @@ -21,7 +21,7 @@ import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientOptions; -import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxImpl; import io.vertx.core.impl.VertxInternal; import mockit.Expectations; @@ -33,7 +33,7 @@ public class TestHttpClientPoolFactory { HttpClientPoolFactory factory = new HttpClientPoolFactory(httpClientOptions); @Test - public void createClientPool(@Mocked VertxInternal vertx, @Mocked ContextImpl context, + public void createClientPool(@Mocked VertxInternal vertx, @Mocked ContextInternal context, @Mocked HttpClient httpClient) { new Expectations(VertxImpl.class) { { diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnectionPool.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnectionPool.java index 2ca9736d8bc..f13e59ff75f 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnectionPool.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/client/tcp/TestTcpClientConnectionPool.java @@ -21,12 +21,12 @@ import org.junit.Before; import org.junit.Test; -import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.ContextInternal; import mockit.Mocked; public class TestTcpClientConnectionPool { @Mocked - ContextImpl context; + ContextInternal context; @Mocked NetClientWrapper netClientWrapper; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java index 0e50c68ba7d..c2f6230f2b4 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/http/TestReadStreamPart.java @@ -40,7 +40,7 @@ import io.vertx.core.file.FileSystemException; import io.vertx.core.file.OpenOptions; import io.vertx.core.http.HttpClientResponse; -import io.vertx.core.impl.ContextImpl; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.SyncVertx; import io.vertx.core.streams.WriteStream; import mockit.Expectations; @@ -51,7 +51,7 @@ public class TestReadStreamPart { static SyncVertx vertx = new SyncVertx(); - static ContextImpl context = vertx.getContext(); + static ContextInternal context = vertx.getContext(); static String src = "src"; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpClientMetrics.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpClientMetrics.java index caeb25e1541..c026512a0e0 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpClientMetrics.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpClientMetrics.java @@ -99,9 +99,10 @@ private static DefaultHttpSocketMetric initSocketMetric(DefaultHttpClientMetrics @Before public void setup() { vertxOptions.setMetricsOptions(metricsOptionsEx); - defaultVertxMetrics = new DefaultVertxMetrics(vertx, vertxOptions); - clientMetrics_a = (DefaultHttpClientMetrics) defaultVertxMetrics.createMetrics(anyHttpClient, options); - clientMetrics_b = (DefaultHttpClientMetrics) defaultVertxMetrics.createMetrics(anyHttpClient, options); + defaultVertxMetrics = new DefaultVertxMetrics(vertxOptions); + defaultVertxMetrics.setVertx(vertx); + clientMetrics_a = (DefaultHttpClientMetrics) defaultVertxMetrics.createHttpClientMetrics(options); + clientMetrics_b = (DefaultHttpClientMetrics) defaultVertxMetrics.createHttpClientMetrics(options); nanoTime = 1; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpServerMetrics.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpServerMetrics.java index b95ad5a6e3c..51505978c33 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpServerMetrics.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultHttpServerMetrics.java @@ -93,18 +93,18 @@ public class TestDefaultHttpServerMetrics { @Before public void setup() { vertxOptions.setMetricsOptions(metricsOptionsEx); - defaultVertxMetrics = new DefaultVertxMetrics(vertx, vertxOptions); + defaultVertxMetrics = new DefaultVertxMetrics(vertxOptions); metrics_listen1_server1 = (DefaultHttpServerMetrics) defaultVertxMetrics - .createMetrics(listen1_server1, listen1_addr, options); + .createHttpServerMetrics(options, listen1_addr); metrics_listen1_server2 = (DefaultHttpServerMetrics) defaultVertxMetrics - .createMetrics(listen1_server2, listen1_addr, options); + .createHttpServerMetrics(options, listen1_addr); endpointMetric1 = metrics_listen1_server1.getEndpointMetric(); metrics_listen2_server1 = (DefaultHttpServerMetrics) defaultVertxMetrics - .createMetrics(listen2_server1, listen2_addr, options); + .createHttpServerMetrics(options, listen2_addr); metrics_listen2_server2 = (DefaultHttpServerMetrics) defaultVertxMetrics - .createMetrics(listen2_server2, listen2_addr, options); + .createHttpServerMetrics(options, listen2_addr); endpointMetric2 = metrics_listen2_server1.getEndpointMetric(); socketMetric_listen1_1 = metrics_listen1_server1.connected(anyRemoteAddr, remoteName); @@ -198,8 +198,7 @@ public void meaningless() { metrics_listen1_server1.requestReset(null); metrics_listen1_server1.responsePushed(null, null, null, null); metrics_listen1_server1.responseEnd(null, null); - metrics_listen1_server1.upgrade(null, null); - metrics_listen1_server1.connected((DefaultHttpSocketMetric) null, null); + metrics_listen1_server1.connected((DefaultHttpSocketMetric) null, null, null); metrics_listen1_server1.disconnected(null); metrics_listen1_server1.exceptionOccurred(null, null, null); metrics_listen1_server1.close(); diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpClientMetrics.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpClientMetrics.java index fbbe8dec921..140534ebd81 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpClientMetrics.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpClientMetrics.java @@ -94,9 +94,10 @@ private static DefaultTcpSocketMetric initSocketMetric(DefaultTcpClientMetrics m @Before public void setup() { vertxOptions.setMetricsOptions(metricsOptionsEx); - defaultVertxMetrics = new DefaultVertxMetrics(vertx, vertxOptions); - clientMetrics_a = (DefaultTcpClientMetrics) defaultVertxMetrics.createMetrics(options); - clientMetrics_b = (DefaultTcpClientMetrics) defaultVertxMetrics.createMetrics(options); + defaultVertxMetrics = new DefaultVertxMetrics(vertxOptions); + defaultVertxMetrics.setVertx(vertx); + clientMetrics_a = (DefaultTcpClientMetrics) defaultVertxMetrics.createNetClientMetrics(options); + clientMetrics_b = (DefaultTcpClientMetrics) defaultVertxMetrics.createNetClientMetrics(options); nanoTime = 1; diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpServerMetrics.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpServerMetrics.java index c6ed45e3a9b..8cdf0a847c9 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpServerMetrics.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultTcpServerMetrics.java @@ -80,18 +80,18 @@ public class TestDefaultTcpServerMetrics { @Before public void setup() { vertxOptions.setMetricsOptions(metricsOptionsEx); - defaultVertxMetrics = new DefaultVertxMetrics(vertx, vertxOptions); + defaultVertxMetrics = new DefaultVertxMetrics(vertxOptions); metrics_listen1_server1 = (DefaultTcpServerMetrics) defaultVertxMetrics - .createMetrics(listen1_addr, options); + .createNetServerMetrics(options, listen1_addr); metrics_listen1_server2 = (DefaultTcpServerMetrics) defaultVertxMetrics - .createMetrics(listen1_addr, options); + .createNetServerMetrics(options, listen1_addr); endpointMetric1 = metrics_listen1_server1.getEndpointMetric(); metrics_listen2_server1 = (DefaultTcpServerMetrics) defaultVertxMetrics - .createMetrics(listen2_addr, options); + .createNetServerMetrics(options, listen2_addr); metrics_listen2_server2 = (DefaultTcpServerMetrics) defaultVertxMetrics - .createMetrics(listen2_addr, options); + .createNetServerMetrics(options, listen2_addr); endpointMetric2 = metrics_listen2_server1.getEndpointMetric(); socketMetric_listen1_1 = metrics_listen1_server1.connected(anyRemoteAddr, remoteName); diff --git a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultVertxMetricsFactory.java b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultVertxMetricsFactory.java index a96f23c7fa1..64af2df5704 100644 --- a/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultVertxMetricsFactory.java +++ b/foundations/foundation-vertx/src/test/java/org/apache/servicecomb/foundation/vertx/metrics/TestDefaultVertxMetricsFactory.java @@ -38,13 +38,12 @@ public class TestDefaultVertxMetricsFactory { public void metrics() { MetricsOptions metricsOptions = factory.newOptions(); options.setMetricsOptions(metricsOptions); - VertxMetrics vertxMetrics = factory.metrics(vertx, options); + VertxMetrics vertxMetrics = factory.metrics(options); Assert.assertSame(factory, metricsOptions.getFactory()); Assert.assertTrue(metricsOptions.isEnabled()); Assert.assertSame(factory.getVertxMetrics(), vertxMetrics); - Assert.assertSame(vertx, ((DefaultVertxMetrics) vertxMetrics).getVertx()); Assert.assertTrue(vertxMetrics.isMetricsEnabled()); Assert.assertTrue(vertxMetrics.isEnabled()); } diff --git a/java-chassis-dependencies/pom.xml b/java-chassis-dependencies/pom.xml index 4642339e3a4..c4f2cce1ef6 100644 --- a/java-chassis-dependencies/pom.xml +++ b/java-chassis-dependencies/pom.xml @@ -31,8 +31,8 @@ pom - 2.9.6 - 3.5.3 + 2.9.8 + 3.6.2 0.8 4.3.20.RELEASE 1.7.7 @@ -45,8 +45,8 @@ 4.5.2 1.5.2 1.5.12 - 4.1.24.Final - 2.0.7.Final + 4.1.28.Final + 2.0.14.Final ${basedir}/../.. 5.3.2.Final 3.1.6 diff --git a/java-chassis-distribution/src/release/LICENSE b/java-chassis-distribution/src/release/LICENSE index 06e8e818201..4e3ed326962 100644 --- a/java-chassis-distribution/src/release/LICENSE +++ b/java-chassis-distribution/src/release/LICENSE @@ -436,14 +436,14 @@ Google Guice - Extensions - AssistedInject (https://github.com/google/guice/exte Google Guice - Extensions - MultiBindings (https://github.com/google/guice/extensions-parent/guice-multibindings) com.google.inject.extensions:guice-multibindings:jar:4.1.0 Hibernate Validator Engine (http://hibernate.org/validator/hibernate-validator) org.hibernate:hibernate-validator:jar:6.0.2.Final hystrix-core (https://github.com/Netflix/Hystrix) com.netflix.hystrix:hystrix-core:jar:1.5.12 -Jackson dataformat: protobuf (http://github.com/FasterXML/jackson-dataformats-binary) com.fasterxml.jackson.dataformat:jackson-dataformat-protobuf:bundle:2.9.6 +Jackson dataformat: protobuf (http://github.com/FasterXML/jackson-dataformats-binary) com.fasterxml.jackson.dataformat:jackson-dataformat-protobuf:bundle:2.9.8 Jackson module: Afterburner (https://github.com/FasterXML/jackson-modules-base) com.fasterxml.jackson.module:jackson-module-afterburner:bundle:2.8.11 Jackson module: JAXB-annotations (http://github.com/FasterXML/jackson-module-jaxb-annotations) com.fasterxml.jackson.module:jackson-module-jaxb-annotations:bundle:2.8.11 -Jackson-annotations (http://github.com/FasterXML/jackson) com.fasterxml.jackson.core:jackson-annotations:bundle:2.9.6 -Jackson-core (https://github.com/FasterXML/jackson-core) com.fasterxml.jackson.core:jackson-core:bundle:2.9.6 -jackson-databind (http://github.com/FasterXML/jackson) com.fasterxml.jackson.core:jackson-databind:bundle:2.9.6 -Jackson-dataformat-XML (http://wiki.fasterxml.com/JacksonExtensionXmlDataBinding) com.fasterxml.jackson.dataformat:jackson-dataformat-xml:bundle:2.9.6 -Jackson-dataformat-YAML (https://github.com/FasterXML/jackson-dataformats-text) com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:bundle:2.9.6 +Jackson-annotations (http://github.com/FasterXML/jackson) com.fasterxml.jackson.core:jackson-annotations:bundle:2.9.8 +Jackson-core (https://github.com/FasterXML/jackson-core) com.fasterxml.jackson.core:jackson-core:bundle:2.9.8 +jackson-databind (http://github.com/FasterXML/jackson) com.fasterxml.jackson.core:jackson-databind:bundle:2.9.8 +Jackson-dataformat-XML (http://wiki.fasterxml.com/JacksonExtensionXmlDataBinding) com.fasterxml.jackson.dataformat:jackson-dataformat-xml:bundle:2.9.8 +Jackson-dataformat-YAML (https://github.com/FasterXML/jackson-dataformats-text) com.fasterxml.jackson.dataformat:jackson-dataformat-yaml:bundle:2.9.8 Javassist (http://www.javassist.org/) org.javassist:javassist:bundle:3.18.1-GA javax.inject (http://code.google.com/p/atinject/) javax.inject:javax.inject:jar:1 JBoss Logging 3 (http://www.jboss.org) org.jboss.logging:jboss-logging:jar:3.3.2.Final @@ -451,19 +451,19 @@ json-lib (http://json-lib.sourceforge.net) net.sf.json-lib:json-lib:jar:2.4 Log4j Implemented Over SLF4J (http://www.slf4j.org) org.slf4j:log4j-over-slf4j:jar:1.7.7 netflix-commons-util (https://github.com/Netflix/netflix-commons) com.netflix.netflix-commons:netflix-commons-util:jar:0.1.1 netflix-statistics (https://github.com/Netflix/netflix-commons) com.netflix.netflix-commons:netflix-statistics:jar:0.1.1 -Netty/Buffer (http://netty.io/netty-buffer/) io.netty:netty-buffer:jar:4.1.24.Final -Netty/Codec (http://netty.io/netty-codec/) io.netty:netty-codec:jar:4.1.24.Final -Netty/Codec/DNS (http://netty.io/netty-codec-dns/) io.netty:netty-codec-dns:jar:4.1.24.Final -Netty/Codec/HTTP (http://netty.io/netty-codec-http/) io.netty:netty-codec-http:jar:4.1.24.Final -Netty/Codec/HTTP2 (http://netty.io/netty-codec-http2/) io.netty:netty-codec-http2:jar:4.1.24.Final -Netty/Codec/Socks (http://netty.io/netty-codec-socks/) io.netty:netty-codec-socks:jar:4.1.24.Final -Netty/Common (http://netty.io/netty-common/) io.netty:netty-common:jar:4.1.24.Final -Netty/Handler (http://netty.io/netty-handler/) io.netty:netty-handler:jar:4.1.24.Final -Netty/Handler/Proxy (http://netty.io/netty-handler-proxy/) io.netty:netty-handler-proxy:jar:4.1.24.Final -Netty/Resolver (http://netty.io/netty-resolver/) io.netty:netty-resolver:jar:4.1.24.Final -Netty/Resolver/DNS (http://netty.io/netty-resolver-dns/) io.netty:netty-resolver-dns:jar:4.1.24.Final -Netty/TomcatNative(https://netty.io/wiki/forked-tomcat-native.html) io.netty:netty-tcnative-boringssl-static:2.0.7.Final -Netty/Transport (http://netty.io/netty-transport/) io.netty:netty-transport:jar:4.1.24.Final +Netty/Buffer (http://netty.io/netty-buffer/) io.netty:netty-buffer:jar:4.1.28.Final +Netty/Codec (http://netty.io/netty-codec/) io.netty:netty-codec:jar:4.1.28.Final +Netty/Codec/DNS (http://netty.io/netty-codec-dns/) io.netty:netty-codec-dns:jar:4.1.28.Final +Netty/Codec/HTTP (http://netty.io/netty-codec-http/) io.netty:netty-codec-http:jar:4.1.28.Final +Netty/Codec/HTTP2 (http://netty.io/netty-codec-http2/) io.netty:netty-codec-http2:jar:4.1.28.Final +Netty/Codec/Socks (http://netty.io/netty-codec-socks/) io.netty:netty-codec-socks:jar:4.1.28.Final +Netty/Common (http://netty.io/netty-common/) io.netty:netty-common:jar:4.1.28.Final +Netty/Handler (http://netty.io/netty-handler/) io.netty:netty-handler:jar:4.1.28.Final +Netty/Handler/Proxy (http://netty.io/netty-handler-proxy/) io.netty:netty-handler-proxy:jar:4.1.28.Final +Netty/Resolver (http://netty.io/netty-resolver/) io.netty:netty-resolver:jar:4.1.28.Final +Netty/Resolver/DNS (http://netty.io/netty-resolver-dns/) io.netty:netty-resolver-dns:jar:4.1.28.Final +Netty/TomcatNative(https://netty.io/wiki/forked-tomcat-native.html) io.netty:netty-tcnative-boringssl-static:2.0.14.Final +Netty/Transport (http://netty.io/netty-transport/) io.netty:netty-transport:jar:4.1.28.Final OkHttp (https://github.com/square/okhttp/okhttp) com.squareup.okhttp3:okhttp:jar:3.4.1 Okio (https://github.com/square/okio/okio) com.squareup.okio:okio:jar:1.9.0 Prometheus Java Simpleclient (http://github.com/prometheus/client_java/simpleclient) io.prometheus:simpleclient:bundle:0.1.0 @@ -524,10 +524,10 @@ tomcat-embed-core (http://tomcat.apache.org/) org.apache.tomcat.embed:tomcat-emb tomcat-embed-el (http://tomcat.apache.org/) org.apache.tomcat.embed:tomcat-embed-el:jar:8.0.33 tomcat-embed-logging-juli (http://tomcat.apache.org/) org.apache.tomcat.embed:tomcat-embed-logging-juli:jar:8.0.33 tomcat-embed-websocket (http://tomcat.apache.org/) org.apache.tomcat.embed:tomcat-embed-websocket:jar:8.0.33 -Vert.x Bridge Common (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-bridge-common) io.vertx:vertx-bridge-common:jar:3.5.3 -Vert.x Core (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-core) io.vertx:vertx-core:jar:3.5.3 -vertx-auth-common (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-auth/vertx-auth-common) io.vertx:vertx-auth-common:jar:3.5.3 -vertx-web (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-web-parent/vertx-web) io.vertx:vertx-web:jar:3.5.3 +Vert.x Bridge Common (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-bridge-common) io.vertx:vertx-bridge-common:jar:3.6.2 +Vert.x Core (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-core) io.vertx:vertx-core:jar:3.6.2 +vertx-auth-common (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-auth/vertx-auth-common) io.vertx:vertx-auth-common:jar:3.6.2 +vertx-web (http://nexus.sonatype.org/oss-repository-hosting.html/vertx-parent/vertx-ext/vertx-ext-parent/vertx-web-parent/vertx-web) io.vertx:vertx-web:jar:3.6.2 Woodstox (https://github.com/FasterXML/woodstox) com.fasterxml.woodstox:woodstox-core:bundle:5.0.3 Zipkin Reporter Spring Factory Beans (https://github.com/openzipkin/zipkin-reporter-java/zipkin-reporter-spring-beans) io.zipkin.reporter2:zipkin-reporter-spring-beans:jar:2.7.13 Zipkin Reporter: Core (https://github.com/openzipkin/zipkin-reporter-java/zipkin-reporter) io.zipkin.reporter2:zipkin-reporter:jar:2.7.13 diff --git a/metrics/metrics-core/src/test/java/org/apache/servicecomb/metrics/core/TestVertxMetersInitializer.java b/metrics/metrics-core/src/test/java/org/apache/servicecomb/metrics/core/TestVertxMetersInitializer.java index ad5573fd187..6f5b9d53815 100644 --- a/metrics/metrics-core/src/test/java/org/apache/servicecomb/metrics/core/TestVertxMetersInitializer.java +++ b/metrics/metrics-core/src/test/java/org/apache/servicecomb/metrics/core/TestVertxMetersInitializer.java @@ -74,7 +74,7 @@ public void start(Future startFuture) { }); HttpServer server = vertx.createHttpServer(); - server.requestHandler(mainRouter::accept); + server.requestHandler(mainRouter); server.listen(0, "0.0.0.0", ar -> { if (ar.succeeded()) { port = ar.result().actualPort(); @@ -88,6 +88,7 @@ public void start(Future startFuture) { } public static class TestClientVerticle extends AbstractVerticle { + @SuppressWarnings("deprecation") @Override public void start(Future startFuture) { HttpClient client = vertx.createHttpClient(); diff --git a/pom.xml b/pom.xml index ceebcc3879a..57a202f05a2 100644 --- a/pom.xml +++ b/pom.xml @@ -195,6 +195,7 @@ **/target/** **/io/vertx/ext/web/impl/MimeTypesUtils.java + **/io/vertx/core/impl/VertxImpl.java **/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java **/java/org/apache/servicecomb/foundation/vertx/stream/PumpImplEx.java diff --git a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestUtils.java b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestUtils.java index f9db0c3887d..bb797810742 100644 --- a/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestUtils.java +++ b/service-registry/src/main/java/org/apache/servicecomb/serviceregistry/client/http/RestUtils.java @@ -83,6 +83,7 @@ public static void httpDo(long timeout, RequestContext requestContext, Handler { responseHandler.handle(new RestResponse(requestContext, response)); diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java index 13f91ab514f..c1c5431b023 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/DefaultHttpClientFilter.java @@ -128,6 +128,9 @@ public Response afterReceiveResponse(Invocation invocation, HttpServletResponseE Response response = extractResponse(invocation, responseEx); for (String headerName : responseEx.getHeaderNames()) { + if (headerName.equals(":status")) { + continue; + } Collection headerValues = responseEx.getHeaders(headerName); for (String headerValue : headerValues) { response.getHeaders().addHeader(headerName, headerValue); diff --git a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java index 8af3b3adf25..4218da3c6a9 100644 --- a/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/main/java/org/apache/servicecomb/transport/rest/client/http/RestClientInvocation.java @@ -146,6 +146,7 @@ private HttpMethod getMethod() { return HttpMethod.valueOf(method); } + @SuppressWarnings("deprecation") void createRequest(IpPort ipPort, String path) { URIEndpointObject endpoint = (URIEndpointObject) invocation.getEndpoint().getAddress(); RequestOptions requestOptions = new RequestOptions(); diff --git a/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java b/transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/http/impl/Http1xConnectionBaseEx.java similarity index 67% rename from foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java rename to transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/http/impl/Http1xConnectionBaseEx.java index a458f9bd6d8..524647f7e0e 100644 --- a/foundations/foundation-vertx/src/test/java/io/vertx/core/file/impl/AsyncFileUitls.java +++ b/transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/http/impl/Http1xConnectionBaseEx.java @@ -14,15 +14,18 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.vertx.core.file.impl; -import io.vertx.core.file.AsyncFile; -import io.vertx.core.file.OpenOptions; -import io.vertx.core.impl.ContextImpl; +package io.vertx.core.http.impl; + +import io.netty.channel.ChannelHandlerContext; +import io.vertx.core.impl.ContextInternal; import io.vertx.core.impl.VertxInternal; +import io.vertx.core.spi.metrics.NetworkMetrics; -public class AsyncFileUitls { - public static AsyncFile createAsyncFile(VertxInternal vertx, String path, OpenOptions options, ContextImpl context) { - return new AsyncFileImpl(vertx, path, options, context); +public abstract class Http1xConnectionBaseEx extends Http1xConnectionBase{ + public Http1xConnectionBaseEx(VertxInternal vertx, ChannelHandlerContext chctx, + ContextInternal context) { + super(vertx, chctx, context); } + } diff --git a/transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/http/impl/VertxImplTestUtils.java b/transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/http/impl/VertxImplTestUtils.java deleted file mode 100644 index 5fb03b151af..00000000000 --- a/transports/transport-rest/transport-rest-client/src/test/java/io/vertx/core/http/impl/VertxImplTestUtils.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.vertx.core.http.impl; - -import org.mockito.Mockito; - -public class VertxImplTestUtils { - public static Http1xConnectionBase mockClientConnection() { - return Mockito.mock(Http1xConnectionBase.class); - } -} diff --git a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java index 57a590c420d..d725ec31e26 100644 --- a/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java +++ b/transports/transport-rest/transport-rest-client/src/test/java/org/apache/servicecomb/transport/rest/client/http/TestRestClientInvocation.java @@ -65,14 +65,13 @@ import io.vertx.core.http.HttpClient; import io.vertx.core.http.HttpClientRequest; import io.vertx.core.http.HttpClientResponse; -import io.vertx.core.http.HttpConnection; import io.vertx.core.http.HttpMethod; import io.vertx.core.http.RequestOptions; -import io.vertx.core.http.impl.VertxImplTestUtils; +import io.vertx.core.http.impl.Http1xConnectionBaseEx; import io.vertx.core.net.NetSocket; import io.vertx.core.net.SocketAddress; -import io.vertx.core.net.impl.ConnectionBase; import mockit.Deencapsulation; +import mockit.Expectations; import mockit.Mock; import mockit.MockUp; import mockit.Mocked; @@ -120,6 +119,9 @@ public class TestRestClientInvocation { static long nanoTime = 123; + @Mocked + Http1xConnectionBaseEx connectionBase; + @BeforeClass public static void classSetup() { new MockUp() { @@ -130,7 +132,7 @@ long nanoTime() { }; } - @SuppressWarnings("unchecked") + @SuppressWarnings({"unchecked", "deprecation"}) @Before public void setup() { Deencapsulation.setField(restClientInvocation, "clientRequest", request); @@ -148,15 +150,6 @@ public void setup() { when(httpClient.request((HttpMethod) Mockito.any(), (RequestOptions) Mockito.any(), Mockito.any())) .thenReturn(request); - ConnectionBase connectionBase = VertxImplTestUtils.mockClientConnection(); - when(connectionBase.metric()).thenReturn(Mockito.mock(DefaultHttpSocketMetric.class)); - when(request.connection()).thenReturn((HttpConnection) connectionBase); - - DefaultHttpSocketMetric httpSocketMetric = new DefaultHttpSocketMetric(Mockito.mock(DefaultEndpointMetric.class)); - httpSocketMetric.requestBegin(); - httpSocketMetric.requestEnd(); - when(connectionBase.metric()).thenReturn(httpSocketMetric); - doAnswer(a -> { exceptionHandler = (Handler) a.getArguments()[0]; return request; @@ -318,6 +311,23 @@ public void processResponseBody() { when(invocation.getResponseExecutor()).thenReturn(new ReactiveExecutor()); + new Expectations() { + { + connectionBase.metric(); + result = Mockito.mock(DefaultHttpSocketMetric.class); + } + }; + + DefaultHttpSocketMetric httpSocketMetric = new DefaultHttpSocketMetric(Mockito.mock(DefaultEndpointMetric.class)); + httpSocketMetric.requestBegin(); + httpSocketMetric.requestEnd(); + new Expectations() { + { + connectionBase.metric(); + result = httpSocketMetric; + } + }; + when(request.connection()).thenReturn(connectionBase); restClientInvocation.processResponseBody(null); Assert.assertSame(resp, response); @@ -342,7 +352,23 @@ public void processResponseBody_throw() { } when(invocation.getResponseExecutor()).thenReturn(new ReactiveExecutor()); + new Expectations() { + { + connectionBase.metric(); + result = Mockito.mock(DefaultHttpSocketMetric.class); + } + }; + DefaultHttpSocketMetric httpSocketMetric = new DefaultHttpSocketMetric(Mockito.mock(DefaultEndpointMetric.class)); + httpSocketMetric.requestBegin(); + httpSocketMetric.requestEnd(); + new Expectations() { + { + connectionBase.metric(); + result = httpSocketMetric; + } + }; + when(request.connection()).thenReturn(connectionBase); restClientInvocation.processResponseBody(null); Assert.assertThat(((InvocationException) response.getResult()).getCause(), Matchers.instanceOf(Error.class)); diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java index 7e8105fadf8..257d95d98b6 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestBodyHandler.java @@ -60,18 +60,29 @@ public class RestBodyHandler implements BodyHandler { private static final String BODY_HANDLED = "__body-handled"; private long bodyLimit = DEFAULT_BODY_LIMIT; - + private boolean handleFileUploads; private String uploadsDir; private boolean mergeFormAttributes = DEFAULT_MERGE_FORM_ATTRIBUTES; private boolean deleteUploadedFilesOnEnd = DEFAULT_DELETE_UPLOADED_FILES_ON_END; + private boolean isPreallocateBodyBuffer = DEFAULT_PREALLOCATE_BODY_BUFFER; + private static final int DEFAULT_INITIAL_BODY_BUFFER_SIZE = 1024; //bytes public RestBodyHandler() { - setUploadsDirectory(DEFAULT_UPLOADS_DIRECTORY); + this(true, DEFAULT_UPLOADS_DIRECTORY); + } + + public RestBodyHandler(boolean handleFileUploads) { + this(handleFileUploads, DEFAULT_UPLOADS_DIRECTORY); } public RestBodyHandler(String uploadDirectory) { + this(true, uploadDirectory); + } + + private RestBodyHandler(boolean handleFileUploads, String uploadDirectory) { + this.handleFileUploads = handleFileUploads; setUploadsDirectory(uploadDirectory); } @@ -86,7 +97,8 @@ public void handle(RoutingContext context) { // we need to keep state since we can be called again on reroute Boolean handled = context.get(BODY_HANDLED); if (handled == null || !handled) { - BHandler handler = new BHandler(context); + long contentLength = isPreallocateBodyBuffer ? parseContentLengthHeader(request) : -1; + BHandler handler = new BHandler(context, contentLength); request.handler(handler); request.endHandler(v -> handler.end()); context.put(BODY_HANDLED, true); @@ -100,6 +112,12 @@ public void handle(RoutingContext context) { } } + @Override + public BodyHandler setHandleFileUploads(boolean handleFileUploads) { + this.handleFileUploads = handleFileUploads; + return this; + } + @Override public BodyHandler setBodyLimit(long bodyLimit) { this.bodyLimit = bodyLimit; @@ -124,11 +142,32 @@ public BodyHandler setDeleteUploadedFilesOnEnd(boolean deleteUploadedFilesOnEnd) return this; } + @Override + public BodyHandler setPreallocateBodyBuffer(boolean isPreallocateBodyBuffer) { + this.isPreallocateBodyBuffer = isPreallocateBodyBuffer; + return this; + } + + private long parseContentLengthHeader(HttpServerRequest request) { + String contentLength = request.getHeader(HttpHeaders.CONTENT_LENGTH); + if(contentLength == null || contentLength.isEmpty()) { + return -1; + } + try { + long parsedContentLength = Long.parseLong(contentLength); + return parsedContentLength < 0 ? null : parsedContentLength; + } + catch (NumberFormatException ex) { + return -1; + } + } + private class BHandler implements Handler { + private static final int MAX_PREALLOCATED_BODY_BUFFER_BYTES = 65535; private RoutingContext context; - private Buffer body = Buffer.buffer(); + private Buffer body; private boolean failed; @@ -144,7 +183,7 @@ private class BHandler implements Handler { private final boolean isUrlEncoded; - BHandler(RoutingContext context) { + BHandler(RoutingContext context, long contentLength) { this.context = context; Set fileUploads = context.fileUploads(); @@ -158,9 +197,13 @@ private class BHandler implements Handler { isUrlEncoded = lowerCaseContentType.startsWith(HttpHeaderValues.APPLICATION_X_WWW_FORM_URLENCODED.toString()); } + initBodyBuffer(contentLength); + if (isMultipart || isUrlEncoded) { - makeUploadDir(context.vertx().fileSystem()); context.request().setExpectMultipart(true); + if (handleFileUploads) { + makeUploadDir(context.vertx().fileSystem()); + } context.request().uploadHandler(upload -> { // *** cse begin *** if (uploadsDir == null) { @@ -180,17 +223,19 @@ private class BHandler implements Handler { return; } } - // we actually upload to a file with a generated filename - uploadCount.incrementAndGet(); - String uploadedFileName = new File(uploadsDir, UUID.randomUUID().toString()).getPath(); - upload.streamToFileSystem(uploadedFileName); - FileUploadImpl fileUpload = new FileUploadImpl(uploadedFileName, upload); - fileUploads.add(fileUpload); - upload.exceptionHandler(t -> { - deleteFileUploads(); - context.fail(t); - }); - upload.endHandler(v -> uploadEnded()); + if (handleFileUploads) { + // we actually upload to a file with a generated filename + uploadCount.incrementAndGet(); + String uploadedFileName = new File(uploadsDir, UUID.randomUUID().toString()).getPath(); + upload.streamToFileSystem(uploadedFileName); + FileUploadImpl fileUpload = new FileUploadImpl(uploadedFileName, upload); + fileUploads.add(fileUpload); + upload.exceptionHandler(t -> { + deleteFileUploads(); + context.fail(t); + }); + upload.endHandler(v -> uploadEnded()); + } }); } context.request().exceptionHandler(t -> { @@ -199,6 +244,25 @@ private class BHandler implements Handler { }); } + private void initBodyBuffer(long contentLength) { + int initialBodyBufferSize; + if(contentLength < 0) { + initialBodyBufferSize = DEFAULT_INITIAL_BODY_BUFFER_SIZE; + } + else if(contentLength > MAX_PREALLOCATED_BODY_BUFFER_BYTES) { + initialBodyBufferSize = MAX_PREALLOCATED_BODY_BUFFER_BYTES; + } + else { + initialBodyBufferSize = (int) contentLength; + } + + if(bodyLimit != -1) { + initialBodyBufferSize = (int)Math.min(initialBodyBufferSize, bodyLimit); + } + + this.body = Buffer.buffer(initialBodyBufferSize); + } + private void makeUploadDir(FileSystem fileSystem) { // *** cse begin *** if (uploadsDir == null) { @@ -271,7 +335,7 @@ void doEnd() { } private void deleteFileUploads() { - if (cleanup.compareAndSet(false, true)) { + if (cleanup.compareAndSet(false, true) && handleFileUploads) { for (FileUpload fileUpload : context.fileUploads()) { FileSystem fileSystem = context.vertx().fileSystem(); String uploadedFileName = fileUpload.uploadedFileName(); diff --git a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java index 3ac2ee7a21a..2e35b5a3ceb 100644 --- a/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java +++ b/transports/transport-rest/transport-rest-vertx/src/main/java/org/apache/servicecomb/transport/rest/vertx/RestServerVerticle.java @@ -92,7 +92,7 @@ public void start(Future startFuture) throws Exception { mountCorsHandler(mainRouter); initDispatcher(mainRouter); HttpServer httpServer = createHttpServer(); - httpServer.requestHandler(mainRouter::accept); + httpServer.requestHandler(mainRouter); httpServer.connectionHandler(connection -> { DefaultHttpServerMetrics serverMetrics = (DefaultHttpServerMetrics) ((ConnectionBase) connection).metrics(); DefaultServerEndpointMetric endpointMetric = serverMetrics.getEndpointMetric();