Sync Azure Vert.x HTTP client from azure-sdk project main
authorJames Netherton <jamesnetherton@gmail.com>
Thu, 6 Oct 2022 13:05:02 +0000 (14:05 +0100)
committerJames Netherton <jamesnetherton@users.noreply.github.com>
Fri, 7 Oct 2022 08:57:44 +0000 (09:57 +0100)
Fixes #4090

33 files changed:
extensions-support/azure-core-http-client-vertx/deployment/pom.xml
extensions-support/azure-core-http-client-vertx/deployment/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/AzureCoreHttpClientVertxProcessor.java
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/DeadlockTests.java
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/SimpleBasicAuthHttpProxyServer.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncClientTestHelper.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientAsyncHttpClientTests.java [moved from extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientHttpClientTests.java with 60% similarity]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientBuilderTests.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientProviderTests.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientResponseTransformer.java [moved from extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientResponseTransformer.java with 89% similarity]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientRestProxyTests.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientRestProxyWithAsyncHttpProxyTests.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientSingletonTests.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientTests.java [moved from extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientTests.java with 68% similarity]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientBuilderTests.java [deleted file]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientProviderTests.java [deleted file]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientRestProxyTests.java [deleted file]
extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientRestProxyWithHttpProxyTests.java [deleted file]
extensions-support/azure-core-http-client-vertx/runtime/pom.xml
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/QuarkusVertxProvider.java [moved from extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientProvider.java with 57% similarity]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClient.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientBuilder.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientProvider.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpAsyncResponse.java [deleted file]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClient.java [deleted file]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientBuilder.java [deleted file]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpResponseHandler.java [deleted file]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxProvider.java [moved from extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpRequest.java with 62% similarity]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/implementation/BufferedVertxHttpResponse.java [moved from extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/BufferedVertxHttpResponse.java with 64% similarity]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/implementation/VertxHttpAsyncResponse.java [new file with mode: 0644]
extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/implementation/VertxHttpResponseBase.java [moved from extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpResponse.java with 71% similarity]
extensions-support/azure-core-http-client-vertx/runtime/src/main/resources/META-INF/services/com.azure.core.http.HttpClientProvider
extensions-support/azure-core-http-client-vertx/runtime/src/main/resources/META-INF/services/org.apache.camel.quarkus.support.azure.core.http.vertx.VertxProvider [new file with mode: 0644]
extensions-support/azure-core/runtime/pom.xml

index 166f4c57191bebd29e94c33f8db6cc0d1495b598..d9b245b462a8526c41f5b5abdcc16d4009ceef1b 100644 (file)
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-inline</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
     <build>
index db0972b8d7f85fa22af8f553586b896d1a89564b..45af42f89896d92cd207c9371cbb0f88290e6f0d 100644 (file)
  */
 package org.apache.camel.quarkus.support.azure.core.http.vertx;
 
+import com.azure.core.http.HttpClientProvider;
 import io.netty.handler.ssl.OpenSsl;
 import io.quarkus.deployment.annotations.BuildProducer;
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.ServiceProviderBuildItem;
 
 public class AzureCoreHttpClientVertxProcessor {
 
@@ -27,5 +29,13 @@ public class AzureCoreHttpClientVertxProcessor {
     void runtimeInitializedClasses(BuildProducer<RuntimeInitializedClassBuildItem> runtimeInitializedClasses) {
         runtimeInitializedClasses.produce(new RuntimeInitializedClassBuildItem(OpenSsl.class.getName()));
         runtimeInitializedClasses.produce(new RuntimeInitializedClassBuildItem("io.netty.internal.tcnative.SSL"));
+        runtimeInitializedClasses.produce(new RuntimeInitializedClassBuildItem(
+                "org.apache.camel.quarkus.support.azure.core.http.vertx.VertxAsyncHttpClientProvider$GlobalVertxHttpClient"));
+    }
+
+    @BuildStep
+    void registerServiceProviders(BuildProducer<ServiceProviderBuildItem> serviceProvider) {
+        serviceProvider.produce(ServiceProviderBuildItem.allProvidersFromClassPath(HttpClientProvider.class.getName()));
+        serviceProvider.produce(ServiceProviderBuildItem.allProvidersFromClassPath(VertxProvider.class.getName()));
     }
 }
index e66cf6154a1e713603e74d74c2a5b58198444209..14d0fc106018cffbe93ac3c50db031e292957caf 100644 (file)
@@ -23,32 +23,33 @@ import com.azure.core.http.HttpMethod;
 import com.azure.core.http.HttpRequest;
 import com.azure.core.util.FluxUtil;
 import com.github.tomakehurst.wiremock.WireMockServer;
-import com.github.tomakehurst.wiremock.client.WireMock;
 import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
 import io.quarkus.test.QuarkusUnitTest;
 import org.jboss.shrinkwrap.api.ShrinkWrap;
 import org.jboss.shrinkwrap.api.spec.JavaArchive;
 import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import reactor.core.publisher.Mono;
 import reactor.test.StepVerifier;
 
-@Disabled //https://github.com/apache/camel-quarkus/issues/4090
-public class DeadlockTests {
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
 
-    @RegisterExtension
-    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
-            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));
+public class DeadlockTests {
 
     private static final String GET_ENDPOINT = "/get";
 
     private WireMockServer server;
     private byte[] expectedGetBytes;
 
+    @RegisterExtension
+    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
+            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));
+
     @BeforeEach
     public void configureWireMockServer() {
         expectedGetBytes = new byte[10 * 1024 * 1024];
@@ -59,7 +60,7 @@ public class DeadlockTests {
                 .disableRequestJournal()
                 .gzipDisabled(true));
 
-        server.stubFor(WireMock.get(GET_ENDPOINT).willReturn(WireMock.aResponse().withBody(expectedGetBytes)));
+        server.stubFor(get(GET_ENDPOINT).willReturn(aResponse().withBody(expectedGetBytes)));
 
         server.start();
     }
@@ -73,7 +74,7 @@ public class DeadlockTests {
 
     @Test
     public void attemptToDeadlock() {
-        HttpClient httpClient = new VertxHttpClientProvider().createInstance();
+        HttpClient httpClient = new VertxAsyncHttpClientProvider().createInstance();
 
         String endpoint = server.baseUrl() + GET_ENDPOINT;
 
@@ -82,8 +83,8 @@ public class DeadlockTests {
                     .flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getBody())
                             .zipWith(Mono.just(response.getStatusCode()))))
                     .assertNext(responseTuple -> {
-                        Assertions.assertEquals(200, responseTuple.getT2());
-                        Assertions.assertArrayEquals(expectedGetBytes, responseTuple.getT1());
+                        assertEquals(200, responseTuple.getT2());
+                        assertArrayEquals(expectedGetBytes, responseTuple.getT1());
                     })
                     .verifyComplete();
         }
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/SimpleBasicAuthHttpProxyServer.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/SimpleBasicAuthHttpProxyServer.java
new file mode 100644 (file)
index 0000000..3e1cfb9
--- /dev/null
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import java.util.Base64;
+
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.common.FileSource;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import com.github.tomakehurst.wiremock.extension.Parameters;
+import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
+import com.github.tomakehurst.wiremock.http.HttpHeader;
+import com.github.tomakehurst.wiremock.http.HttpHeaders;
+import com.github.tomakehurst.wiremock.http.Request;
+import com.github.tomakehurst.wiremock.http.Response;
+
+/**
+ * A simple Http proxy server that enforce basic proxy authentication, once authenticated
+ * any request matching {@code serviceEndpoints} will be responded with an empty Http 200.
+ */
+final class SimpleBasicAuthHttpProxyServer {
+    private final String userName;
+    private final String password;
+    private final String[] serviceEndpoints;
+    private WireMockServer proxyService;
+
+    /**
+     * Creates SimpleBasicAuthHttpProxyServer.
+     *
+     * @param userName         the proxy user name for basic authentication
+     * @param password         the proxy password for basic authentication
+     * @param serviceEndpoints the whitelisted mock endpoints targeting the service behind proxy
+     */
+    SimpleBasicAuthHttpProxyServer(String userName, String password, String[] serviceEndpoints) {
+        this.userName = userName;
+        this.password = password;
+        this.serviceEndpoints = serviceEndpoints;
+    }
+
+    public ProxyEndpoint start() {
+        this.proxyService = new WireMockServer(WireMockConfiguration
+                .options()
+                .dynamicPort()
+                .extensions(new ResponseTransformer() {
+                    @Override
+                    public Response transform(Request request,
+                            Response response,
+                            FileSource fileSource,
+                            Parameters parameters) {
+                        String proxyAuthorization = request.getHeader("Proxy-Authorization");
+                        if (proxyAuthorization == null) {
+                            HttpHeader proxyAuthenticateHeader = new HttpHeader("Proxy-Authenticate", "Basic");
+                            return new Response.Builder()
+                                    .status(407)
+                                    .headers(new HttpHeaders(proxyAuthenticateHeader))
+                                    .build();
+                        } else {
+                            if (!proxyAuthorization.startsWith("Basic")) {
+                                return new Response.Builder()
+                                        .status(401)
+                                        .build();
+                            }
+                            String encodedCred = proxyAuthorization.substring("Basic".length());
+                            encodedCred = encodedCred.trim();
+                            final Base64.Decoder decoder = Base64.getDecoder();
+                            final byte[] decodedCred = decoder.decode(encodedCred);
+                            if (new String(decodedCred).equals(userName + ":" + password)) {
+                                return new Response.Builder()
+                                        .status(200)
+                                        .build();
+                            } else {
+                                return new Response.Builder()
+                                        .status(401)
+                                        .build();
+                            }
+                        }
+                    }
+
+                    @Override
+                    public String getName() {
+                        return "ProxyServer";
+                    }
+                })
+                .disableRequestJournal());
+        for (String endpoint : this.serviceEndpoints) {
+            proxyService.stubFor(WireMock.any(WireMock.urlEqualTo(endpoint))
+                    .willReturn(WireMock.aResponse()));
+        }
+        this.proxyService.start();
+        return new ProxyEndpoint("localhost", this.proxyService.port());
+    }
+
+    public void shutdown() {
+        if (this.proxyService != null && this.proxyService.isRunning()) {
+            this.proxyService.shutdown();
+
+        }
+    }
+
+    static class ProxyEndpoint {
+        private final String host;
+        private final int port;
+
+        ProxyEndpoint(String host, int port) {
+            this.host = host;
+            this.port = port;
+        }
+
+        String getHost() {
+            return this.host;
+        }
+
+        int getPort() {
+            return this.port;
+        }
+    }
+}
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncClientTestHelper.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncClientTestHelper.java
new file mode 100644 (file)
index 0000000..c370851
--- /dev/null
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import java.lang.reflect.Field;
+import java.util.function.Predicate;
+
+import io.vertx.core.http.impl.HttpClientImpl;
+import io.vertx.core.net.SocketAddress;
+
+/**
+ * Utility class to reflectively retrieve configuration settings from the Vert.x HTTP Client that are
+ * not exposed by default.
+ *
+ * Avoids having to implement workarounds in the client code to make them available just for testing purposes.
+ */
+final class VertxAsyncClientTestHelper {
+
+    private VertxAsyncClientTestHelper() {
+        // Utility class
+    }
+
+    @SuppressWarnings("unchecked")
+    static Predicate<SocketAddress> getVertxInternalProxyFilter(HttpClientImpl client) {
+        try {
+            Field field = HttpClientImpl.class.getDeclaredField("proxyFilter");
+            field.setAccessible(true);
+            return (Predicate<SocketAddress>) field.get(client);
+        } catch (NoSuchFieldException | IllegalAccessException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
 package org.apache.camel.quarkus.support.azure.core.http.vertx;
 
 import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import com.azure.core.http.HttpClient;
 import com.azure.core.test.HttpClientTestsWireMockServer;
 import com.azure.core.test.http.HttpClientTests;
 import com.github.tomakehurst.wiremock.WireMockServer;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
 import io.vertx.core.Vertx;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 
-@Disabled //https://github.com/apache/camel-quarkus/issues/4090
-public class VertxHttpClientHttpClientTests extends HttpClientTests {
-    private static final WireMockServer server = HttpClientTestsWireMockServer.getHttpClientTestsServer();
-    private static final Vertx vertx = Vertx.vertx();
+public class VertxAsyncHttpClientAsyncHttpClientTests extends HttpClientTests {
+    private static WireMockServer server;
+    private static Vertx vertx;
 
     @BeforeAll
-    public static void getWireMockServer() {
+    public static void beforeAll() {
+        server = HttpClientTestsWireMockServer.getHttpClientTestsServer();
         server.start();
+        vertx = Vertx.vertx();
     }
 
     @AfterAll
-    public static void afterAll() throws InterruptedException {
-        server.shutdown();
-        CountDownLatch latch = new CountDownLatch(1);
-        vertx.close(x -> latch.countDown());
-        latch.await();
+    public static void afterAll() throws Exception {
+        if (server != null) {
+            server.shutdown();
+        }
+
+        if (vertx != null) {
+            CountDownLatch latch = new CountDownLatch(1);
+            vertx.close(new Handler<AsyncResult<Void>>() {
+                @Override
+                public void handle(AsyncResult<Void> event) {
+                    latch.countDown();
+                }
+            });
+            latch.await(5, TimeUnit.SECONDS);
+        }
     }
 
     @Override
@@ -52,6 +65,6 @@ public class VertxHttpClientHttpClientTests extends HttpClientTests {
 
     @Override
     protected HttpClient createHttpClient() {
-        return new VertxHttpClientBuilder(vertx).build();
+        return new VertxAsyncHttpClientBuilder().vertx(vertx).build();
     }
 }
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientBuilderTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientBuilderTests.java
new file mode 100644 (file)
index 0000000..0e43054
--- /dev/null
@@ -0,0 +1,464 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import java.net.ConnectException;
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.HttpMethod;
+import com.azure.core.http.HttpRequest;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.test.utils.TestConfigurationSource;
+import com.azure.core.util.Configuration;
+import com.azure.core.util.ConfigurationBuilder;
+import com.azure.core.util.ConfigurationSource;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import com.github.tomakehurst.wiremock.client.WireMock;
+import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
+import io.vertx.core.AsyncResult;
+import io.vertx.core.Handler;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.http.impl.HttpClientImpl;
+import io.vertx.core.net.SocketAddress;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import reactor.test.StepVerifier;
+
+import static io.vertx.core.net.SocketAddress.inetSocketAddress;
+import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxAsyncClientTestHelper.getVertxInternalProxyFilter;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests {@link VertxAsyncHttpClientBuilder}.
+ */
+public class VertxAsyncHttpClientBuilderTests {
+    private static final String PROXY_USERNAME = "foo";
+    private static final String PROXY_PASSWORD = "bar";
+    private static final String PROXY_USER_INFO = PROXY_USERNAME + ":" + PROXY_PASSWORD + "@";
+    private static final String SERVICE_ENDPOINT = "/default";
+    private static final ConfigurationSource EMPTY_SOURCE = new TestConfigurationSource();
+    private static Vertx vertx;
+
+    @BeforeAll
+    public static void beforeAll() {
+        vertx = Vertx.vertx();
+    }
+
+    @AfterAll
+    public static void afterAll() {
+        if (vertx != null) {
+            CountDownLatch latch = new CountDownLatch(1);
+            vertx.close(new Handler<AsyncResult<Void>>() {
+                @Override
+                public void handle(AsyncResult<Void> event) {
+                    latch.countDown();
+                }
+            });
+            try {
+                latch.await(5, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    @Test
+    public void buildWithConfigurationNone() {
+        HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                .configuration(Configuration.NONE)
+                .vertx(vertx)
+                .build();
+
+        String defaultPath = "/default";
+        WireMockServer server = new WireMockServer(WireMockConfiguration.options().dynamicPort().disableRequestJournal());
+        server.stubFor(WireMock.get(defaultPath).willReturn(WireMock.aResponse().withStatus(200)));
+        server.start();
+        String defaultUrl = "http://localhost:" + server.port() + defaultPath;
+        try {
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
+                    .assertNext(response -> assertEquals(200, response.getStatusCode()))
+                    .verifyComplete();
+        } finally {
+            if (server.isRunning()) {
+                server.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void buildWithDefaultConnectionOptions() {
+        VertxAsyncHttpClientBuilder builder = new VertxAsyncHttpClientBuilder().vertx(vertx);
+        HttpClient httpClient = builder.build();
+
+        io.vertx.core.http.HttpClient client = ((VertxAsyncHttpClient) httpClient).client;
+        io.vertx.core.http.HttpClientOptions options = ((HttpClientImpl) client).options();
+
+        String defaultPath = "/default";
+        WireMockServer server = new WireMockServer(WireMockConfiguration.options().dynamicPort().disableRequestJournal());
+        server.stubFor(WireMock.get(defaultPath).willReturn(WireMock.aResponse().withStatus(200)));
+        server.start();
+        String defaultUrl = "http://localhost:" + server.port() + defaultPath;
+        try {
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
+                    .assertNext(response -> assertEquals(200, response.getStatusCode()))
+                    .verifyComplete();
+
+            assertEquals(10000, options.getConnectTimeout());
+            assertEquals(60, options.getIdleTimeout());
+            assertEquals(60, options.getReadIdleTimeout());
+            assertEquals(60, options.getWriteIdleTimeout());
+        } finally {
+            if (server.isRunning()) {
+                server.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void buildWithConnectionOptions() {
+        VertxAsyncHttpClientBuilder builder = new VertxAsyncHttpClientBuilder().vertx(vertx);
+        VertxAsyncHttpClient httpClient = (VertxAsyncHttpClient) builder.connectTimeout(Duration.ofSeconds(10))
+                .idleTimeout(Duration.ofSeconds(20))
+                .readIdleTimeout(Duration.ofSeconds(30))
+                .writeIdleTimeout(Duration.ofSeconds(40))
+                .build();
+
+        io.vertx.core.http.HttpClientOptions options = ((HttpClientImpl) httpClient.client).options();
+
+        String defaultPath = "/default";
+        WireMockServer server = new WireMockServer(WireMockConfiguration.options().dynamicPort().disableRequestJournal());
+        server.stubFor(WireMock.get(defaultPath).willReturn(WireMock.aResponse().withStatus(200)));
+        server.start();
+        String defaultUrl = "http://localhost:" + server.port() + defaultPath;
+        try {
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
+                    .assertNext(response -> assertEquals(200, response.getStatusCode()))
+                    .verifyComplete();
+
+            assertEquals(10000, options.getConnectTimeout());
+            assertEquals(20, options.getIdleTimeout());
+            assertEquals(30, options.getReadIdleTimeout());
+            assertEquals(40, options.getWriteIdleTimeout());
+        } finally {
+            if (server.isRunning()) {
+                server.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void buildWithAllProxyTypes() throws Exception {
+        for (ProxyOptions.Type type : ProxyOptions.Type.values()) {
+            if (type.equals(ProxyOptions.Type.SOCKS5)) {
+                return;
+            }
+
+            String proxyUser = "user";
+            String proxyPassword = "secret";
+
+            InetSocketAddress address = new InetSocketAddress("localhost", 8888);
+            ProxyOptions proxyOptions = new ProxyOptions(type, address);
+            proxyOptions.setCredentials("user", "secret");
+            proxyOptions.setNonProxyHosts("foo.*|*bar.com|microsoft.com");
+
+            VertxAsyncHttpClient httpClient = (VertxAsyncHttpClient) new VertxAsyncHttpClientBuilder()
+                    .proxy(proxyOptions)
+                    .vertx(vertx)
+                    .build();
+
+            HttpClientImpl vertxHttpClientImpl = (HttpClientImpl) httpClient.client;
+            io.vertx.core.http.HttpClientOptions options = vertxHttpClientImpl.options();
+
+            io.vertx.core.net.ProxyOptions vertxProxyOptions = options.getProxyOptions();
+            assertNotNull(vertxProxyOptions);
+            assertEquals(address.getHostName(), vertxProxyOptions.getHost());
+            assertEquals(address.getPort(), vertxProxyOptions.getPort());
+            assertEquals(type.name(), vertxProxyOptions.getType().name());
+            assertEquals(proxyUser, vertxProxyOptions.getUsername());
+            assertEquals(proxyPassword, vertxProxyOptions.getPassword());
+
+            Predicate<SocketAddress> proxyFilter = getVertxInternalProxyFilter(vertxHttpClientImpl);
+            assertFalse(proxyFilter.test(inetSocketAddress(80, "foo.com")));
+            assertFalse(proxyFilter.test(inetSocketAddress(80, "foo.bar.com")));
+            assertFalse(proxyFilter.test(inetSocketAddress(80, "test.bar.com")));
+            assertFalse(proxyFilter.test(inetSocketAddress(80, "microsoft.com")));
+            assertTrue(proxyFilter.test(inetSocketAddress(80, "allowed.host.com")));
+        }
+
+    }
+
+    @Test
+    public void buildWithHttpProxy() {
+        SimpleBasicAuthHttpProxyServer proxyServer = new SimpleBasicAuthHttpProxyServer(PROXY_USERNAME,
+                PROXY_PASSWORD,
+                new String[] { SERVICE_ENDPOINT });
+
+        try {
+            SimpleBasicAuthHttpProxyServer.ProxyEndpoint proxyEndpoint = proxyServer.start();
+
+            ProxyOptions clientProxyOptions = new ProxyOptions(ProxyOptions.Type.HTTP,
+                    new InetSocketAddress(proxyEndpoint.getHost(), proxyEndpoint.getPort()))
+                            .setCredentials(PROXY_USERNAME, PROXY_PASSWORD);
+
+            HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                    .proxy(clientProxyOptions)
+                    .vertx(vertx)
+                    .build();
+
+            final String serviceUrl = "http://localhost:80" + SERVICE_ENDPOINT;
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, serviceUrl)))
+                    .expectNextCount(1)
+                    .verifyComplete();
+        } finally {
+            proxyServer.shutdown();
+        }
+    }
+
+    @Test
+    public void buildWithHttpProxyFromEnvConfiguration() {
+        SimpleBasicAuthHttpProxyServer proxyServer = new SimpleBasicAuthHttpProxyServer(PROXY_USERNAME,
+                PROXY_PASSWORD,
+                new String[] { SERVICE_ENDPOINT });
+
+        try {
+            SimpleBasicAuthHttpProxyServer.ProxyEndpoint proxyEndpoint = proxyServer.start();
+
+            Configuration configuration = new ConfigurationBuilder(EMPTY_SOURCE, EMPTY_SOURCE,
+                    new TestConfigurationSource()
+                            .put(Configuration.PROPERTY_HTTP_PROXY,
+                                    "http://" + PROXY_USER_INFO + proxyEndpoint.getHost() + ":" + proxyEndpoint.getPort())
+                            .put("java.net.useSystemProxies", "true"))
+                                    .build();
+
+            HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                    .configuration(configuration)
+                    .vertx(vertx)
+                    .build();
+
+            final String serviceUrl = "http://localhost:80" + SERVICE_ENDPOINT;
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, serviceUrl)))
+                    .expectNextCount(1)
+                    .verifyComplete();
+        } finally {
+            proxyServer.shutdown();
+        }
+    }
+
+    @Test
+    public void buildWithHttpProxyFromExplicitConfiguration() {
+        SimpleBasicAuthHttpProxyServer proxyServer = new SimpleBasicAuthHttpProxyServer(PROXY_USERNAME,
+                PROXY_PASSWORD,
+                new String[] { SERVICE_ENDPOINT });
+
+        try {
+            SimpleBasicAuthHttpProxyServer.ProxyEndpoint proxyEndpoint = proxyServer.start();
+
+            Configuration configuration = new ConfigurationBuilder()
+                    .putProperty("http.proxy.hostname", proxyEndpoint.getHost())
+                    .putProperty("http.proxy.port", String.valueOf(proxyEndpoint.getPort()))
+                    .build();
+
+            HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                    .configuration(configuration)
+                    .vertx(vertx)
+                    .build();
+
+            final String serviceUrl = "http://localhost:80" + SERVICE_ENDPOINT;
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, serviceUrl)))
+                    .expectNextCount(1)
+                    .verifyComplete();
+        } finally {
+            proxyServer.shutdown();
+        }
+    }
+
+    @Test
+    public void buildWithCustomVertx() throws Exception {
+        Vertx vertx = Vertx.vertx();
+
+        HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                .configuration(Configuration.NONE)
+                .vertx(vertx)
+                .build();
+
+        String defaultPath = "/default";
+        WireMockServer server = new WireMockServer(WireMockConfiguration.options().dynamicPort().disableRequestJournal());
+        server.stubFor(WireMock.get(defaultPath).willReturn(WireMock.aResponse().withStatus(200)));
+        server.start();
+        String defaultUrl = "http://localhost:" + server.port() + defaultPath;
+        try {
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
+                    .assertNext(response -> assertEquals(200, response.getStatusCode()))
+                    .verifyComplete();
+        } finally {
+            if (server.isRunning()) {
+                server.shutdown();
+            }
+
+            CountDownLatch latch = new CountDownLatch(1);
+            vertx.close(event -> latch.countDown());
+            assertTrue(latch.await(5, TimeUnit.SECONDS));
+        }
+    }
+
+    @Test
+    public void buildWithCustomHttpClientOptions() {
+        HttpClientOptions options = new HttpClientOptions();
+        options.setConnectTimeout(30000);
+        options.setIdleTimeout(50);
+        options.setReadIdleTimeout(60);
+        options.setWriteIdleTimeout(70);
+
+        HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                .connectTimeout(Duration.ofSeconds(10))
+                .idleTimeout(Duration.ofSeconds(20))
+                .readIdleTimeout(Duration.ofSeconds(30))
+                .writeIdleTimeout(Duration.ofSeconds(40))
+                .httpClientOptions(options)
+                .vertx(vertx)
+                .build();
+
+        // Verify the original configuration was preserved and not overwritten
+        assertEquals(30000, options.getConnectTimeout());
+        assertEquals(50, options.getIdleTimeout());
+        assertEquals(60, options.getReadIdleTimeout());
+        assertEquals(70, options.getWriteIdleTimeout());
+
+        String defaultPath = "/default";
+        WireMockServer server = new WireMockServer(WireMockConfiguration.options().dynamicPort().disableRequestJournal());
+        server.stubFor(WireMock.get(defaultPath).willReturn(WireMock.aResponse().withStatus(200)));
+        server.start();
+        String defaultUrl = "http://localhost:" + server.port() + defaultPath;
+        try {
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
+                    .assertNext(response -> assertEquals(200, response.getStatusCode()))
+                    .verifyComplete();
+        } finally {
+            if (server.isRunning()) {
+                server.shutdown();
+            }
+        }
+    }
+
+    @Test
+    public void buildWithNullProxyAddress() {
+        SimpleBasicAuthHttpProxyServer proxyServer = new SimpleBasicAuthHttpProxyServer(PROXY_USERNAME,
+                PROXY_PASSWORD,
+                new String[] { SERVICE_ENDPOINT });
+
+        try {
+            proxyServer.start();
+
+            ProxyOptions mockPoxyOptions = Mockito.mock(ProxyOptions.class);
+            Mockito.when(mockPoxyOptions.getType()).thenReturn(ProxyOptions.Type.HTTP);
+            Mockito.when(mockPoxyOptions.getAddress()).thenReturn(null);
+
+            HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                    .proxy(mockPoxyOptions)
+                    .vertx(vertx)
+                    .build();
+
+            final String serviceUrl = "http://localhost:80" + SERVICE_ENDPOINT;
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, serviceUrl)))
+                    .verifyError(ConnectException.class);
+        } finally {
+            proxyServer.shutdown();
+        }
+    }
+
+    @Test
+    public void buildWithInvalidProxyType() {
+        ProxyOptions.Type mockProxyType = Mockito.mock(ProxyOptions.Type.class);
+        Mockito.when(mockProxyType.name()).thenReturn("INVALID");
+
+        ProxyOptions clientProxyOptions = new ProxyOptions(mockProxyType,
+                new InetSocketAddress("test.com", 8080));
+
+        assertThrows(IllegalArgumentException.class, () -> {
+            new VertxAsyncHttpClientBuilder()
+                    .proxy(clientProxyOptions)
+                    .vertx(vertx)
+                    .build();
+        });
+    }
+
+    @Test
+    public void buildWithNullProxyType() {
+        SimpleBasicAuthHttpProxyServer proxyServer = new SimpleBasicAuthHttpProxyServer(PROXY_USERNAME,
+                PROXY_PASSWORD,
+                new String[] { SERVICE_ENDPOINT });
+
+        try {
+            SimpleBasicAuthHttpProxyServer.ProxyEndpoint proxyEndpoint = proxyServer.start();
+
+            ProxyOptions mockPoxyOptions = Mockito.mock(ProxyOptions.class);
+            Mockito.when(mockPoxyOptions.getType()).thenReturn(null);
+            Mockito.when(mockPoxyOptions.getAddress())
+                    .thenReturn(new InetSocketAddress(proxyEndpoint.getHost(), proxyEndpoint.getPort()));
+
+            HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                    .proxy(mockPoxyOptions)
+                    .vertx(vertx)
+                    .build();
+
+            final String serviceUrl = "http://localhost:80" + SERVICE_ENDPOINT;
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, serviceUrl)))
+                    .expectNextCount(1)
+                    .verifyComplete();
+        } finally {
+            proxyServer.shutdown();
+        }
+    }
+
+    @Test
+    public void buildWithoutProxyAuthentication() {
+        SimpleBasicAuthHttpProxyServer proxyServer = new SimpleBasicAuthHttpProxyServer(PROXY_USERNAME,
+                PROXY_PASSWORD,
+                new String[] { SERVICE_ENDPOINT });
+
+        try {
+            SimpleBasicAuthHttpProxyServer.ProxyEndpoint proxyEndpoint = proxyServer.start();
+
+            ProxyOptions clientProxyOptions = new ProxyOptions(ProxyOptions.Type.HTTP,
+                    new InetSocketAddress(proxyEndpoint.getHost(), proxyEndpoint.getPort()));
+
+            HttpClient httpClient = new VertxAsyncHttpClientBuilder()
+                    .proxy(clientProxyOptions)
+                    .vertx(vertx)
+                    .build();
+
+            final String serviceUrl = "http://localhost:80" + SERVICE_ENDPOINT;
+            StepVerifier.create(httpClient.send(new HttpRequest(HttpMethod.GET, serviceUrl)))
+                    .expectNextCount(1)
+                    .verifyComplete();
+        } finally {
+            proxyServer.shutdown();
+        }
+    }
+}
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientProviderTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientProviderTests.java
new file mode 100644 (file)
index 0000000..ccd22bc
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.ServiceLoader;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Predicate;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.util.Configuration;
+import com.azure.core.util.HttpClientOptions;
+import io.quarkus.test.QuarkusUnitTest;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.impl.HttpClientImpl;
+import io.vertx.core.net.SocketAddress;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+import static io.vertx.core.net.SocketAddress.inetSocketAddress;
+import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxAsyncClientTestHelper.getVertxInternalProxyFilter;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.mockStatic;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Tests {@link VertxAsyncHttpClientProvider}.
+ */
+public class VertxAsyncHttpClientProviderTests {
+
+    @RegisterExtension
+    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
+            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
+                    .addAsServiceProvider(VertxProvider.class, QuarkusVertxProvider.class)
+                    .addPackage(VertxAsyncHttpClient.class.getPackage()));
+
+    @Test
+    public void nullOptionsReturnsBaseClient() {
+        VertxAsyncHttpClient httpClient = (VertxAsyncHttpClient) new VertxAsyncHttpClientProvider()
+                .createInstance(null);
+
+        ProxyOptions environmentProxy = ProxyOptions.fromConfiguration(Configuration.getGlobalConfiguration());
+        io.vertx.core.http.HttpClientOptions options = ((HttpClientImpl) httpClient.client).options();
+        io.vertx.core.net.ProxyOptions proxyOptions = options.getProxyOptions();
+        if (environmentProxy == null) {
+            assertNull(proxyOptions);
+        } else {
+            assertNotNull(proxyOptions);
+            assertEquals(environmentProxy.getAddress().getHostName(), proxyOptions.getHost());
+        }
+    }
+
+    @Test
+    public void defaultOptionsReturnsBaseClient() {
+        VertxAsyncHttpClient httpClient = (VertxAsyncHttpClient) new VertxAsyncHttpClientProvider()
+                .createInstance(new HttpClientOptions());
+
+        ProxyOptions environmentProxy = ProxyOptions.fromConfiguration(Configuration.getGlobalConfiguration());
+        io.vertx.core.http.HttpClientOptions options = ((HttpClientImpl) httpClient.client).options();
+        io.vertx.core.net.ProxyOptions proxyOptions = options.getProxyOptions();
+        if (environmentProxy == null) {
+            assertNull(proxyOptions);
+        } else {
+            assertNotNull(proxyOptions);
+            assertEquals(environmentProxy.getAddress().getHostName(), proxyOptions.getHost());
+        }
+    }
+
+    @Test
+    public void optionsWithAProxy() {
+        ProxyOptions proxyOptions = new ProxyOptions(ProxyOptions.Type.HTTP, new InetSocketAddress("localhost", 8888));
+        proxyOptions.setNonProxyHosts("foo.*|bar.*|cheese.com|wine.org");
+
+        HttpClientOptions clientOptions = new HttpClientOptions().setProxyOptions(proxyOptions);
+
+        VertxAsyncHttpClient httpClient = (VertxAsyncHttpClient) new VertxAsyncHttpClientProvider()
+                .createInstance(clientOptions);
+
+        io.vertx.core.http.HttpClientOptions options = ((HttpClientImpl) httpClient.client).options();
+
+        io.vertx.core.net.ProxyOptions vertxProxyOptions = options.getProxyOptions();
+        assertNotNull(vertxProxyOptions);
+        assertEquals(proxyOptions.getAddress().getHostName(), vertxProxyOptions.getHost());
+        assertEquals(proxyOptions.getAddress().getPort(), vertxProxyOptions.getPort());
+        assertEquals(proxyOptions.getType().name(), vertxProxyOptions.getType().name());
+
+        Predicate<SocketAddress> proxyFilter = getVertxInternalProxyFilter((HttpClientImpl) httpClient.client);
+        assertFalse(proxyFilter.test(inetSocketAddress(80, "foo.com")));
+        assertFalse(proxyFilter.test(inetSocketAddress(80, "foo.bar.com")));
+        assertFalse(proxyFilter.test(inetSocketAddress(80, "bar.com")));
+        assertFalse(proxyFilter.test(inetSocketAddress(80, "cheese.com")));
+        assertFalse(proxyFilter.test(inetSocketAddress(80, "wine.org")));
+        assertTrue(proxyFilter.test(inetSocketAddress(80, "allowed.host.com")));
+    }
+
+    @Test
+    public void optionsWithTimeouts() {
+        Duration timeout = Duration.ofMillis(15000);
+        HttpClientOptions clientOptions = new HttpClientOptions()
+                .setConnectTimeout(timeout)
+                .setConnectionIdleTimeout(timeout)
+                .setReadTimeout(timeout)
+                .setWriteTimeout(timeout);
+
+        HttpClient httpClient = new VertxAsyncHttpClientProvider().createInstance(clientOptions);
+        VertxAsyncHttpClient cast = VertxAsyncHttpClient.class.cast(httpClient);
+
+        io.vertx.core.http.HttpClientOptions options = ((HttpClientImpl) cast.client).options();
+
+        assertEquals(timeout.toMillis(), options.getConnectTimeout());
+        assertEquals(timeout.getSeconds(), options.getIdleTimeout());
+        assertEquals(timeout.getSeconds(), options.getReadIdleTimeout());
+        assertEquals(timeout.getSeconds(), options.getWriteIdleTimeout());
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void vertxProvider() throws Exception {
+        Vertx vertx = Vertx.vertx();
+
+        ServiceLoader mockServiceLoader = mock(ServiceLoader.class);
+        VertxProvider mockVertxProvider = mock(VertxProvider.class);
+
+        try (MockedStatic<ServiceLoader> serviceLoader = mockStatic(ServiceLoader.class)) {
+            Set<VertxProvider> providers = new HashSet<>();
+            providers.add(mockVertxProvider);
+
+            Class<?> providerClass = VertxProvider.class;
+            serviceLoader.when(() -> ServiceLoader.load(providerClass, providerClass.getClassLoader()))
+                    .thenReturn(mockServiceLoader);
+
+            Mockito.when(mockServiceLoader.iterator()).thenReturn(providers.iterator());
+            Mockito.when(mockVertxProvider.createVertx()).thenReturn(vertx);
+
+            HttpClient httpClient = new VertxAsyncHttpClientProvider().createInstance();
+            assertNotNull(httpClient);
+
+            verify(mockServiceLoader, times(1)).iterator();
+            verify(mockVertxProvider, times(1)).createVertx();
+        } finally {
+            CountDownLatch latch = new CountDownLatch(1);
+            vertx.close(event -> latch.countDown());
+            latch.await(5, TimeUnit.SECONDS);
+        }
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Test
+    public void multipleVertxProviders() throws Exception {
+        Vertx vertx = Vertx.vertx();
+
+        ServiceLoader mockServiceLoader = mock(ServiceLoader.class);
+        VertxProvider mockVertxProviderA = mock(VertxProvider.class);
+        VertxProvider mockVertxProviderB = mock(VertxProvider.class);
+
+        try (MockedStatic<ServiceLoader> serviceLoader = mockStatic(ServiceLoader.class)) {
+            Set<VertxProvider> providers = new LinkedHashSet<>();
+            providers.add(mockVertxProviderA);
+            providers.add(mockVertxProviderB);
+
+            Class<?> providerClass = VertxProvider.class;
+            serviceLoader.when(() -> ServiceLoader.load(providerClass, providerClass.getClassLoader()))
+                    .thenReturn(mockServiceLoader);
+
+            Mockito.when(mockServiceLoader.iterator()).thenReturn(providers.iterator());
+            Mockito.when(mockVertxProviderA.createVertx()).thenReturn(vertx);
+
+            HttpClient httpClient = new VertxAsyncHttpClientProvider().createInstance();
+            assertNotNull(httpClient);
+
+            verify(mockServiceLoader, times(1)).iterator();
+            verify(mockVertxProviderA, times(1)).createVertx();
+
+            // Only the first provider should have been invoked
+            verify(mockVertxProviderB, never()).createVertx();
+        } finally {
+            CountDownLatch latch = new CountDownLatch(1);
+            vertx.close(event -> latch.countDown());
+            latch.await(5, TimeUnit.SECONDS);
+        }
+    }
+}
@@ -22,12 +22,12 @@ import com.github.tomakehurst.wiremock.extension.ResponseTransformer;
 import com.github.tomakehurst.wiremock.http.Request;
 import com.github.tomakehurst.wiremock.http.Response;
 
-import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxHttpClientTests.RETURN_HEADERS_AS_IS_PATH;
+import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxAsyncHttpClientTests.RETURN_HEADERS_AS_IS_PATH;
 
 /**
- * Mock response transformer used to test {@link VertxHttpClient}.
+ * Mock response transformer used to test {@link VertxAsyncHttpClient}.
  */
-public class VertxHttpClientResponseTransformer extends ResponseTransformer {
+public class VertxAsyncHttpClientResponseTransformer extends ResponseTransformer {
     public static final String NAME = "vertx-http-client-response-transformer";
 
     @Override
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientRestProxyTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientRestProxyTests.java
new file mode 100644 (file)
index 0000000..bf11603
--- /dev/null
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import java.io.IOException;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.test.RestProxyTestsWireMockServer;
+import com.azure.core.test.implementation.RestProxyTests;
+import com.azure.core.util.Context;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import io.quarkus.test.QuarkusUnitTest;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+public class VertxAsyncHttpClientRestProxyTests extends RestProxyTests {
+    private static WireMockServer server;
+
+    @RegisterExtension
+    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
+            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
+                    .addAsResource("upload.txt", "upload.txt"));
+
+    @BeforeAll
+    public static void beforeAll() {
+        server = RestProxyTestsWireMockServer.getRestProxyTestsServer();
+        server.start();
+    }
+
+    @AfterAll
+    public static void afterAll() throws Exception {
+        if (server != null) {
+            server.shutdown();
+        }
+    }
+
+    @Override
+    protected int getWireMockPort() {
+        return server.port();
+    }
+
+    @Override
+    protected HttpClient createHttpClient() {
+        return new VertxAsyncHttpClientBuilder().build();
+    }
+
+    /*
+     * The following methods are overridden and reimplemented to work around issues with
+     * parameterized tests not working properly with QuarkusUnitTest.
+     */
+
+    @Override
+    @ParameterizedTest
+    @MethodSource("downloadTestArgumentProvider")
+    @Disabled
+    public void simpleDownloadTest(Context context) {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void simpleDownloadTest() {
+        downloadTestArgumentProvider().forEach(arguments -> {
+            Named<Context> named = (Named<Context>) arguments.get()[0];
+            super.simpleDownloadTest(named.getPayload());
+        });
+    }
+
+    @Override
+    @ParameterizedTest
+    @MethodSource("downloadTestArgumentProvider")
+    @Disabled
+    public void simpleDownloadTestAsync(Context context) {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void simpleDownloadTestAsync() {
+        downloadTestArgumentProvider().forEach(arguments -> {
+            Named<Context> named = (Named<Context>) arguments.get()[0];
+            super.simpleDownloadTestAsync(named.getPayload());
+        });
+    }
+
+    @Override
+    @ParameterizedTest
+    @MethodSource("downloadTestArgumentProvider")
+    @Disabled
+    public void streamResponseCanTransferBody(Context context) throws IOException {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void streamResponseCanTransferBody() {
+        downloadTestArgumentProvider().forEach(arguments -> {
+            Named<Context> named = (Named<Context>) arguments.get()[0];
+            try {
+                super.streamResponseCanTransferBody(named.getPayload());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Override
+    @ParameterizedTest
+    @MethodSource("downloadTestArgumentProvider")
+    @Disabled
+    public void streamResponseCanTransferBodyAsync(Context context) throws IOException {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void streamResponseCanTransferBodyAsync() {
+        downloadTestArgumentProvider().forEach(arguments -> {
+            Named<Context> named = (Named<Context>) arguments.get()[0];
+            try {
+                super.streamResponseCanTransferBodyAsync(named.getPayload());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+}
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientRestProxyWithAsyncHttpProxyTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientRestProxyWithAsyncHttpProxyTests.java
new file mode 100644 (file)
index 0000000..2c35e71
--- /dev/null
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.test.RestProxyTestsWireMockServer;
+import com.azure.core.test.implementation.RestProxyTests;
+import com.azure.core.util.Context;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import io.quarkus.test.QuarkusUnitTest;
+import io.quarkus.test.common.QuarkusTestResource;
+import org.eclipse.microprofile.config.Config;
+import org.eclipse.microprofile.config.ConfigProvider;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxHttpClientTestResource.PROXY_PASSWORD;
+import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxHttpClientTestResource.PROXY_USER;
+
+@QuarkusTestResource(VertxHttpClientTestResource.class)
+public class VertxAsyncHttpClientRestProxyWithAsyncHttpProxyTests extends RestProxyTests {
+    private static WireMockServer server;
+
+    @RegisterExtension
+    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
+            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
+                    .addAsResource("upload.txt", "upload.txt"));
+
+    @BeforeAll
+    public static void beforeAll() {
+        server = RestProxyTestsWireMockServer.getRestProxyTestsServer();
+        server.start();
+    }
+
+    @AfterAll
+    public static void afterAll() throws Exception {
+        if (server != null) {
+            server.shutdown();
+        }
+    }
+
+    @Override
+    protected int getWireMockPort() {
+        return server.port();
+    }
+
+    @Override
+    protected HttpClient createHttpClient() {
+        Config config = ConfigProvider.getConfig();
+        String proxyHost = config.getValue("tiny.proxy.host", String.class);
+        int proxyPort = config.getValue("tiny.proxy.port", int.class);
+
+        InetSocketAddress address = new InetSocketAddress(proxyHost, proxyPort);
+        ProxyOptions proxyOptions = new ProxyOptions(ProxyOptions.Type.HTTP, address);
+        proxyOptions.setCredentials(PROXY_USER, PROXY_PASSWORD);
+
+        return new VertxAsyncHttpClientBuilder()
+                .proxy(proxyOptions)
+                .build();
+    }
+
+    /*
+     * The following methods are overridden and reimplemented to work around issues with
+     * parameterized tests not working properly with QuarkusUnitTest.
+     */
+
+    @Override
+    @ParameterizedTest
+    @MethodSource("downloadTestArgumentProvider")
+    @Disabled
+    public void simpleDownloadTest(Context context) {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void simpleDownloadTest() {
+        downloadTestArgumentProvider().forEach(arguments -> {
+            Named<Context> named = (Named<Context>) arguments.get()[0];
+            super.simpleDownloadTest(named.getPayload());
+        });
+    }
+
+    @Override
+    @ParameterizedTest
+    @MethodSource("downloadTestArgumentProvider")
+    @Disabled
+    public void simpleDownloadTestAsync(Context context) {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void simpleDownloadTestAsync() {
+        downloadTestArgumentProvider().forEach(arguments -> {
+            Named<Context> named = (Named<Context>) arguments.get()[0];
+            super.simpleDownloadTestAsync(named.getPayload());
+        });
+    }
+
+    @Override
+    @ParameterizedTest
+    @MethodSource("downloadTestArgumentProvider")
+    @Disabled
+    public void streamResponseCanTransferBody(Context context) throws IOException {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void streamResponseCanTransferBody() {
+        downloadTestArgumentProvider().forEach(arguments -> {
+            Named<Context> named = (Named<Context>) arguments.get()[0];
+            try {
+                super.streamResponseCanTransferBody(named.getPayload());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Override
+    @ParameterizedTest
+    @MethodSource("downloadTestArgumentProvider")
+    @Disabled
+    public void streamResponseCanTransferBodyAsync(Context context) throws IOException {
+    }
+
+    @SuppressWarnings("unchecked")
+    @Test
+    public void streamResponseCanTransferBodyAsync() {
+        downloadTestArgumentProvider().forEach(arguments -> {
+            Named<Context> named = (Named<Context>) arguments.get()[0];
+            try {
+                super.streamResponseCanTransferBodyAsync(named.getPayload());
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        });
+    }
+
+    @Override
+    @Test
+    @Disabled
+    public void service19PutWithBodyParamApplicationOctetStreamContentTypeAndStringBodyWithEmptyBody() {
+        super.service19PutWithBodyParamApplicationOctetStreamContentTypeAndStringBodyWithEmptyBody();
+    }
+
+    @Override
+    @Test
+    @Disabled
+    public void service19PutWithHeaderApplicationJsonContentTypeAndCharsetAndStringBodyWithEmptyBody() {
+        super.service19PutWithHeaderApplicationJsonContentTypeAndCharsetAndStringBodyWithEmptyBody();
+    }
+
+    @Override
+    @Test
+    @Disabled
+    public void service19PutWithHeaderApplicationOctetStreamContentTypeAndStringBodyWithEmptyBody() {
+        super.service19PutWithHeaderApplicationOctetStreamContentTypeAndStringBodyWithEmptyBody();
+    }
+
+    @Override
+    @Test
+    @Disabled
+    public void service19PutWithNoContentTypeAndStringBodyWithEmptyBody() {
+        super.service19PutWithNoContentTypeAndStringBodyWithEmptyBody();
+    }
+}
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientSingletonTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientSingletonTests.java
new file mode 100644 (file)
index 0000000..acb5cca
--- /dev/null
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.test.utils.TestConfigurationSource;
+import com.azure.core.util.Configuration;
+import com.azure.core.util.ConfigurationBuilder;
+import com.azure.core.util.ConfigurationSource;
+import com.azure.core.util.HttpClientOptions;
+import io.quarkus.test.QuarkusUnitTest;
+import org.jboss.shrinkwrap.api.ShrinkWrap;
+import org.jboss.shrinkwrap.api.spec.JavaArchive;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+
+@Execution(ExecutionMode.SAME_THREAD)
+public class VertxAsyncHttpClientSingletonTests {
+    private static final ConfigurationSource EMPTY_SOURCE = new TestConfigurationSource();
+
+    @RegisterExtension
+    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
+            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
+                    .addClasses(VertxAsyncHttpClientProvider.class));
+
+    @Test
+    public void testSingletonClientInstanceCreation() {
+        Configuration configuration = getConfiguration(true);
+        HttpClient client1 = new VertxAsyncHttpClientProvider(configuration).createInstance();
+        HttpClient client2 = new VertxAsyncHttpClientProvider(configuration).createInstance();
+        assertEquals(client1, client2);
+    }
+
+    @Test
+    public void testNonDefaultClientInstanceCreation() {
+        Configuration configuration = getConfiguration(false);
+        HttpClient client1 = new VertxAsyncHttpClientProvider(configuration).createInstance();
+        HttpClient client2 = new VertxAsyncHttpClientProvider(configuration).createInstance();
+        assertNotEquals(client1, client2);
+    }
+
+    @Test
+    public void testCustomizedClientInstanceCreationNotShared() {
+        Configuration configuration = getConfiguration(false);
+        HttpClientOptions clientOptions = new HttpClientOptions().setMaximumConnectionPoolSize(500);
+        HttpClient client1 = new VertxAsyncHttpClientProvider(configuration).createInstance(clientOptions);
+        HttpClient client2 = new VertxAsyncHttpClientProvider(configuration).createInstance(clientOptions);
+        assertNotEquals(client1, client2);
+    }
+
+    @Test
+    public void testNullHttpClientOptionsInstanceCreation() {
+        Configuration configuration = getConfiguration(true);
+        HttpClient client1 = new VertxAsyncHttpClientProvider(configuration).createInstance(null);
+        HttpClient client2 = new VertxAsyncHttpClientProvider(configuration).createInstance(null);
+        assertEquals(client1, client2);
+    }
+
+    private static Configuration getConfiguration(boolean enableSharing) {
+        return new ConfigurationBuilder(EMPTY_SOURCE, EMPTY_SOURCE, new TestConfigurationSource()
+                .put("AZURE_ENABLE_HTTP_CLIENT_SHARING", Boolean.toString(enableSharing)))
+                        .build();
+    }
+}
@@ -30,26 +30,23 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.CountDownLatch;
 
-import javax.inject.Inject;
-
 import com.azure.core.http.HttpClient;
 import com.azure.core.http.HttpHeader;
 import com.azure.core.http.HttpHeaders;
 import com.azure.core.http.HttpMethod;
 import com.azure.core.http.HttpRequest;
 import com.azure.core.http.HttpResponse;
+import com.azure.core.util.Context;
+import com.azure.core.util.FluxUtil;
 import com.github.tomakehurst.wiremock.WireMockServer;
-import com.github.tomakehurst.wiremock.client.WireMock;
 import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
 import io.quarkus.test.QuarkusUnitTest;
-import io.vertx.core.Vertx;
-import io.vertx.core.VertxException;
+import io.vertx.core.http.HttpClosedException;
 import org.jboss.shrinkwrap.api.ShrinkWrap;
 import org.jboss.shrinkwrap.api.spec.JavaArchive;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import reactor.core.publisher.Flux;
@@ -58,46 +55,44 @@ import reactor.core.scheduler.Schedulers;
 import reactor.test.StepVerifier;
 import reactor.test.StepVerifierOptions;
 
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.post;
 import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertLinesMatch;
 
-@Disabled //https://github.com/apache/camel-quarkus/issues/4090
-public class VertxHttpClientTests {
+public class VertxAsyncHttpClientTests {
     static final String RETURN_HEADERS_AS_IS_PATH = "/returnHeadersAsIs";
-
     private static final String SHORT_BODY = "hi there";
     private static final String LONG_BODY = createLongBody();
-
     private static WireMockServer server;
 
-    @Inject
-    Vertx vertx;
-
     @RegisterExtension
     static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
             .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
-                    .addClass(VertxHttpClientResponseTransformer.class));
+                    .addClasses(VertxAsyncHttpClientResponseTransformer.class));
 
     @BeforeAll
-    public static void beforeClass() {
+    public static void beforeAll() {
         server = new WireMockServer(WireMockConfiguration.options()
-                .extensions(new VertxHttpClientResponseTransformer())
+                .extensions(new VertxAsyncHttpClientResponseTransformer())
                 .dynamicPort()
                 .disableRequestJournal()
                 .gzipDisabled(true));
 
-        server.stubFor(WireMock.get("/short").willReturn(WireMock.aResponse().withBody(SHORT_BODY)));
-        server.stubFor(WireMock.get("/long").willReturn(WireMock.aResponse().withBody(LONG_BODY)));
-        server.stubFor(WireMock.get("/error").willReturn(WireMock.aResponse().withBody("error").withStatus(500)));
-        server.stubFor(WireMock.post("/shortPost").willReturn(WireMock.aResponse().withBody(SHORT_BODY)));
-        server.stubFor(WireMock.get(RETURN_HEADERS_AS_IS_PATH).willReturn(WireMock.aResponse()
-                .withTransformers(VertxHttpClientResponseTransformer.NAME)));
-
+        server.stubFor(get("/short").willReturn(aResponse().withBody(SHORT_BODY)));
+        server.stubFor(get("/long").willReturn(aResponse().withBody(LONG_BODY)));
+        server.stubFor(get("/error").willReturn(aResponse().withBody("error").withStatus(500)));
+        server.stubFor(post("/shortPost").willReturn(aResponse().withBody(SHORT_BODY)));
+        server.stubFor(get(RETURN_HEADERS_AS_IS_PATH).willReturn(aResponse()
+                .withTransformers(VertxAsyncHttpClientResponseTransformer.NAME)));
+        server.stubFor(get("/empty").willReturn(aResponse().withBody(new byte[0])));
         server.start();
     }
 
     @AfterAll
-    public static void afterClass() {
+    public static void afterAll() throws Exception {
         if (server != null) {
             server.shutdown();
         }
@@ -113,6 +108,14 @@ public class VertxHttpClientTests {
         checkBodyReceived(LONG_BODY, "/long");
     }
 
+    @Test
+    public void responseBodyAsStringAsyncWithCharset() {
+        HttpClient client = new VertxAsyncHttpClientBuilder().build();
+        StepVerifier.create(doRequest(client, "/short").getBodyAsString(StandardCharsets.UTF_8))
+                .assertNext(result -> assertEquals(SHORT_BODY, result))
+                .verifyComplete();
+    }
+
     @Test
     public void testFlowableWhenServerReturnsBodyAndNoErrorsWhenHttp500Returned() {
         HttpResponse response = getResponse("/error");
@@ -143,7 +146,7 @@ public class VertxHttpClientTests {
 
     @Test
     public void testRequestBodyIsErrorShouldPropagateToResponse() {
-        HttpClient client = new VertxHttpClientProvider().createInstance();
+        HttpClient client = new VertxAsyncHttpClientProvider().createInstance();
         HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost"))
                 .setHeader("Content-Length", "123")
                 .setBody(Flux.error(new RuntimeException("boo")));
@@ -155,7 +158,7 @@ public class VertxHttpClientTests {
 
     @Test
     public void testRequestBodyEndsInErrorShouldPropagateToResponse() {
-        HttpClient client = new VertxHttpClientProvider().createInstance();
+        HttpClient client = new VertxAsyncHttpClientProvider().createInstance();
         String contentChunk = "abcdefgh";
         int repetitions = 1000;
         HttpRequest request = new HttpRequest(HttpMethod.POST, url(server, "/shortPost"))
@@ -164,9 +167,14 @@ public class VertxHttpClientTests {
                         .repeat(repetitions)
                         .map(s -> ByteBuffer.wrap(s.getBytes(StandardCharsets.UTF_8)))
                         .concatWith(Flux.error(new RuntimeException("boo"))));
-        StepVerifier.create(client.send(request))
-                .expectErrorMessage("boo")
-                .verify(Duration.ofSeconds(10));
+
+        try {
+            StepVerifier.create(client.send(request))
+                    .expectErrorMessage("boo")
+                    .verify(Duration.ofSeconds(10));
+        } catch (Exception ex) {
+            assertEquals("boo", ex.getMessage());
+        }
     }
 
     @Test
@@ -182,10 +190,9 @@ public class VertxHttpClientTests {
                     // respond but don't send the complete response
                     byte[] bytes = new byte[1024];
                     int n = socket.getInputStream().read(bytes);
-                    System.out.println(new String(bytes, 0, n, StandardCharsets.UTF_8));
-                    String response = "HTTP/1.1 200 OK\r\n" //
-                            + "Content-Type: text/plain\r\n" //
-                            + "Content-Length: 10\r\n" //
+                    String response = "HTTP/1.1 200 OK\r\n"
+                            + "Content-Type: text/plain\r\n"
+                            + "Content-Length: 10\r\n"
                             + "\r\n" //
                             + "zi";
                     OutputStream out = socket.getOutputStream();
@@ -197,12 +204,15 @@ public class VertxHttpClientTests {
                 }).subscribeOn(Schedulers.boundedElastic()).subscribe();
                 //
                 latch.await();
-                HttpClient client = new VertxHttpClientBuilder(vertx).build();
+                HttpClient client = new VertxAsyncHttpClientBuilder().build();
                 HttpRequest request = new HttpRequest(HttpMethod.GET,
                         new URL("http://localhost:" + ss.getLocalPort() + "/ioException"));
 
-                StepVerifier.create(client.send(request))
-                        .verifyError(VertxException.class);
+                StepVerifier.create(client.send(request)
+                        .flatMap(response -> FluxUtil.collectBytesInByteBufferStream(response.getBody())
+                                .zipWith(Mono.just(response.getStatusCode()))))
+                        .expectError(HttpClosedException.class)
+                        .verify(Duration.ofSeconds(5));
             }
         });
     }
@@ -210,14 +220,14 @@ public class VertxHttpClientTests {
     @Test
     public void testConcurrentRequests() throws NoSuchAlgorithmException {
         int numRequests = 100; // 100 = 1GB of data read
-        HttpClient client = new VertxHttpClientProvider().createInstance();
+        HttpClient client = new VertxAsyncHttpClientProvider().createInstance();
         byte[] expectedDigest = digest(LONG_BODY);
         long expectedByteCount = (long) numRequests * LONG_BODY.getBytes(StandardCharsets.UTF_8).length;
 
         Mono<Long> numBytesMono = Flux.range(1, numRequests)
                 .parallel(10)
                 .runOn(Schedulers.boundedElastic())
-                .flatMap(n -> Mono.fromCallable(() -> getResponse(client, "/long")).flatMapMany(response -> {
+                .flatMap(n -> Mono.fromCallable(() -> getResponse(client, "/long", Context.NONE)).flatMapMany(response -> {
                     MessageDigest md = md5Digest();
                     return response.getBody()
                             .doOnNext(buffer -> md.update(buffer.duplicate()))
@@ -235,7 +245,7 @@ public class VertxHttpClientTests {
 
     @Test
     public void validateHeadersReturnAsIs() {
-        HttpClient client = new VertxHttpClientProvider().createInstance();
+        HttpClient client = new VertxAsyncHttpClientProvider().createInstance();
 
         final String singleValueHeaderName = "singleValue";
         final String singleValueHeaderValue = "value";
@@ -250,7 +260,7 @@ public class VertxHttpClientTests {
         StepVerifier.create(client.send(new HttpRequest(HttpMethod.GET, url(server, RETURN_HEADERS_AS_IS_PATH),
                 headers, Flux.empty())))
                 .assertNext(response -> {
-                    Assertions.assertEquals(200, response.getStatusCode());
+                    assertEquals(200, response.getStatusCode());
 
                     HttpHeaders responseHeaders = response.getHeaders();
                     HttpHeader singleValueHeader = responseHeaders.get(singleValueHeaderName);
@@ -259,11 +269,53 @@ public class VertxHttpClientTests {
 
                     HttpHeader multiValueHeader = responseHeaders.get("Multi-value");
                     assertEquals(multiValueHeaderName, multiValueHeader.getName());
+                    assertLinesMatch(multiValueHeaderValue, multiValueHeader.getValuesList());
                 })
                 .expectComplete()
                 .verify(Duration.ofSeconds(10));
     }
 
+    @Test
+    public void testBufferedResponse() {
+        Context context = new Context("azure-eagerly-read-response", true);
+        HttpClient client = new VertxAsyncHttpClientBuilder().build();
+        HttpResponse response = getResponse(client, "/short", context);
+
+        StepVerifier.create(response.getBody())
+                .assertNext(buffer -> {
+                    assertEquals(SHORT_BODY, new String(buffer.array()));
+                })
+                .verifyComplete();
+    }
+
+    @Test
+    public void testEmptyBufferResponse() {
+        HttpResponse response = getResponse("/empty");
+
+        StepVerifierOptions stepVerifierOptions = StepVerifierOptions.create();
+        stepVerifierOptions.initialRequest(0);
+
+        StepVerifier.create(response.getBody(), stepVerifierOptions)
+                .expectNextCount(0)
+                .thenRequest(1)
+                .verifyComplete();
+    }
+
+    @Test
+    public void testEmptyBufferedResponse() {
+        Context context = new Context("azure-eagerly-read-response", true);
+        HttpClient client = new VertxAsyncHttpClientBuilder().build();
+        HttpResponse response = getResponse(client, "/empty", context);
+
+        StepVerifierOptions stepVerifierOptions = StepVerifierOptions.create();
+        stepVerifierOptions.initialRequest(0);
+
+        StepVerifier.create(response.getBody(), stepVerifierOptions)
+                .expectNextCount(0)
+                .thenRequest(1)
+                .verifyComplete();
+    }
+
     private static MessageDigest md5Digest() {
         try {
             return MessageDigest.getInstance("MD5");
@@ -278,14 +330,14 @@ public class VertxHttpClientTests {
         return md.digest();
     }
 
-    private HttpResponse getResponse(String path) {
-        HttpClient client = new VertxHttpClientBuilder(vertx).build();
-        return getResponse(client, path);
+    private static HttpResponse getResponse(String path) {
+        HttpClient client = new VertxAsyncHttpClientBuilder().build();
+        return getResponse(client, path, Context.NONE);
     }
 
-    private static HttpResponse getResponse(HttpClient client, String path) {
+    private static HttpResponse getResponse(HttpClient client, String path, Context context) {
         HttpRequest request = new HttpRequest(HttpMethod.GET, url(server, path));
-        return client.send(request).block();
+        return client.send(request, context).block();
     }
 
     static URL url(WireMockServer server, String path) {
@@ -306,7 +358,7 @@ public class VertxHttpClientTests {
     }
 
     private void checkBodyReceived(String expectedBody, String path) {
-        HttpClient client = new VertxHttpClientBuilder(vertx).build();
+        HttpClient client = new VertxAsyncHttpClientBuilder().build();
         StepVerifier.create(doRequest(client, path).getBodyAsByteArray())
                 .assertNext(bytes -> assertEquals(expectedBody, new String(bytes, StandardCharsets.UTF_8)))
                 .verifyComplete();
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientBuilderTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientBuilderTests.java
deleted file mode 100644 (file)
index bddc6cb..0000000
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
-
-import java.net.InetSocketAddress;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.HttpMethod;
-import com.azure.core.http.HttpRequest;
-import com.azure.core.http.ProxyOptions;
-import com.azure.core.util.Configuration;
-import com.github.tomakehurst.wiremock.WireMockServer;
-import com.github.tomakehurst.wiremock.client.WireMock;
-import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
-import io.vertx.core.Vertx;
-import io.vertx.ext.web.client.WebClientOptions;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.EnumSource;
-import reactor.test.StepVerifier;
-
-import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxHttpClientTestResource.PROXY_PASSWORD;
-import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxHttpClientTestResource.PROXY_USER;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
-/**
- * Tests {@link VertxHttpClientBuilder}.
- */
-@Disabled //https://github.com/apache/camel-quarkus/issues/4090
-public class VertxHttpClientBuilderTests {
-    private static final String COOKIE_VALIDATOR_PATH = "/cookieValidator";
-    private static final String DEFAULT_PATH = "/default";
-    private static final String DISPATCHER_PATH = "/dispatcher";
-
-    private static final WireMockServer server = new WireMockServer(
-            WireMockConfiguration.options().dynamicPort().disableRequestJournal());
-    private static final Vertx vertx = Vertx.vertx();
-
-    private static String defaultUrl;
-
-    @BeforeAll
-    public static void setupWireMock() {
-        // Mocked endpoint to test building a client with a prebuilt client.
-        server.stubFor(WireMock.get(COOKIE_VALIDATOR_PATH).withCookie("test", WireMock.matching("success"))
-                .willReturn(WireMock.aResponse().withStatus(200)));
-
-        // Mocked endpoint to test building a client with a timeout.
-        server.stubFor(WireMock.get(DEFAULT_PATH).willReturn(WireMock.aResponse().withStatus(200)));
-
-        // Mocked endpoint to test building a client with a dispatcher and uses a delayed response.
-        server.stubFor(WireMock.get(DISPATCHER_PATH).willReturn(WireMock.aResponse().withStatus(200)
-                .withFixedDelay(5000)));
-
-        server.start();
-
-        defaultUrl = "http://localhost:" + server.port() + DEFAULT_PATH;
-    }
-
-    @AfterAll
-    public static void afterAll() throws InterruptedException {
-        if (server.isRunning()) {
-            server.shutdown();
-        }
-        CountDownLatch latch = new CountDownLatch(1);
-        vertx.close(x -> latch.countDown());
-        latch.await();
-    }
-
-    @Test
-    public void buildWithConfigurationNone() {
-        HttpClient client = new VertxHttpClientBuilder(vertx)
-                .configuration(Configuration.NONE)
-                .build();
-        try {
-            StepVerifier.create(client.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
-                    .assertNext(response -> assertEquals(200, response.getStatusCode()))
-                    .verifyComplete();
-        } finally {
-            ((VertxHttpClient) client).close();
-        }
-    }
-
-    @Test
-    public void buildWithDefaultConnectionOptions() {
-        WebClientOptions options = new WebClientOptions();
-
-        HttpClient client = new VertxHttpClientBuilder(vertx)
-                .webClientOptions(options)
-                .build();
-
-        try {
-            StepVerifier.create(client.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
-                    .assertNext(response -> assertEquals(200, response.getStatusCode()))
-                    .verifyComplete();
-
-            assertEquals(10000, options.getConnectTimeout());
-            assertEquals(60, options.getIdleTimeout());
-            assertEquals(60, options.getReadIdleTimeout());
-            assertEquals(60, options.getWriteIdleTimeout());
-        } finally {
-            ((VertxHttpClient) client).close();
-        }
-    }
-
-    @Test
-    public void buildWithConnectionOptions() {
-        WebClientOptions options = new WebClientOptions();
-
-        HttpClient client = new VertxHttpClientBuilder(vertx)
-                .webClientOptions(options)
-                .connectTimeout(Duration.ofSeconds(10))
-                .idleTimeout(Duration.ofSeconds(20))
-                .readIdleTimeout(Duration.ofSeconds(30))
-                .writeIdleTimeout(Duration.ofSeconds(40))
-                .build();
-
-        try {
-            StepVerifier.create(client.send(new HttpRequest(HttpMethod.GET, defaultUrl)))
-                    .assertNext(response -> assertEquals(200, response.getStatusCode()))
-                    .verifyComplete();
-
-            assertEquals(10000, options.getConnectTimeout());
-            assertEquals(20, options.getIdleTimeout());
-            assertEquals(30, options.getReadIdleTimeout());
-            assertEquals(40, options.getWriteIdleTimeout());
-        } finally {
-            ((VertxHttpClient) client).close();
-        }
-    }
-
-    @ParameterizedTest
-    @EnumSource(ProxyOptions.Type.class)
-    public void allProxyOptions(ProxyOptions.Type type) {
-        WebClientOptions options = new WebClientOptions();
-        InetSocketAddress address = new InetSocketAddress("localhost", 8888);
-        ProxyOptions proxyOptions = new ProxyOptions(type, address);
-        proxyOptions.setCredentials(PROXY_USER, PROXY_PASSWORD);
-        proxyOptions.setNonProxyHosts("foo.*|*bar.com|microsoft.com");
-
-        HttpClient client = new VertxHttpClientBuilder(vertx)
-                .webClientOptions(options)
-                .proxy(proxyOptions)
-                .build();
-
-        try {
-            io.vertx.core.net.ProxyOptions vertxProxyOptions = options.getProxyOptions();
-            assertEquals(address.getHostName(), vertxProxyOptions.getHost());
-            assertEquals(address.getPort(), vertxProxyOptions.getPort());
-            assertEquals(type.name(), vertxProxyOptions.getType().name());
-            assertEquals(PROXY_USER, vertxProxyOptions.getUsername());
-            assertEquals(PROXY_PASSWORD, vertxProxyOptions.getPassword());
-
-            List<String> proxyHosts = new ArrayList<>();
-            proxyHosts.add("foo*");
-            proxyHosts.add("*bar.com");
-            proxyHosts.add("microsoft.com");
-            assertEquals(proxyHosts, options.getNonProxyHosts());
-        } finally {
-            ((VertxHttpClient) client).close();
-        }
-    }
-}
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientProviderTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientProviderTests.java
deleted file mode 100644 (file)
index 588d38b..0000000
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
-
-import java.net.InetSocketAddress;
-import java.time.Duration;
-
-import com.azure.core.http.ProxyOptions;
-import com.azure.core.util.Configuration;
-import com.azure.core.util.HttpClientOptions;
-import io.quarkus.test.QuarkusUnitTest;
-import io.vertx.ext.web.client.WebClientOptions;
-import org.jboss.shrinkwrap.api.ShrinkWrap;
-import org.jboss.shrinkwrap.api.spec.JavaArchive;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertNull;
-
-/**
- * Tests {@link VertxHttpClientProvider}.
- */
-@Disabled //https://github.com/apache/camel-quarkus/issues/4090
-public class VertxHttpClientProviderTests {
-
-    @RegisterExtension
-    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
-            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class));
-
-    @Test
-    public void nullOptionsReturnsBaseClient() {
-        VertxHttpClient httpClient = (VertxHttpClient) new VertxHttpClientProvider()
-                .createInstance(null);
-
-        ProxyOptions environmentProxy = ProxyOptions.fromConfiguration(Configuration.getGlobalConfiguration());
-        WebClientOptions options = httpClient.getWebClientOptions();
-        io.vertx.core.net.ProxyOptions proxyOptions = options.getProxyOptions();
-        if (environmentProxy == null) {
-            assertNull(proxyOptions);
-        } else {
-            assertNotNull(proxyOptions);
-            assertEquals(environmentProxy.getAddress().getHostName(), proxyOptions.getHost());
-        }
-    }
-
-    @Test
-    public void defaultOptionsReturnsBaseClient() {
-        VertxHttpClient httpClient = (VertxHttpClient) new VertxHttpClientProvider()
-                .createInstance(new HttpClientOptions());
-
-        ProxyOptions environmentProxy = ProxyOptions.fromConfiguration(Configuration.getGlobalConfiguration());
-        WebClientOptions options = httpClient.getWebClientOptions();
-        io.vertx.core.net.ProxyOptions proxyOptions = options.getProxyOptions();
-        if (environmentProxy == null) {
-            assertNull(proxyOptions);
-        } else {
-            assertNotNull(proxyOptions);
-            assertEquals(environmentProxy.getAddress().getHostName(), proxyOptions.getHost());
-        }
-    }
-
-    @Test
-    public void optionsWithAProxy() {
-        ProxyOptions proxyOptions = new ProxyOptions(ProxyOptions.Type.HTTP, new InetSocketAddress("localhost", 8888));
-        proxyOptions.setNonProxyHosts("foo.*|bar.*|cheese.com|wine.org");
-
-        HttpClientOptions clientOptions = new HttpClientOptions().setProxyOptions(proxyOptions);
-
-        VertxHttpClient httpClient = (VertxHttpClient) new VertxHttpClientProvider()
-                .createInstance(clientOptions);
-
-        WebClientOptions options = httpClient.getWebClientOptions();
-        io.vertx.core.net.ProxyOptions vertxProxyOptions = options.getProxyOptions();
-        assertNotNull(vertxProxyOptions);
-        assertEquals(proxyOptions.getAddress().getHostName(), vertxProxyOptions.getHost());
-        assertEquals(proxyOptions.getAddress().getPort(), vertxProxyOptions.getPort());
-        assertEquals(proxyOptions.getType().name(), vertxProxyOptions.getType().name());
-    }
-
-    @Test
-    public void optionsWithTimeouts() {
-        long expectedTimeout = 15000;
-        Duration timeout = Duration.ofMillis(expectedTimeout);
-        HttpClientOptions clientOptions = new HttpClientOptions()
-                .setWriteTimeout(timeout)
-                .setResponseTimeout(timeout)
-                .setReadTimeout(timeout);
-
-        VertxHttpClient httpClient = (VertxHttpClient) new VertxHttpClientProvider()
-                .createInstance(clientOptions);
-
-        WebClientOptions options = httpClient.getWebClientOptions();
-
-        assertEquals(timeout.getSeconds(), options.getWriteIdleTimeout());
-        assertEquals(timeout.getSeconds(), options.getReadIdleTimeout());
-    }
-}
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientRestProxyTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientRestProxyTests.java
deleted file mode 100644 (file)
index 61c55fd..0000000
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
-
-import javax.inject.Inject;
-
-import com.azure.core.http.HttpClient;
-import com.azure.core.test.RestProxyTestsWireMockServer;
-import com.azure.core.test.implementation.RestProxyTests;
-import com.github.tomakehurst.wiremock.WireMockServer;
-import io.quarkus.test.QuarkusUnitTest;
-import io.vertx.core.Vertx;
-import org.jboss.shrinkwrap.api.ShrinkWrap;
-import org.jboss.shrinkwrap.api.spec.JavaArchive;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-@Disabled //https://github.com/apache/camel-quarkus/issues/4090
-public class VertxHttpClientRestProxyTests extends RestProxyTests {
-    private final static WireMockServer server = RestProxyTestsWireMockServer.getRestProxyTestsServer();
-
-    @RegisterExtension
-    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
-            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
-                    .addAsResource("upload.txt", "upload.txt"));
-
-    @Inject
-    Vertx vertx;
-
-    @BeforeAll
-    public static void getWireMockServer() {
-        server.start();
-    }
-
-    @AfterAll
-    public static void shutdownWireMockServer() {
-        server.shutdown();
-    }
-
-    @Override
-    protected int getWireMockPort() {
-        return server.port();
-    }
-
-    @Override
-    protected HttpClient createHttpClient() {
-        return new VertxHttpClientBuilder(vertx).build();
-    }
-}
diff --git a/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientRestProxyWithHttpProxyTests.java b/extensions-support/azure-core-http-client-vertx/deployment/src/test/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientRestProxyWithHttpProxyTests.java
deleted file mode 100644 (file)
index 794caa0..0000000
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
-
-import java.net.InetSocketAddress;
-
-import javax.inject.Inject;
-
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.ProxyOptions;
-import com.azure.core.test.RestProxyTestsWireMockServer;
-import com.azure.core.test.implementation.RestProxyTests;
-import com.github.tomakehurst.wiremock.WireMockServer;
-import io.quarkus.test.QuarkusUnitTest;
-import io.quarkus.test.common.QuarkusTestResource;
-import io.vertx.core.Vertx;
-import org.eclipse.microprofile.config.Config;
-import org.eclipse.microprofile.config.ConfigProvider;
-import org.jboss.shrinkwrap.api.ShrinkWrap;
-import org.jboss.shrinkwrap.api.spec.JavaArchive;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
-import org.junit.jupiter.api.extension.RegisterExtension;
-
-import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxHttpClientTestResource.PROXY_PASSWORD;
-import static org.apache.camel.quarkus.support.azure.core.http.vertx.VertxHttpClientTestResource.PROXY_USER;
-
-@Disabled //https://github.com/apache/camel-quarkus/issues/4090
-@QuarkusTestResource(VertxHttpClientTestResource.class)
-public class VertxHttpClientRestProxyWithHttpProxyTests extends RestProxyTests {
-    private static WireMockServer server;
-
-    @RegisterExtension
-    static final QuarkusUnitTest CONFIG = new QuarkusUnitTest()
-            .setArchiveProducer(() -> ShrinkWrap.create(JavaArchive.class)
-                    .addAsResource("upload.txt", "upload.txt"));
-
-    @Inject
-    Vertx vertx;
-
-    @BeforeAll
-    public static void getWireMockServer() {
-        server = RestProxyTestsWireMockServer.getRestProxyTestsServer();
-        server.start();
-    }
-
-    @AfterAll
-    public static void shutdownWireMockServer() {
-        if (server != null) {
-            server.shutdown();
-        }
-    }
-
-    @Override
-    protected int getWireMockPort() {
-        return server.port();
-    }
-
-    @Override
-    protected HttpClient createHttpClient() {
-        Config config = ConfigProvider.getConfig();
-        String proxyHost = config.getValue("tiny.proxy.host", String.class);
-        int proxyPort = config.getValue("tiny.proxy.port", int.class);
-
-        InetSocketAddress address = new InetSocketAddress(proxyHost, proxyPort);
-        ProxyOptions proxyOptions = new ProxyOptions(ProxyOptions.Type.HTTP, address);
-        proxyOptions.setCredentials(PROXY_USER, PROXY_PASSWORD);
-
-        return new VertxHttpClientBuilder(vertx)
-                .proxy(proxyOptions)
-                .build();
-    }
-}
index c0a491ffe6dacd7f09c80ceedd5a1124ec0ea780..4f3ca842db48f799cc8a9ea2ff92ad1202a253c3 100644 (file)
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-vertx</artifactId>
         </dependency>
-        <dependency>
-            <groupId>io.vertx</groupId>
-            <artifactId>vertx-web-client</artifactId>
-        </dependency>
         <dependency>
             <groupId>com.azure</groupId>
             <artifactId>azure-core</artifactId>
@@ -22,42 +22,11 @@ import javax.enterprise.inject.spi.Bean;
 import javax.enterprise.inject.spi.BeanManager;
 import javax.enterprise.inject.spi.CDI;
 
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.HttpClientProvider;
-import com.azure.core.util.HttpClientOptions;
 import io.vertx.core.Vertx;
-import io.vertx.ext.web.client.WebClient;
-
-/**
- * {@link HttpClientProvider} backed by the Vert.x {@link WebClient}
- */
-public class VertxHttpClientProvider implements HttpClientProvider {
-
-    @Override
-    public HttpClient createInstance() {
-        return createInstance(null);
-    }
 
+public class QuarkusVertxProvider implements VertxProvider {
     @Override
-    public HttpClient createInstance(HttpClientOptions clientOptions) {
-        VertxHttpClientBuilder builder = new VertxHttpClientBuilder(getVertx());
-        if (clientOptions != null) {
-            builder = builder.proxy(clientOptions.getProxyOptions())
-                    .configuration(clientOptions.getConfiguration())
-                    .connectTimeout(clientOptions.getConnectTimeout())
-                    .idleTimeout(clientOptions.getConnectionIdleTimeout())
-                    .writeIdleTimeout(clientOptions.getWriteTimeout())
-                    .readIdleTimeout(clientOptions.getReadTimeout());
-        }
-        return builder.build();
-    }
-
-    /**
-     * Obtains a reference to the Quarkus managed {@link Vertx} instance
-     * 
-     * @return The Quarkus managed {@link Vertx} instance
-     */
-    private static final Vertx getVertx() {
+    public Vertx createVertx() {
         BeanManager beanManager = CDI.current().getBeanManager();
         Set<Bean<?>> beans = beanManager.getBeans(Vertx.class);
         if (beans.isEmpty()) {
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClient.java b/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClient.java
new file mode 100644 (file)
index 0000000..82e8737
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import java.nio.ByteBuffer;
+import java.util.Objects;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.HttpHeaders;
+import com.azure.core.http.HttpMethod;
+import com.azure.core.http.HttpRequest;
+import com.azure.core.http.HttpResponse;
+import com.azure.core.util.Context;
+import com.azure.core.util.Contexts;
+import com.azure.core.util.ProgressReporter;
+import io.netty.buffer.Unpooled;
+import io.vertx.core.Vertx;
+import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClientRequest;
+import io.vertx.core.http.HttpClientResponse;
+import io.vertx.core.http.RequestOptions;
+import org.apache.camel.quarkus.support.azure.core.http.vertx.implementation.BufferedVertxHttpResponse;
+import org.apache.camel.quarkus.support.azure.core.http.vertx.implementation.VertxHttpAsyncResponse;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+
+/**
+ * {@link HttpClient} implementation for the Vert.x {@link io.vertx.core.http.HttpClient}.
+ */
+class VertxAsyncHttpClient implements HttpClient {
+    private final Scheduler scheduler;
+    final io.vertx.core.http.HttpClient client;
+
+    /**
+     * Constructs a {@link VertxAsyncHttpClient}.
+     *
+     * @param client The Vert.x {@link io.vertx.core.http.HttpClient}
+     */
+    VertxAsyncHttpClient(io.vertx.core.http.HttpClient client, Vertx vertx) {
+        Objects.requireNonNull(client, "client cannot be null");
+        Objects.requireNonNull(vertx, "vertx cannot be null");
+        this.client = client;
+        this.scheduler = Schedulers.fromExecutor(vertx.nettyEventLoopGroup());
+    }
+
+    @Override
+    public Mono<HttpResponse> send(HttpRequest request) {
+        return send(request, Context.NONE);
+    }
+
+    @Override
+    public Mono<HttpResponse> send(HttpRequest request, Context context) {
+        boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);
+        ProgressReporter progressReporter = Contexts.with(context).getHttpRequestProgressReporter();
+        return Mono.create(sink -> toVertxHttpRequest(request).subscribe(vertxHttpRequest -> {
+            vertxHttpRequest.exceptionHandler(sink::error);
+
+            HttpHeaders requestHeaders = request.getHeaders();
+            if (requestHeaders != null) {
+                requestHeaders.stream().forEach(header -> vertxHttpRequest.putHeader(header.getName(), header.getValuesList()));
+                if (request.getHeaders().get("Content-Length") == null) {
+                    vertxHttpRequest.setChunked(true);
+                }
+            } else {
+                vertxHttpRequest.setChunked(true);
+            }
+
+            vertxHttpRequest.response(event -> {
+                if (event.succeeded()) {
+                    HttpClientResponse vertxHttpResponse = event.result();
+                    vertxHttpResponse.exceptionHandler(sink::error);
+
+                    if (eagerlyReadResponse) {
+                        vertxHttpResponse.body(bodyEvent -> {
+                            if (bodyEvent.succeeded()) {
+                                sink.success(new BufferedVertxHttpResponse(request, vertxHttpResponse,
+                                        bodyEvent.result()));
+                            } else {
+                                sink.error(bodyEvent.cause());
+                            }
+                        });
+                    } else {
+                        sink.success(new VertxHttpAsyncResponse(request, vertxHttpResponse));
+                    }
+                } else {
+                    sink.error(event.cause());
+                }
+            });
+
+            getRequestBody(request, progressReporter)
+                    .subscribeOn(scheduler)
+                    .map(Unpooled::wrappedBuffer)
+                    .map(Buffer::buffer)
+                    .subscribe(vertxHttpRequest::write, sink::error, vertxHttpRequest::end);
+        }, sink::error));
+    }
+
+    private Mono<HttpClientRequest> toVertxHttpRequest(HttpRequest request) {
+        HttpMethod httpMethod = request.getHttpMethod();
+        io.vertx.core.http.HttpMethod requestMethod = io.vertx.core.http.HttpMethod.valueOf(httpMethod.name());
+
+        RequestOptions options = new RequestOptions();
+        options.setMethod(requestMethod);
+        options.setAbsoluteURI(request.getUrl());
+        return Mono.fromCompletionStage(client.request(options).toCompletionStage());
+    }
+
+    private Flux<ByteBuffer> getRequestBody(HttpRequest request, ProgressReporter progressReporter) {
+        Flux<ByteBuffer> body = request.getBody();
+        if (body == null) {
+            return Flux.empty();
+        }
+
+        if (progressReporter != null) {
+            body = body.map(buffer -> {
+                progressReporter.reportProgress(buffer.remaining());
+                return buffer;
+            });
+        }
+
+        return body;
+    }
+}
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientBuilder.java b/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientBuilder.java
new file mode 100644 (file)
index 0000000..3ec5a59
--- /dev/null
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import java.net.InetSocketAddress;
+import java.time.Duration;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+import java.util.regex.Pattern;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.ProxyOptions;
+import com.azure.core.util.Configuration;
+import com.azure.core.util.CoreUtils;
+import com.azure.core.util.logging.ClientLogger;
+import io.vertx.core.Vertx;
+import io.vertx.core.http.HttpClientOptions;
+import io.vertx.core.net.ProxyType;
+
+import static com.azure.core.util.Configuration.PROPERTY_AZURE_REQUEST_CONNECT_TIMEOUT;
+import static com.azure.core.util.Configuration.PROPERTY_AZURE_REQUEST_READ_TIMEOUT;
+import static com.azure.core.util.Configuration.PROPERTY_AZURE_REQUEST_WRITE_TIMEOUT;
+import static com.azure.core.util.CoreUtils.getDefaultTimeoutFromEnvironment;
+
+/**
+ * Builds a {@link VertxAsyncHttpClient}.
+ */
+public class VertxAsyncHttpClientBuilder {
+    private static final ClientLogger LOGGER = new ClientLogger(VertxAsyncHttpClientBuilder.class);
+    private static final Pattern NON_PROXY_HOSTS_SPLIT = Pattern.compile("(?<!\\\\)\\|");
+    private static final Pattern NON_PROXY_HOST_DESANITIZE = Pattern.compile("(\\?|\\\\|\\(|\\)|\\\\E|\\\\Q|\\.\\.)");
+    private static final Pattern NON_PROXY_HOST_DOT_STAR = Pattern.compile("(\\.\\*)");
+    private static final long DEFAULT_CONNECT_TIMEOUT;
+    private static final long DEFAULT_WRITE_TIMEOUT;
+    private static final long DEFAULT_READ_TIMEOUT;
+
+    static {
+        Configuration configuration = Configuration.getGlobalConfiguration();
+        DEFAULT_CONNECT_TIMEOUT = getDefaultTimeoutFromEnvironment(configuration,
+                PROPERTY_AZURE_REQUEST_CONNECT_TIMEOUT, Duration.ofSeconds(10), LOGGER).toMillis();
+        DEFAULT_WRITE_TIMEOUT = getDefaultTimeoutFromEnvironment(configuration, PROPERTY_AZURE_REQUEST_WRITE_TIMEOUT,
+                Duration.ofSeconds(60), LOGGER).getSeconds();
+        DEFAULT_READ_TIMEOUT = getDefaultTimeoutFromEnvironment(configuration, PROPERTY_AZURE_REQUEST_READ_TIMEOUT,
+                Duration.ofSeconds(60), LOGGER).getSeconds();
+    }
+
+    private Duration readIdleTimeout;
+    private Duration writeIdleTimeout;
+    private Duration connectTimeout;
+    private Duration idleTimeout = Duration.ofSeconds(60);
+    private ProxyOptions proxyOptions;
+    private Configuration configuration;
+    private HttpClientOptions httpClientOptions;
+    private Vertx vertx;
+
+    /**
+     * Sets the read idle timeout.
+     *
+     * The default read idle timeout is 60 seconds.
+     *
+     * @param  readIdleTimeout the read idle timeout
+     * @return                 the updated VertxAsyncHttpClientBuilder object
+     */
+    public VertxAsyncHttpClientBuilder readIdleTimeout(Duration readIdleTimeout) {
+        this.readIdleTimeout = readIdleTimeout;
+        return this;
+    }
+
+    /**
+     * Sets the write idle timeout.
+     *
+     * The default read idle timeout is 60 seconds.
+     *
+     * @param  writeIdleTimeout the write idle timeout
+     * @return                  the updated VertxAsyncHttpClientBuilder object
+     */
+    public VertxAsyncHttpClientBuilder writeIdleTimeout(Duration writeIdleTimeout) {
+        this.writeIdleTimeout = writeIdleTimeout;
+        return this;
+    }
+
+    /**
+     * Sets the connect timeout.
+     *
+     * The default connect timeout is 10 seconds.
+     *
+     * @param  connectTimeout the connection timeout
+     * @return                the updated VertxAsyncHttpClientBuilder object
+     */
+    public VertxAsyncHttpClientBuilder connectTimeout(Duration connectTimeout) {
+        this.connectTimeout = connectTimeout;
+        return this;
+    }
+
+    /**
+     * Sets the connection idle timeout.
+     *
+     * The default connect timeout is 60 seconds.
+     *
+     * @param  idleTimeout the connection idle timeout
+     * @return             the updated VertxAsyncHttpClientBuilder object
+     */
+    public VertxAsyncHttpClientBuilder idleTimeout(Duration idleTimeout) {
+        this.idleTimeout = idleTimeout;
+        return this;
+    }
+
+    /**
+     * Sets proxy configuration.
+     *
+     * @param  proxyOptions The proxy configuration to use.
+     * @return              The updated VertxAsyncHttpClientBuilder object.
+     */
+    public VertxAsyncHttpClientBuilder proxy(ProxyOptions proxyOptions) {
+        this.proxyOptions = proxyOptions;
+        return this;
+    }
+
+    /**
+     * Sets the configuration store that is used during construction of the HTTP client.
+     * <p>
+     * The default configuration store is a clone of the
+     * {@link Configuration#getGlobalConfiguration() global configuration store}, use {@link Configuration#NONE} to
+     * bypass using configuration settings during construction.
+     *
+     * @param  configuration The configuration store.
+     * @return               The updated VertxAsyncHttpClientBuilder object.
+     */
+    public VertxAsyncHttpClientBuilder configuration(Configuration configuration) {
+        this.configuration = configuration;
+        return this;
+    }
+
+    /**
+     * Sets custom {@link HttpClientOptions} for the constructed {@link io.vertx.core.http.HttpClient}.
+     *
+     * @param  httpClientOptions The options of the web client.
+     * @return                   The updated VertxAsyncHttpClientBuilder object
+     */
+    public VertxAsyncHttpClientBuilder httpClientOptions(HttpClientOptions httpClientOptions) {
+        this.httpClientOptions = httpClientOptions;
+        return this;
+    }
+
+    /**
+     * Sets a custom {@link Vertx} instance that the constructed {@link io.vertx.core.http.HttpClient} will be created
+     * with.
+     *
+     * @param  vertx The vertx instance.
+     * @return       The updated VertxAsyncHttpClientBuilder object
+     */
+    public VertxAsyncHttpClientBuilder vertx(Vertx vertx) {
+        this.vertx = vertx;
+        return this;
+    }
+
+    /**
+     * Creates a new Vert.x {@link HttpClient} instance on every call, using the configuration set in the builder at the
+     * time of the build method call.
+     *
+     * @return A new Vert.x backed {@link HttpClient} instance.
+     */
+    public HttpClient build() {
+        Vertx configuredVertx = this.vertx;
+        if (configuredVertx == null) {
+            ServiceLoader<VertxProvider> vertxProviders = ServiceLoader.load(VertxProvider.class,
+                    VertxProvider.class.getClassLoader());
+            Iterator<VertxProvider> iterator = vertxProviders.iterator();
+            if (iterator.hasNext()) {
+                VertxProvider provider = iterator.next();
+                configuredVertx = provider.createVertx();
+                LOGGER.verbose("Using {} as the VertxProvider.", provider.getClass().getName());
+
+                while (iterator.hasNext()) {
+                    VertxProvider ignoredProvider = iterator.next();
+                    LOGGER.warning("Multiple VertxProviders were found on the classpath, ignoring {}.",
+                            ignoredProvider.getClass().getName());
+                }
+            } else {
+                throw new RuntimeException("Unable to find usable Vertx instance");
+            }
+        }
+
+        if (this.httpClientOptions == null) {
+            this.httpClientOptions = new HttpClientOptions();
+
+            if (this.connectTimeout != null) {
+                this.httpClientOptions.setConnectTimeout((int) this.connectTimeout.toMillis());
+            } else {
+                this.httpClientOptions.setConnectTimeout((int) DEFAULT_CONNECT_TIMEOUT);
+            }
+
+            if (this.readIdleTimeout != null) {
+                this.httpClientOptions.setReadIdleTimeout((int) this.readIdleTimeout.getSeconds());
+            } else {
+                this.httpClientOptions.setReadIdleTimeout((int) DEFAULT_READ_TIMEOUT);
+            }
+
+            if (this.writeIdleTimeout != null) {
+                this.httpClientOptions.setWriteIdleTimeout((int) this.writeIdleTimeout.getSeconds());
+            } else {
+                this.httpClientOptions.setWriteIdleTimeout((int) DEFAULT_WRITE_TIMEOUT);
+            }
+
+            this.httpClientOptions.setIdleTimeout((int) this.idleTimeout.getSeconds());
+
+            Configuration buildConfiguration = (this.configuration == null)
+                    ? Configuration.getGlobalConfiguration()
+                    : configuration;
+
+            ProxyOptions buildProxyOptions = (this.proxyOptions == null)
+                    ? ProxyOptions.fromConfiguration(buildConfiguration, true)
+                    : this.proxyOptions;
+
+            if (buildProxyOptions != null) {
+                io.vertx.core.net.ProxyOptions vertxProxyOptions = new io.vertx.core.net.ProxyOptions();
+                InetSocketAddress proxyAddress = buildProxyOptions.getAddress();
+
+                if (proxyAddress != null) {
+                    vertxProxyOptions.setHost(proxyAddress.getHostName());
+                    vertxProxyOptions.setPort(proxyAddress.getPort());
+                }
+
+                String proxyUsername = buildProxyOptions.getUsername();
+                String proxyPassword = buildProxyOptions.getPassword();
+                if (!CoreUtils.isNullOrEmpty(proxyUsername) && !CoreUtils.isNullOrEmpty(proxyPassword)) {
+                    vertxProxyOptions.setUsername(proxyUsername);
+                    vertxProxyOptions.setPassword(proxyPassword);
+                }
+
+                ProxyOptions.Type type = buildProxyOptions.getType();
+                if (type != null) {
+                    try {
+                        ProxyType proxyType = ProxyType.valueOf(type.name());
+                        vertxProxyOptions.setType(proxyType);
+                    } catch (IllegalArgumentException e) {
+                        throw LOGGER.logExceptionAsError(
+                                new IllegalArgumentException("Unknown Vert.x proxy type: " + type.name(), e));
+                    }
+                }
+
+                String nonProxyHosts = buildProxyOptions.getNonProxyHosts();
+                if (!CoreUtils.isNullOrEmpty(nonProxyHosts)) {
+                    for (String nonProxyHost : desanitizedNonProxyHosts(nonProxyHosts)) {
+                        this.httpClientOptions.addNonProxyHost(nonProxyHost);
+                    }
+                }
+
+                this.httpClientOptions.setProxyOptions(vertxProxyOptions);
+            }
+        }
+
+        io.vertx.core.http.HttpClient client = configuredVertx.createHttpClient(this.httpClientOptions);
+        return new VertxAsyncHttpClient(client, configuredVertx);
+    }
+
+    /**
+     * Reverses non proxy host string sanitization applied by {@link ProxyOptions}.
+     *
+     * This is necessary as Vert.x will apply its own sanitization logic.
+     *
+     * @param  nonProxyHosts The list of non proxy hosts
+     * @return               String array of desanitized proxy host strings
+     */
+    private String[] desanitizedNonProxyHosts(String nonProxyHosts) {
+        String desanitzedNonProxyHosts = NON_PROXY_HOST_DESANITIZE.matcher(nonProxyHosts)
+                .replaceAll("");
+
+        desanitzedNonProxyHosts = NON_PROXY_HOST_DOT_STAR.matcher(desanitzedNonProxyHosts)
+                .replaceAll("*");
+
+        return NON_PROXY_HOSTS_SPLIT.split(desanitzedNonProxyHosts);
+    }
+}
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientProvider.java b/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxAsyncHttpClientProvider.java
new file mode 100644 (file)
index 0000000..83a3fd5
--- /dev/null
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx;
+
+import com.azure.core.http.HttpClient;
+import com.azure.core.http.HttpClientProvider;
+import com.azure.core.util.Configuration;
+import com.azure.core.util.HttpClientOptions;
+
+/**
+ * {@link HttpClientProvider} backed by the Vert.x {@link io.vertx.core.http.HttpClient}
+ */
+public class VertxAsyncHttpClientProvider implements HttpClientProvider {
+    private static final boolean AZURE_ENABLE_HTTP_CLIENT_SHARING = Configuration.getGlobalConfiguration()
+            .get("AZURE_ENABLE_HTTP_CLIENT_SHARING", Boolean.FALSE);
+    private final boolean enableHttpClientSharing;
+
+    // Enum Singleton Pattern
+    private enum GlobalVertxHttpClient {
+        HTTP_CLIENT(new VertxAsyncHttpClientBuilder().build());
+
+        private final HttpClient httpClient;
+
+        GlobalVertxHttpClient(HttpClient httpClient) {
+            this.httpClient = httpClient;
+        }
+
+        private HttpClient getHttpClient() {
+            return httpClient;
+        }
+    }
+
+    /**
+     * For testing purpose only, assigning 'AZURE_ENABLE_HTTP_CLIENT_SHARING' to 'enableHttpClientSharing' for
+     * 'final' modifier.
+     */
+    public VertxAsyncHttpClientProvider() {
+        enableHttpClientSharing = AZURE_ENABLE_HTTP_CLIENT_SHARING;
+    }
+
+    VertxAsyncHttpClientProvider(Configuration configuration) {
+        enableHttpClientSharing = configuration.get("AZURE_ENABLE_HTTP_CLIENT_SHARING", Boolean.FALSE);
+    }
+
+    @Override
+    public HttpClient createInstance() {
+        if (enableHttpClientSharing) {
+            return GlobalVertxHttpClient.HTTP_CLIENT.getHttpClient();
+        }
+        return new VertxAsyncHttpClientBuilder().build();
+    }
+
+    @Override
+    public HttpClient createInstance(HttpClientOptions clientOptions) {
+        if (clientOptions == null) {
+            return createInstance();
+        }
+
+        return new VertxAsyncHttpClientBuilder()
+                .proxy(clientOptions.getProxyOptions())
+                .configuration(clientOptions.getConfiguration())
+                .connectTimeout(clientOptions.getConnectTimeout())
+                .idleTimeout(clientOptions.getConnectionIdleTimeout())
+                .writeIdleTimeout(clientOptions.getWriteTimeout())
+                .readIdleTimeout(clientOptions.getReadTimeout())
+                .build();
+    }
+}
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpAsyncResponse.java b/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpAsyncResponse.java
deleted file mode 100644 (file)
index 99bc421..0000000
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
-
-import java.nio.ByteBuffer;
-
-import com.azure.core.http.HttpRequest;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.ext.web.client.HttpResponse;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-class VertxHttpAsyncResponse extends VertxHttpResponse {
-
-    VertxHttpAsyncResponse(HttpRequest request, HttpResponse<Buffer> response) {
-        super(request, response);
-    }
-
-    @Override
-    public Flux<ByteBuffer> getBody() {
-        Buffer responseBody = getVertxHttpResponse().bodyAsBuffer();
-        if (responseBody == null || responseBody.length() == 0) {
-            return Flux.empty();
-        }
-        return Flux.just(responseBody.getByteBuf().nioBuffer());
-    }
-
-    @Override
-    public Mono<byte[]> getBodyAsByteArray() {
-        return Mono.fromCallable(() -> {
-            Buffer responseBody = getVertxHttpResponse().bodyAsBuffer();
-            if (responseBody == null || responseBody.length() == 0) {
-                return null;
-            }
-            return responseBody.getBytes();
-        });
-    }
-}
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClient.java b/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClient.java
deleted file mode 100644 (file)
index 5500a7d..0000000
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
-
-import java.io.Closeable;
-import java.net.MalformedURLException;
-import java.net.URL;
-import java.nio.ByteBuffer;
-import java.util.Objects;
-
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.HttpMethod;
-import com.azure.core.http.HttpRequest;
-import com.azure.core.http.HttpResponse;
-import com.azure.core.util.Context;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.ext.web.client.WebClient;
-import io.vertx.ext.web.client.WebClientOptions;
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * {@link HttpClient} implementation for the Vert.x {@link WebClient}.
- */
-public class VertxHttpClient implements HttpClient, Closeable {
-
-    private final WebClient client;
-    private final WebClientOptions options;
-
-    public VertxHttpClient(WebClient client, WebClientOptions options) {
-        Objects.requireNonNull(client, "client cannot be null");
-        Objects.requireNonNull(client, "options cannot be null");
-        this.client = client;
-        this.options = options;
-    }
-
-    @Override
-    public Mono<HttpResponse> send(HttpRequest request) {
-        return send(request, Context.NONE);
-    }
-
-    @Override
-    public Mono<HttpResponse> send(HttpRequest request, Context context) {
-        boolean eagerlyReadResponse = (boolean) context.getData("azure-eagerly-read-response").orElse(false);
-        return Mono.create(sink -> sink.onRequest(value -> {
-            toVertxHttpRequest(request).subscribe(vertxHttpRequest -> {
-                vertxHttpRequest.send(new VertxHttpResponseHandler(request, sink, eagerlyReadResponse));
-            }, sink::error);
-        }));
-    }
-
-    public void close() {
-        this.client.close();
-    }
-
-    // Exposed for testing
-    public WebClientOptions getWebClientOptions() {
-        return options;
-    }
-
-    private Mono<VertxHttpRequest> toVertxHttpRequest(HttpRequest request) {
-        return Mono.from(convertBodyToBuffer(request))
-                .map(buffer -> {
-                    HttpMethod httpMethod = request.getHttpMethod();
-                    io.vertx.core.http.HttpMethod requestMethod = io.vertx.core.http.HttpMethod.valueOf(httpMethod.name());
-
-                    URL url = request.getUrl();
-                    if (url.getPath().isEmpty()) {
-                        try {
-                            // Azure API documentation states:
-                            //
-                            // The URI must always include the forward slash (/) to separate the host name
-                            // from the path and query portions of the URI.
-                            //
-                            url = new URL(url.getProtocol(), url.getHost(), url.getPort(), "/" + url.getFile());
-                        } catch (MalformedURLException e) {
-                            throw new IllegalStateException(e);
-                        }
-                    }
-
-                    io.vertx.ext.web.client.HttpRequest<Buffer> delegate = client
-                            .requestAbs(requestMethod, url.toString());
-
-                    if (request.getHeaders() != null) {
-                        request.getHeaders()
-                                .stream()
-                                .forEach(httpHeader -> delegate.putHeader(httpHeader.getName(),
-                                        httpHeader.getValuesList()));
-                    }
-
-                    return new VertxHttpRequest(delegate, buffer);
-                });
-    }
-
-    private Mono<Buffer> convertBodyToBuffer(HttpRequest request) {
-        return Mono.using(() -> Buffer.buffer(),
-                buffer -> getBody(request).reduce(buffer, (b, byteBuffer) -> {
-                    for (int i = 0; i < byteBuffer.limit(); i++) {
-                        b.appendByte(byteBuffer.get(i));
-                    }
-                    return b;
-                }), buffer -> buffer.getClass());
-    }
-
-    private Flux<ByteBuffer> getBody(HttpRequest request) {
-        long contentLength = 0;
-        String contentLengthHeader = request.getHeaders().getValue("content-length");
-        if (contentLengthHeader != null) {
-            contentLength = Long.parseLong(contentLengthHeader);
-        }
-
-        Flux<ByteBuffer> body = request.getBody();
-        if (body == null || contentLength <= 0) {
-            body = Flux.just(Buffer.buffer().getByteBuf().nioBuffer());
-        }
-
-        return body;
-    }
-}
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientBuilder.java b/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpClientBuilder.java
deleted file mode 100644 (file)
index cd7adbf..0000000
+++ /dev/null
@@ -1,264 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
-
-import java.net.InetSocketAddress;
-import java.time.Duration;
-import java.util.Objects;
-import java.util.regex.Pattern;
-
-import com.azure.core.http.HttpClient;
-import com.azure.core.http.ProxyOptions;
-import com.azure.core.util.Configuration;
-import com.azure.core.util.CoreUtils;
-import com.azure.core.util.logging.ClientLogger;
-import io.vertx.core.Vertx;
-import io.vertx.core.net.ProxyType;
-import io.vertx.ext.web.client.WebClient;
-import io.vertx.ext.web.client.WebClientOptions;
-
-import static com.azure.core.util.Configuration.PROPERTY_AZURE_REQUEST_CONNECT_TIMEOUT;
-import static com.azure.core.util.Configuration.PROPERTY_AZURE_REQUEST_READ_TIMEOUT;
-import static com.azure.core.util.Configuration.PROPERTY_AZURE_REQUEST_WRITE_TIMEOUT;
-import static com.azure.core.util.CoreUtils.getDefaultTimeoutFromEnvironment;
-
-/**
- * Builds a {@link VertxHttpClient}.
- */
-public class VertxHttpClientBuilder {
-    private static final Pattern NON_PROXY_HOSTS_SPLIT = Pattern.compile("(?<!\\\\)\\|");
-    private static final Pattern NON_PROXY_HOST_DESANITIZE = Pattern.compile("(\\?|\\\\|\\(|\\)|\\\\E|\\\\Q|\\.\\.)");
-    private static final Pattern NON_PROXY_HOST_DOT_STAR = Pattern.compile("(\\.\\*)");
-    private static final long DEFAULT_CONNECT_TIMEOUT;
-    private static final long DEFAULT_WRITE_TIMEOUT;
-    private static final long DEFAULT_READ_TIMEOUT;
-
-    static {
-        ClientLogger logger = new ClientLogger(VertxHttpClientBuilder.class);
-        Configuration configuration = Configuration.getGlobalConfiguration();
-        DEFAULT_CONNECT_TIMEOUT = getDefaultTimeoutFromEnvironment(configuration,
-                PROPERTY_AZURE_REQUEST_CONNECT_TIMEOUT, Duration.ofSeconds(10), logger).toMillis();
-        DEFAULT_WRITE_TIMEOUT = getDefaultTimeoutFromEnvironment(configuration, PROPERTY_AZURE_REQUEST_WRITE_TIMEOUT,
-                Duration.ofSeconds(60), logger).toSeconds();
-        DEFAULT_READ_TIMEOUT = getDefaultTimeoutFromEnvironment(configuration, PROPERTY_AZURE_REQUEST_READ_TIMEOUT,
-                Duration.ofSeconds(60), logger).toSeconds();
-    }
-
-    private Duration readIdleTimeout;
-    private Duration writeIdleTimeout;
-    private Duration connectTimeout;
-    private Duration idleTimeout = Duration.ofSeconds(60);
-    private ProxyOptions proxyOptions;
-    private Configuration configuration;
-    private WebClientOptions webClientOptions;
-    private final Vertx vertx;
-
-    /**
-     * Creates VertxAsyncHttpClientBuilder.
-     *
-     * @param vertx The {@link Vertx} instance to pass to the {@link WebClient}.
-     */
-    public VertxHttpClientBuilder(Vertx vertx) {
-        Objects.requireNonNull(vertx, "vertx cannot be null");
-        this.vertx = vertx;
-    }
-
-    /**
-     * Sets the read idle timeout.
-     *
-     * The default read idle timeout is 60 seconds.
-     *
-     * @param  readIdleTimeout the read idle timeout
-     * @return                 the updated VertxAsyncHttpClientBuilder object
-     */
-    public VertxHttpClientBuilder readIdleTimeout(Duration readIdleTimeout) {
-        this.readIdleTimeout = readIdleTimeout;
-        return this;
-    }
-
-    /**
-     * Sets the write idle timeout.
-     *
-     * The default read idle timeout is 60 seconds.
-     *
-     * @param  writeIdleTimeout the write idle timeout
-     * @return                  the updated VertxAsyncHttpClientBuilder object
-     */
-    public VertxHttpClientBuilder writeIdleTimeout(Duration writeIdleTimeout) {
-        this.writeIdleTimeout = writeIdleTimeout;
-        return this;
-    }
-
-    /**
-     * Sets the connect timeout.
-     *
-     * The default connect timeout is 10 seconds.
-     *
-     * @param  connectTimeout the connection timeout
-     * @return                the updated VertxAsyncHttpClientBuilder object
-     */
-    public VertxHttpClientBuilder connectTimeout(Duration connectTimeout) {
-        this.connectTimeout = connectTimeout;
-        return this;
-    }
-
-    /**
-     * Sets the connection idle timeout.
-     *
-     * The default connect timeout is 60 seconds.
-     *
-     * @param  idleTimeout the connection idle timeout
-     * @return             the updated VertxAsyncHttpClientBuilder object
-     */
-    public VertxHttpClientBuilder idleTimeout(Duration idleTimeout) {
-        this.idleTimeout = idleTimeout;
-        return this;
-    }
-
-    /**
-     * Sets proxy configuration.
-     *
-     * @param  proxyOptions The proxy configuration to use.
-     * @return              The updated VertxAsyncHttpClientBuilder object.
-     */
-    public VertxHttpClientBuilder proxy(ProxyOptions proxyOptions) {
-        this.proxyOptions = proxyOptions;
-        return this;
-    }
-
-    /**
-     * Sets the configuration store that is used during construction of the HTTP client.
-     * <p>
-     * The default configuration store is a clone of the {@link Configuration#getGlobalConfiguration() global
-     * configuration store}, use {@link Configuration#NONE} to bypass using configuration settings during construction.
-     *
-     * @param  configuration The configuration store.
-     * @return               The updated VertxAsyncHttpClientBuilder object.
-     */
-    public VertxHttpClientBuilder configuration(Configuration configuration) {
-        this.configuration = configuration;
-        return this;
-    }
-
-    /**
-     * Sets custom {@link WebClientOptions} for the constructed {@link WebClient}.
-     *
-     * @param  webClientOptions The options of the web client.
-     * @return                  The updated VertxAsyncHttpClientBuilder object
-     */
-    public VertxHttpClientBuilder webClientOptions(WebClientOptions webClientOptions) {
-        this.webClientOptions = webClientOptions;
-        return this;
-    }
-
-    /**
-     * Creates a new Vert.x {@link com.azure.core.http.HttpClient} instance on every call, using the
-     * configuration set in the builder at the time of the build method call.
-     *
-     * @return A new Vert.x backed {@link com.azure.core.http.HttpClient} instance.
-     */
-    public HttpClient build() {
-        if (this.webClientOptions == null) {
-            this.webClientOptions = new WebClientOptions();
-        }
-
-        if (this.connectTimeout != null) {
-            this.webClientOptions.setConnectTimeout((int) this.connectTimeout.toMillis());
-        } else {
-            this.webClientOptions.setConnectTimeout((int) DEFAULT_CONNECT_TIMEOUT);
-        }
-
-        if (this.readIdleTimeout != null) {
-            this.webClientOptions.setReadIdleTimeout((int) this.readIdleTimeout.toSeconds());
-        } else {
-            this.webClientOptions.setReadIdleTimeout((int) DEFAULT_READ_TIMEOUT);
-        }
-
-        if (this.writeIdleTimeout != null) {
-            this.webClientOptions.setWriteIdleTimeout((int) this.writeIdleTimeout.toSeconds());
-        } else {
-            this.webClientOptions.setWriteIdleTimeout((int) DEFAULT_WRITE_TIMEOUT);
-        }
-
-        this.webClientOptions.setIdleTimeout((int) this.idleTimeout.toSeconds());
-
-        Configuration buildConfiguration = (configuration == null)
-                ? Configuration.getGlobalConfiguration()
-                : configuration;
-
-        ProxyOptions buildProxyOptions = (this.proxyOptions == null && buildConfiguration != Configuration.NONE)
-                ? ProxyOptions.fromConfiguration(buildConfiguration, true)
-                : this.proxyOptions;
-
-        if (buildProxyOptions != null) {
-            io.vertx.core.net.ProxyOptions vertxProxyOptions = new io.vertx.core.net.ProxyOptions();
-            InetSocketAddress proxyAddress = buildProxyOptions.getAddress();
-
-            if (proxyAddress != null) {
-                vertxProxyOptions.setHost(proxyAddress.getHostName());
-                vertxProxyOptions.setPort(proxyAddress.getPort());
-            }
-
-            String proxyUsername = buildProxyOptions.getUsername();
-            String proxyPassword = buildProxyOptions.getPassword();
-            if (proxyUsername != null && proxyPassword != null) {
-                vertxProxyOptions.setUsername(proxyUsername);
-                vertxProxyOptions.setPassword(proxyPassword);
-            }
-
-            ProxyOptions.Type type = buildProxyOptions.getType();
-            if (type != null) {
-                try {
-                    ProxyType proxyType = ProxyType.valueOf(type.name());
-                    vertxProxyOptions.setType(proxyType);
-                } catch (IllegalArgumentException e) {
-                    throw new IllegalStateException("Unknown Vert.x proxy type: " + type.name(), e);
-                }
-            }
-
-            String nonProxyHosts = buildProxyOptions.getNonProxyHosts();
-            if (!CoreUtils.isNullOrEmpty(nonProxyHosts)) {
-                for (String nonProxyHost : desanitizedNonProxyHosts(nonProxyHosts)) {
-                    this.webClientOptions.addNonProxyHost(nonProxyHost);
-                }
-            }
-
-            webClientOptions.setProxyOptions(vertxProxyOptions);
-        }
-
-        WebClient client = WebClient.create(this.vertx, this.webClientOptions);
-        return new VertxHttpClient(client, this.webClientOptions);
-    }
-
-    /**
-     * Reverses non proxy host string sanitization applied by {@link ProxyOptions}.
-     *
-     * This is necessary as Vert.x will apply its own sanitization logic.
-     *
-     * @param  nonProxyHosts The list of non proxy hosts
-     * @return               String array of desanitized proxy host strings
-     */
-    private String[] desanitizedNonProxyHosts(String nonProxyHosts) {
-        String desanitzedNonProxyHosts = NON_PROXY_HOST_DESANITIZE.matcher(nonProxyHosts)
-                .replaceAll("");
-
-        desanitzedNonProxyHosts = NON_PROXY_HOST_DOT_STAR.matcher(desanitzedNonProxyHosts)
-                .replaceAll("*");
-
-        return NON_PROXY_HOSTS_SPLIT.split(desanitzedNonProxyHosts);
-    }
-}
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpResponseHandler.java b/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/VertxHttpResponseHandler.java
deleted file mode 100644 (file)
index 9e33e0b..0000000
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
-
-import com.azure.core.http.HttpRequest;
-import io.vertx.core.AsyncResult;
-import io.vertx.core.Handler;
-import io.vertx.core.buffer.Buffer;
-import io.vertx.ext.web.client.HttpResponse;
-import reactor.core.publisher.MonoSink;
-
-/**
- * {@link Handler} for Azure HTTP responses.
- */
-class VertxHttpResponseHandler implements Handler<AsyncResult<HttpResponse<Buffer>>> {
-
-    private final HttpRequest request;
-    private final MonoSink<com.azure.core.http.HttpResponse> sink;
-    private final boolean eagerlyReadResponse;
-
-    VertxHttpResponseHandler(HttpRequest request, MonoSink<com.azure.core.http.HttpResponse> sink,
-            boolean eagerlyReadResponse) {
-        this.request = request;
-        this.sink = sink;
-        this.eagerlyReadResponse = eagerlyReadResponse;
-    }
-
-    @Override
-    public void handle(AsyncResult<HttpResponse<Buffer>> event) {
-        if (event.succeeded()) {
-            VertxHttpResponse response;
-            if (eagerlyReadResponse) {
-                io.vertx.ext.web.client.HttpResponse<Buffer> originalResponse = event.result();
-                response = new BufferedVertxHttpResponse(request, originalResponse, originalResponse.body());
-            } else {
-                response = new VertxHttpAsyncResponse(request, event.result());
-            }
-            sink.success(response);
-        } else {
-            if (event.cause() != null) {
-                sink.error(event.cause());
-            }
-        }
-    }
-}
  */
 package org.apache.camel.quarkus.support.azure.core.http.vertx;
 
-import io.vertx.core.buffer.Buffer;
-import io.vertx.ext.web.client.HttpRequest;
+import io.vertx.core.Vertx;
 
 /**
- * Holds a Vert.x {@link HttpRequest} together with a body payload.
+ * Service provider interface providing platforms and applications the means to have their own managed
+ * {@link Vertx} be resolved by the {@link VertxAsyncHttpClientBuilder}.
  */
-class VertxHttpRequest {
-    private final Buffer body;
-    private final HttpRequest<Buffer> delegate;
+public interface VertxProvider {
 
-    public VertxHttpRequest(HttpRequest<Buffer> delegate, Buffer body) {
-        this.delegate = delegate;
-        this.body = body;
-    }
-
-    public void send(VertxHttpResponseHandler handler) {
-        delegate.sendBuffer(body, handler);
-    }
+    /**
+     * Creates a {@link Vertx}. Could either be the result of returning {@code Vertx.vertx()},
+     * or returning a {@link Vertx} that was resolved from a dependency injection framework like Spring or CDI.
+     *
+     * @return The created {@link Vertx}.
+     */
+    Vertx createVertx();
 }
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
+package org.apache.camel.quarkus.support.azure.core.http.vertx.implementation;
 
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 
 import com.azure.core.http.HttpRequest;
 import com.azure.core.http.HttpResponse;
+import com.azure.core.util.BinaryData;
 import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClientResponse;
 import reactor.core.publisher.Flux;
 import reactor.core.publisher.Mono;
 
-final class BufferedVertxHttpResponse extends VertxHttpAsyncResponse {
-
+public final class BufferedVertxHttpResponse extends VertxHttpAsyncResponse {
     private final Buffer body;
 
-    BufferedVertxHttpResponse(HttpRequest request, io.vertx.ext.web.client.HttpResponse<Buffer> response, Buffer body) {
-        super(request, response);
+    public BufferedVertxHttpResponse(HttpRequest azureHttpRequest, HttpClientResponse vertxHttpResponse, Buffer body) {
+        super(azureHttpRequest, vertxHttpResponse);
         this.body = body;
     }
 
+    @Override
+    public BinaryData getBodyAsBinaryData() {
+        return BinaryData.fromBytes(body.getBytes());
+    }
+
     @Override
     public Flux<ByteBuffer> getBody() {
         return Flux.defer(() -> {
-            if (this.body == null || this.body.length() == 0) {
+            if (this.body.length() == 0) {
                 return Flux.empty();
             }
-            return Flux.just(this.body.getByteBuf().nioBuffer());
+            return Flux.just(ByteBuffer.wrap(this.body.getBytes()));
         });
     }
 
     @Override
     public Mono<byte[]> getBodyAsByteArray() {
         return Mono.defer(() -> {
-            if (this.body == null || this.body.length() == 0) {
+            if (this.body.length() == 0) {
                 return Mono.empty();
             }
             return Mono.just(this.body.getBytes());
         });
     }
 
-    @Override
-    public Mono<InputStream> getBodyAsInputStream() {
-        return Mono.defer(() -> {
-            if (this.body == null || this.body.length() == 0) {
-                return Mono.empty();
-            }
-            return Mono.just(new ByteArrayInputStream(this.body.getBytes()));
-        });
-    }
-
     @Override
     public HttpResponse buffer() {
         return this;
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/implementation/VertxHttpAsyncResponse.java b/extensions-support/azure-core-http-client-vertx/runtime/src/main/java/org/apache/camel/quarkus/support/azure/core/http/vertx/implementation/VertxHttpAsyncResponse.java
new file mode 100644 (file)
index 0000000..e872b08
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.support.azure.core.http.vertx.implementation;
+
+import java.nio.ByteBuffer;
+
+import com.azure.core.http.HttpRequest;
+import com.azure.core.util.FluxUtil;
+import io.vertx.core.http.HttpClientResponse;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Default HTTP response for Vert.x.
+ */
+public class VertxHttpAsyncResponse extends VertxHttpResponseBase {
+    public VertxHttpAsyncResponse(HttpRequest azureHttpRequest, HttpClientResponse vertxHttpResponse) {
+        super(azureHttpRequest, vertxHttpResponse);
+        vertxHttpResponse.pause();
+    }
+
+    @Override
+    public Flux<ByteBuffer> getBody() {
+        return streamResponseBody();
+    }
+
+    @Override
+    public Mono<byte[]> getBodyAsByteArray() {
+        return FluxUtil.collectBytesFromNetworkResponse(streamResponseBody(), getHeaders())
+                .flatMap(bytes -> (bytes == null || bytes.length == 0)
+                        ? Mono.empty()
+                        : Mono.just(bytes));
+    }
+
+    private Flux<ByteBuffer> streamResponseBody() {
+        HttpClientResponse vertxHttpResponse = getVertxHttpResponse();
+        return Flux.create(sink -> {
+            vertxHttpResponse.handler(buffer -> {
+                sink.next(buffer.getByteBuf().nioBuffer());
+            }).endHandler(event -> {
+                sink.complete();
+            }).exceptionHandler(sink::error);
+
+            vertxHttpResponse.resume();
+        });
+    }
+}
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.camel.quarkus.support.azure.core.http.vertx;
+package org.apache.camel.quarkus.support.azure.core.http.vertx.implementation;
 
 import java.nio.charset.Charset;
 
@@ -23,18 +23,18 @@ import com.azure.core.http.HttpRequest;
 import com.azure.core.http.HttpResponse;
 import com.azure.core.util.CoreUtils;
 import io.vertx.core.MultiMap;
-import io.vertx.core.buffer.Buffer;
+import io.vertx.core.http.HttpClientResponse;
 import reactor.core.publisher.Mono;
 
-abstract class VertxHttpResponse extends HttpResponse {
+abstract class VertxHttpResponseBase extends HttpResponse {
 
-    private final io.vertx.ext.web.client.HttpResponse<Buffer> response;
+    private final HttpClientResponse vertxHttpResponse;
     private final HttpHeaders headers;
 
-    VertxHttpResponse(HttpRequest request, io.vertx.ext.web.client.HttpResponse<Buffer> response) {
-        super(request);
-        this.response = response;
-        this.headers = fromVertxHttpHeaders(response.headers());
+    VertxHttpResponseBase(HttpRequest azureHttpRequest, HttpClientResponse vertxHttpResponse) {
+        super(azureHttpRequest);
+        this.vertxHttpResponse = vertxHttpResponse;
+        this.headers = fromVertxHttpHeaders(vertxHttpResponse.headers());
     }
 
     private HttpHeaders fromVertxHttpHeaders(MultiMap headers) {
@@ -43,13 +43,13 @@ abstract class VertxHttpResponse extends HttpResponse {
         return azureHeaders;
     }
 
-    protected io.vertx.ext.web.client.HttpResponse<Buffer> getVertxHttpResponse() {
-        return this.response;
+    protected HttpClientResponse getVertxHttpResponse() {
+        return this.vertxHttpResponse;
     }
 
     @Override
     public int getStatusCode() {
-        return response.statusCode();
+        return this.vertxHttpResponse.statusCode();
     }
 
     @Override
@@ -69,6 +69,6 @@ abstract class VertxHttpResponse extends HttpResponse {
 
     @Override
     public final Mono<String> getBodyAsString(Charset charset) {
-        return Mono.fromCallable(() -> this.response.bodyAsString(charset.toString()));
+        return getBodyAsByteArray().map(bytes -> CoreUtils.bomAwareToString(bytes, charset.toString()));
     }
 }
index 8487b59c8fa5d7a331b677f98f704f875daad012..98565fba0c7e6b458c28e659dab3226981e52e01 100644 (file)
@@ -1 +1 @@
-org.apache.camel.quarkus.support.azure.core.http.vertx.VertxHttpClientProvider
\ No newline at end of file
+org.apache.camel.quarkus.support.azure.core.http.vertx.VertxAsyncHttpClientProvider
\ No newline at end of file
diff --git a/extensions-support/azure-core-http-client-vertx/runtime/src/main/resources/META-INF/services/org.apache.camel.quarkus.support.azure.core.http.vertx.VertxProvider b/extensions-support/azure-core-http-client-vertx/runtime/src/main/resources/META-INF/services/org.apache.camel.quarkus.support.azure.core.http.vertx.VertxProvider
new file mode 100644 (file)
index 0000000..9ee24de
--- /dev/null
@@ -0,0 +1 @@
+org.apache.camel.quarkus.support.azure.core.http.vertx.QuarkusVertxProvider
\ No newline at end of file
index a01965de75d77ff8090535d73b03181ccb5c45ab..2a76d1ed8ac47370c4ba711919c0286fc42db3e9 100644 (file)
                 </exclusion>
             </exclusions>
         </dependency>
+        <!-- TODO: Investigate whether this can be removed https://github.com/apache/camel-quarkus/issues/4091 -->
+        <dependency>
+            <groupId>io.netty</groupId>
+            <artifactId>netty-tcnative-boringssl-static</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.microsoft.azure</groupId>
             <artifactId>msal4j</artifactId>
             <artifactId>svm</artifactId>
             <scope>provided</scope>
         </dependency>
-<!--        quick fix - see https://github.com/apache/camel-quarkus/issues/4091 -->
-        <dependency>
-            <groupId>io.netty</groupId>
-            <artifactId>netty-tcnative-boringssl-static</artifactId>
-        </dependency>
     </dependencies>
 
     <build>