Optimize file path builder and have separate handler for streaming file trunk
authorSaranya Krishnakumar <saranya_k@apple.com>
Mon, 27 Jun 2022 17:47:26 +0000 (10:47 -0700)
committerYifan Cai <yifan_cai@apple.com>
Mon, 27 Jun 2022 17:48:32 +0000 (10:48 -0700)
patch by Francisco Guerrero, Saranya Krishnakumar; reviewed by Yifan Cai, Dinesh Joshi for CASSANDRASC-37

28 files changed:
.circleci/config.yml
common/build.gradle
common/src/main/java/org/apache/cassandra/sidecar/common/data/QualifiedTableName.java [new file with mode: 0644]
common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java [new file with mode: 0644]
common/src/main/java/org/apache/cassandra/sidecar/common/data/StreamSSTableComponentRequest.java [new file with mode: 0644]
common/src/main/java/org/apache/cassandra/sidecar/common/utils/ValidationUtils.java [new file with mode: 0644]
common/src/test/java/org/apache/cassandra/sidecar/common/ValidationUtilsTest.java [new file with mode: 0644]
gradle.properties
src/main/java/org/apache/cassandra/sidecar/MainModule.java
src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadata.java
src/main/java/org/apache/cassandra/sidecar/cluster/instance/InstanceMetadataImpl.java
src/main/java/org/apache/cassandra/sidecar/models/ComponentInfo.java [deleted file]
src/main/java/org/apache/cassandra/sidecar/models/HttpResponse.java
src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java [new file with mode: 0644]
src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java [deleted file]
src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java [new file with mode: 0644]
src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java [new file with mode: 0644]
src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java [deleted file]
src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java [deleted file]
src/main/java/org/apache/cassandra/sidecar/utils/FileStreamer.java
src/main/java/org/apache/cassandra/sidecar/utils/InstanceMetadataFetcher.java
src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java [deleted file]
src/test/java/org/apache/cassandra/sidecar/LoggerHandlerInjectionTest.java
src/test/java/org/apache/cassandra/sidecar/StreamSSTableComponentTest.java
src/test/java/org/apache/cassandra/sidecar/TestModule.java
src/test/java/org/apache/cassandra/sidecar/ThrottleTest.java
src/test/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java [new file with mode: 0644]
src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilderTest.java [new file with mode: 0644]

index 51a8555503730e462420976398bfa7f13b8a7a0f..f5350d14c128921988eec5f77a100fd017560dc4 100644 (file)
@@ -8,7 +8,7 @@ version: 2.1
 aliases:
   base_job: &base_job
     machine:
-      image: ubuntu-1604:202007-01
+      image: ubuntu-2004:202010-01
     working_directory: ~/repo
     environment:
       TERM: dumb
@@ -18,6 +18,7 @@ aliases:
     working_directory: ~/repo
     environment:
       TERM: dumb
+      TZ: "America/Los_Angeles"
 
 # we might modify this in the future to accept a parameter for the java package to install
 commands:
@@ -71,7 +72,7 @@ jobs:
       - checkout
       - install_common
       - install_kube
-      
+
       - install_java:
           version: adoptopenjdk-8-hotspot
       - run: sudo update-java-alternatives -s adoptopenjdk-8-hotspot-amd64 && java -version
@@ -141,9 +142,12 @@ jobs:
   rpm_build_install:
     <<: *centos
     steps:
+      - run: sed -i 's/mirrorlist/#mirrorlist/g' /etc/yum.repos.d/CentOS-*
+      - run: sed -i 's|#baseurl=http://mirror.centos.org|baseurl=http://vault.centos.org|g' /etc/yum.repos.d/CentOS-*
+      - run: dnf -qy distro-sync
+      - run: dnf -qy install java-11-openjdk git
       - checkout
-      - run: yum install -y java-11-openjdk-devel  # the image uses root by default, no need for sudo
-      - run: JAVA_HOME=/usr/lib/jvm/java-11-openjdk ./gradlew -i buildRpm
+      - run: JAVA_HOME=/usr/lib/jvm/java-11-openjdk-11.0.13.0.8-4.el8_5.x86_64 ${PWD}/gradlew -i buildRpm
       - run: yum install -y ./build/distributions/cassandra-sidecar*.rpm
       - run: test -f /opt/cassandra-sidecar/bin/cassandra-sidecar
 
index 653f9d24841c417d0110ab3ce07c7c67104685ad..10b24eece3975c76cd4c779192a4b8214c0dc0e8 100644 (file)
@@ -22,6 +22,7 @@ test {
 }
 
 dependencies {
+    compile "io.vertx:vertx-web:${project.vertxVersion}"
     compile 'org.slf4j:slf4j-api:1.7.25'
     compile 'ch.qos.logback:logback-core:1.2.3'
     compile 'ch.qos.logback:logback-classic:1.2.3'
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/QualifiedTableName.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/QualifiedTableName.java
new file mode 100644 (file)
index 0000000..bb2956f
--- /dev/null
@@ -0,0 +1,48 @@
+package org.apache.cassandra.sidecar.common.data;
+
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+/**
+ * Contains the keyspace and table name in Cassandra
+ */
+public class QualifiedTableName
+{
+    private final String keyspace;
+    private final String tableName;
+
+    /**
+     * Constructs a qualified name with the given {@code keyspace} and {@code tableName}
+     *
+     * @param keyspace  the keyspace in Cassandra
+     * @param tableName the table name in Cassandra
+     */
+    public QualifiedTableName(String keyspace, String tableName)
+    {
+        this.keyspace = ValidationUtils.validateKeyspaceName(keyspace);
+        this.tableName = ValidationUtils.validateTableName(tableName);
+    }
+
+    /**
+     * @return the keyspace in Cassandra
+     */
+    public String getKeyspace()
+    {
+        return keyspace;
+    }
+
+    /**
+     * @return the table name in Cassandra
+     */
+    public String getTableName()
+    {
+        return tableName;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String toString()
+    {
+        return keyspace + "." + tableName;
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/SSTableComponent.java
new file mode 100644 (file)
index 0000000..ef9e701
--- /dev/null
@@ -0,0 +1,45 @@
+package org.apache.cassandra.sidecar.common.data;
+
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+/**
+ * Represents an SSTable component that includes a keyspace, table name and component name
+ */
+public class SSTableComponent extends QualifiedTableName
+{
+    private final String componentName;
+
+    /**
+     * Constructor for the holder class
+     *
+     * @param keyspace      the keyspace in Cassandra
+     * @param tableName     the table name in Cassandra
+     * @param componentName the name of the SSTable component
+     */
+    public SSTableComponent(String keyspace, String tableName, String componentName)
+    {
+        super(keyspace, tableName);
+        this.componentName = ValidationUtils.validateComponentName(componentName);
+    }
+
+    /**
+     * @return the name of the SSTable component
+     */
+    public String getComponentName()
+    {
+        return componentName;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public String toString()
+    {
+        return "SSTableComponent{" +
+               "keyspace='" + getKeyspace() + '\'' +
+               ", tableName='" + getTableName() + '\'' +
+               ", componentName='" + componentName + '\'' +
+               '}';
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/data/StreamSSTableComponentRequest.java b/common/src/main/java/org/apache/cassandra/sidecar/common/data/StreamSSTableComponentRequest.java
new file mode 100644 (file)
index 0000000..c6d75fa
--- /dev/null
@@ -0,0 +1,47 @@
+package org.apache.cassandra.sidecar.common.data;
+
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+/**
+ * Holder class for the {@code org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler}
+ * request parameters
+ */
+public class StreamSSTableComponentRequest extends SSTableComponent
+{
+    private final String snapshotName;
+
+    /**
+     * Constructor for the holder class
+     *
+     * @param keyspace      the keyspace in Cassandra
+     * @param tableName     the table name in Cassandra
+     * @param snapshotName  the name of the snapshot
+     * @param componentName the name of the SSTable component
+     */
+    public StreamSSTableComponentRequest(String keyspace, String tableName, String snapshotName, String componentName)
+    {
+        super(keyspace, tableName, componentName);
+        this.snapshotName = ValidationUtils.validateSnapshotName(snapshotName);
+    }
+
+    /**
+     * @return the name of the snapshot
+     */
+    public String getSnapshotName()
+    {
+        return snapshotName;
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    public String toString()
+    {
+        return "StreamSSTableComponentRequest{" +
+               "keyspace='" + getKeyspace() + '\'' +
+               ", tableName='" + getTableName() + '\'' +
+               ", snapshot='" + snapshotName + '\'' +
+               ", componentName='" + getComponentName() + '\'' +
+               '}';
+    }
+}
diff --git a/common/src/main/java/org/apache/cassandra/sidecar/common/utils/ValidationUtils.java b/common/src/main/java/org/apache/cassandra/sidecar/common/utils/ValidationUtils.java
new file mode 100644 (file)
index 0000000..345f38c
--- /dev/null
@@ -0,0 +1,85 @@
+package org.apache.cassandra.sidecar.common.utils;
+
+import java.io.File;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.web.handler.HttpException;
+import org.jetbrains.annotations.NotNull;
+
+/**
+ * Miscellaneous methods used for validation.
+ */
+public class ValidationUtils
+{
+    private static final Set<String> FORBIDDEN_DIRS = new HashSet<>(Arrays.asList("system_schema",
+                                                                                  "system_traces",
+                                                                                  "system_distributed",
+                                                                                  "system",
+                                                                                  "system_auth",
+                                                                                  "system_views",
+                                                                                  "system_virtual_schema"));
+    private static final String CHARS_ALLOWED_PATTERN = "[a-zA-Z0-9_-]+";
+    private static final Pattern PATTERN_WORD_CHARS = Pattern.compile(CHARS_ALLOWED_PATTERN);
+    private static final String REGEX_COMPONENT = CHARS_ALLOWED_PATTERN + "(.db|.cql|.json|.crc32|TOC.txt)";
+    private static final String REGEX_DB_TOC_COMPONENT = CHARS_ALLOWED_PATTERN + "(.db|TOC.txt)";
+
+    public static String validateKeyspaceName(final String keyspace)
+    {
+        Objects.requireNonNull(keyspace, "keyspace must not be null");
+        validatePattern(keyspace, "keyspace");
+        if (FORBIDDEN_DIRS.contains(keyspace))
+            throw new HttpException(HttpResponseStatus.FORBIDDEN.code(), "Forbidden keyspace: " + keyspace);
+        return keyspace;
+    }
+
+    public static String validateTableName(final String tableName)
+    {
+        Objects.requireNonNull(tableName, "tableName must not be null");
+        validatePattern(tableName, "table name");
+        return tableName;
+    }
+
+    public static String validateSnapshotName(final String snapshotName)
+    {
+        Objects.requireNonNull(snapshotName, "snapshotName must not be null");
+        //  most UNIX systems only disallow file separator and null characters for directory names
+        if (snapshotName.contains(File.separator) || snapshotName.contains("\0"))
+            throw new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                    "Invalid characters in snapshot name: " + snapshotName);
+        return snapshotName;
+    }
+
+    public static String validateComponentName(String componentName)
+    {
+        return validateComponentNameByRegex(componentName, REGEX_COMPONENT);
+    }
+
+    public static String validateDbOrTOCComponentName(String componentName)
+    {
+        return validateComponentNameByRegex(componentName, REGEX_DB_TOC_COMPONENT);
+    }
+
+    @NotNull
+    private static String validateComponentNameByRegex(String componentName, String regex)
+    {
+        Objects.requireNonNull(componentName, "componentName must not be null");
+        if (!componentName.matches(regex))
+            throw new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                    "Invalid component name: " + componentName);
+        return componentName;
+    }
+
+    private static void validatePattern(String input, String name)
+    {
+        final Matcher matcher = PATTERN_WORD_CHARS.matcher(input);
+        if (!matcher.matches())
+            throw new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                    "Invalid characters in " + name + ": " + input);
+    }
+}
diff --git a/common/src/test/java/org/apache/cassandra/sidecar/common/ValidationUtilsTest.java b/common/src/test/java/org/apache/cassandra/sidecar/common/ValidationUtilsTest.java
new file mode 100644 (file)
index 0000000..c262f86
--- /dev/null
@@ -0,0 +1,167 @@
+package org.apache.cassandra.sidecar.common;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Test validation methods.
+ */
+public class ValidationUtilsTest
+{
+
+    private void testCommon_invalidCharacters(String testName)
+    {
+        HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+        {
+            ValidationUtils.validateTableName(testName);
+        });
+        assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+        assertEquals("Invalid characters in table name: " + testName, httpEx.getPayload());
+    }
+
+    @Test
+    public void testValidateCharacters_validParams_expectNoException()
+    {
+        ValidationUtils.validateTableName("test_table_name");
+        ValidationUtils.validateTableName("test-table-name");
+        ValidationUtils.validateTableName("testTableName");
+    }
+
+    @Test
+    public void testValidateCharacters_paramWithColon_expectException()
+    {
+        testCommon_invalidCharacters("test:table_name");
+    }
+
+    @Test
+    public void testValidateCharacters_paramWithDollar_expectException()
+    {
+        testCommon_invalidCharacters("test-table$name");
+    }
+
+    @Test
+    public void testValidateCharacters_paramsWithSlash_expectException()
+    {
+        testCommon_invalidCharacters("testTable/Name");
+    }
+
+
+    @Test
+    public void testValidateKeyspaceName_validKeyspaceNames_expectNoException()
+    {
+        ValidationUtils.validateKeyspaceName("system-views");
+        ValidationUtils.validateKeyspaceName("SystemViews");
+        ValidationUtils.validateKeyspaceName("system_views_test");
+    }
+
+    @Test
+    public void testValidateKeyspaceName_forbiddenKeyspaceName_expectException()
+    {
+        String testKS = "system_views";
+        HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+        {
+            ValidationUtils.validateKeyspaceName(testKS);
+        });
+        assertEquals(HttpResponseStatus.FORBIDDEN.code(), httpEx.getStatusCode());
+        assertEquals("Forbidden keyspace: " + testKS, httpEx.getPayload());
+    }
+
+    @Test
+    public void testValidateKeyspaceName_keyspaceNameWithSpace_expectException()
+    {
+        String testKS = "test keyspace";
+        HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+        {
+            ValidationUtils.validateKeyspaceName(testKS);
+        });
+        assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+        assertEquals("Invalid characters in keyspace: " + testKS, httpEx.getPayload());
+    }
+
+
+    @Test
+    public void testValidateFileName_validFileNames_expectNoException()
+    {
+        ValidationUtils.validateComponentName("test-file-name.db");
+        ValidationUtils.validateComponentName("test_file_name.json");
+        ValidationUtils.validateComponentName("testFileName.cql");
+        ValidationUtils.validateComponentName("t_TOC.txt");
+        ValidationUtils.validateComponentName("crcfile.crc32");
+    }
+
+    private void testCommon_testInvalidFileName(String testFileName)
+    {
+        HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+        {
+            ValidationUtils.validateComponentName(testFileName);
+        });
+        assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+        assertEquals("Invalid component name: " + testFileName, httpEx.getPayload());
+    }
+
+    @Test
+    public void testValidateFileName_withoutExtension_expectException()
+    {
+        testCommon_testInvalidFileName("test-file-name");
+    }
+
+    @Test
+    public void testValidateFileName_incorrectExtension_expectException()
+    {
+        testCommon_testInvalidFileName("test-file-name.db1");
+    }
+
+    @Test
+    public void testValidateFileName_incorrectCrcExtension_expectException()
+    {
+        testCommon_testInvalidFileName("crcfile.crc64");
+    }
+
+    @Test
+    public void testValidateFileName_withoutFileName_expectException()
+    {
+        testCommon_testInvalidFileName("TOC.txt");
+    }
+
+
+    @Test
+    public void testValidateSnapshotName_validSnapshotNames_expectNoException()
+    {
+        ValidationUtils.validateSnapshotName("valid-snapshot-name");
+        ValidationUtils.validateSnapshotName("valid\\snapshot\\name"); // Is this really valid ??
+        ValidationUtils.validateSnapshotName("valid:snapshot:name");
+        ValidationUtils.validateSnapshotName("valid$snapshot$name");
+        ValidationUtils.validateSnapshotName("valid snapshot name");
+    }
+
+    @Test
+    public void testValidateSnapshotName_snapshotNameWithSlash_expectException()
+    {
+        String testSnapName = "valid" + '/' + "snapshotname";
+        HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+        {
+            ValidationUtils.validateSnapshotName(testSnapName);
+        });
+        assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+        assertEquals("Invalid characters in snapshot name: " + testSnapName, httpEx.getPayload());
+    }
+
+    @Test
+    public void testValidateSnapshotName_snapshotNameWithNullChar_expectException()
+    {
+        String testSnapName = "valid" + '\0' + "snapshotname";
+        HttpException httpEx = Assertions.assertThrows(HttpException.class, () ->
+        {
+            ValidationUtils.validateSnapshotName(testSnapName);
+        });
+        assertEquals(HttpResponseStatus.BAD_REQUEST.code(), httpEx.getStatusCode());
+        assertEquals("Invalid characters in snapshot name: " + testSnapName, httpEx.getPayload());
+    }
+
+}
index f004357b5ac58aa65114f56e703148ea9cc20f1c..7a32a308bdcd6221ae3d93fac09abb6227a610eb 100644 (file)
@@ -1,5 +1,5 @@
 version=1.0-SNAPSHOT
 junitVersion=5.4.2
 kubernetesClientVersion=9.0.0
-cassandra40Version=4.0.3
+cassandra40Version=4.0.4
 vertxVersion=4.2.1
index 6925eb8339514dd199b51991b27e269495f59723..2c914d5a9724319fccf7400851303278e2649044 100644 (file)
@@ -41,6 +41,7 @@ import com.google.inject.Provides;
 import com.google.inject.Singleton;
 import io.vertx.core.Vertx;
 import io.vertx.core.VertxOptions;
+import io.vertx.core.http.HttpMethod;
 import io.vertx.core.http.HttpServer;
 import io.vertx.core.http.HttpServerOptions;
 import io.vertx.core.net.JksOptions;
@@ -57,8 +58,9 @@ import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadataImpl;
 import org.apache.cassandra.sidecar.common.CQLSession;
 import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.routes.CassandraHealthService;
+import org.apache.cassandra.sidecar.routes.FileStreamHandler;
 import org.apache.cassandra.sidecar.routes.HealthService;
-import org.apache.cassandra.sidecar.routes.StreamSSTableComponent;
+import org.apache.cassandra.sidecar.routes.StreamSSTableComponentHandler;
 import org.apache.cassandra.sidecar.routes.SwaggerOpenApiResource;
 import org.apache.cassandra.sidecar.utils.YAMLKeyConstants;
 import org.jboss.resteasy.plugins.server.vertx.VertxRegistry;
@@ -71,6 +73,7 @@ import org.jboss.resteasy.plugins.server.vertx.VertxResteasyDeployment;
 public class MainModule extends AbstractModule
 {
     private static final Logger logger = LoggerFactory.getLogger(MainModule.class);
+    private static final String V1_API_VERSION = "/api/v1";
 
     @Provides
     @Singleton
@@ -113,7 +116,6 @@ public class MainModule extends AbstractModule
     @Singleton
     private VertxRequestHandler configureServices(Vertx vertx,
                                                   HealthService healthService,
-                                                  StreamSSTableComponent ssTableComponent,
                                                   CassandraHealthService cassandraHealthService)
     {
         VertxResteasyDeployment deployment = new VertxResteasyDeployment();
@@ -122,7 +124,6 @@ public class MainModule extends AbstractModule
 
         r.addPerInstanceResource(SwaggerOpenApiResource.class);
         r.addSingletonResource(healthService);
-        r.addSingletonResource(ssTableComponent);
         r.addSingletonResource(cassandraHealthService);
 
         return new VertxRequestHandler(vertx, deployment);
@@ -130,7 +131,11 @@ public class MainModule extends AbstractModule
 
     @Provides
     @Singleton
-    public Router vertxRouter(Vertx vertx, LoggerHandler loggerHandler, ErrorHandler errorHandler)
+    public Router vertxRouter(Vertx vertx,
+                              StreamSSTableComponentHandler streamSSTableComponentHandler,
+                              FileStreamHandler fileStreamHandler,
+                              LoggerHandler loggerHandler,
+                              ErrorHandler errorHandler)
     {
         Router router = Router.router(vertx);
         router.route()
@@ -144,6 +149,21 @@ public class MainModule extends AbstractModule
         // Docs index.html page
         StaticHandler docs = StaticHandler.create("docs");
         router.route().path("/docs/*").handler(docs);
+
+        // add custom routers
+        final String componentRoute = "/keyspace/:keyspace/table/:table/snapshots/:snapshot/component/:component";
+        final String defaultStreamRoute = V1_API_VERSION + componentRoute;
+        final String instanceSpecificStreamRoute = V1_API_VERSION + "/instance/:instanceId" + componentRoute;
+        router.route().method(HttpMethod.GET)
+              .path(defaultStreamRoute)
+              .handler(streamSSTableComponentHandler::handleAllRequests)
+              .handler(fileStreamHandler);
+
+        router.route().method(HttpMethod.GET)
+              .path(instanceSpecificStreamRoute)
+              .handler(streamSSTableComponentHandler::handlePerInstanceRequests)
+              .handler(fileStreamHandler);
+
         return router;
     }
 
index 9081cb9d489cb01c23d0f189298542c6791ae8d1..2f531759ef231ed875dd06d28abb33030c8f94b3 100644 (file)
@@ -4,7 +4,6 @@ import java.util.List;
 
 import org.apache.cassandra.sidecar.common.CQLSession;
 import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
-import org.apache.cassandra.sidecar.utils.FilePathBuilder;
 
 /**
  * Metadata of an instance
@@ -40,9 +39,4 @@ public interface InstanceMetadata
      * Delegate specific for the instance.
      */
     CassandraAdapterDelegate delegate();
-
-    /**
-     * Maintain one path builder for one instance.
-     */
-    FilePathBuilder pathBuilder();
 }
index 987fdc236b1b6fb0dfa26e09b76c7e5841450c1d..8f3b56632c88e15bbedd8df9b95f7f49b325e4a3 100644 (file)
@@ -5,8 +5,6 @@ import java.util.List;
 import org.apache.cassandra.sidecar.common.CQLSession;
 import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
-import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
-import org.apache.cassandra.sidecar.utils.FilePathBuilder;
 
 /**
  * Local implementation of InstanceMetadata.
@@ -19,7 +17,6 @@ public class InstanceMetadataImpl implements InstanceMetadata
     private final List<String> dataDirs;
     private final CQLSession session;
     private final CassandraAdapterDelegate delegate;
-    private final FilePathBuilder pathBuilder;
 
     public InstanceMetadataImpl(int id, String host, int port, List<String> dataDirs, CQLSession session,
                                 CassandraVersionProvider versionProvider, int healthCheckFrequencyMillis)
@@ -28,7 +25,6 @@ public class InstanceMetadataImpl implements InstanceMetadata
         this.host = host;
         this.port = port;
         this.dataDirs = dataDirs;
-        this.pathBuilder = new CachedFilePathBuilder(dataDirs);
 
         this.session = new CQLSession(host, port, healthCheckFrequencyMillis);
         this.delegate = new CassandraAdapterDelegate(versionProvider, session, healthCheckFrequencyMillis);
@@ -63,9 +59,4 @@ public class InstanceMetadataImpl implements InstanceMetadata
     {
         return delegate;
     }
-
-    public FilePathBuilder pathBuilder()
-    {
-        return pathBuilder;
-    }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/models/ComponentInfo.java b/src/main/java/org/apache/cassandra/sidecar/models/ComponentInfo.java
deleted file mode 100644 (file)
index 0588ce7..0000000
+++ /dev/null
@@ -1,40 +0,0 @@
-package org.apache.cassandra.sidecar.models;
-
-/**
- * Stores information needed to identify a SStable component.
- */
-public class ComponentInfo
-{
-    private final String keyspace;
-    private final String table;
-    private final String snapshot;
-    private final String component;
-
-    public ComponentInfo(String keyspace, String table, String snapshot, String component)
-    {
-        this.keyspace = keyspace;
-        this.table = table;
-        this.snapshot = snapshot;
-        this.component = component;
-    }
-
-    public String getKeyspace()
-    {
-        return keyspace;
-    }
-
-    public String getTable()
-    {
-        return table;
-    }
-
-    public String getSnapshot()
-    {
-        return snapshot;
-    }
-
-    public String getComponent()
-    {
-        return component;
-    }
-}
index 708221e2e3050b68e05e1af83088d24101c53855..ffeebf543eec8162f6ca8689b1997b55d0de92de 100644 (file)
@@ -1,24 +1,31 @@
 package org.apache.cassandra.sidecar.models;
 
-import java.io.File;
-
 import io.netty.handler.codec.http.HttpHeaderNames;
 import io.netty.handler.codec.http.HttpHeaderValues;
 import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.http.HttpHeaders;
+import io.vertx.core.http.HttpServerRequest;
 import io.vertx.core.http.HttpServerResponse;
+import io.vertx.core.net.SocketAddress;
 
 import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
 
 /**
  * Wrapper around HttpServerResponse
  */
 public class HttpResponse
 {
+    private final String host;
+    private final HttpServerRequest request;
     private final HttpServerResponse response;
 
-    public HttpResponse(HttpServerResponse response)
+    public HttpResponse(HttpServerRequest request, HttpServerResponse response)
     {
+        this.request = request;
         this.response = response;
+        this.host = extractHostAddressWithoutPort(request.host());
     }
 
     public void setRetryAfterHeader(long microsToWait)
@@ -63,26 +70,42 @@ public class HttpResponse
         response.setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).setStatusMessage(msg).end();
     }
 
-    public void sendFile(File file)
+    /**
+     * Send a range in a file asynchronously
+     *
+     * @param fileName   file to send
+     * @param fileLength the size of the file to send
+     * @param range      range to send
+     * @return a future completed with the body result
+     */
+    public Future<Void> sendFile(String fileName, long fileLength, Range range)
     {
-        response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
-                .putHeader(HttpHeaderNames.CONTENT_LENGTH, Long.toString(file.length()))
-                .sendFile(file.getAbsolutePath());
-    }
+        // notify client we support range requests
+        response.putHeader(HttpHeaders.ACCEPT_RANGES, "bytes");
 
-    public void sendFile(File file, Range range)
-    {
-        if (range.length() != file.length())
+        if (range.length() != fileLength)
         {
             setPartialContentStatus(range);
         }
-        response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
-                .putHeader(HttpHeaderNames.CONTENT_LENGTH, Long.toString(range.length()))
-                .sendFile(file.getAbsolutePath(), range.start(), range.length());
+
+        return response.putHeader(HttpHeaderNames.CONTENT_TYPE, HttpHeaderValues.APPLICATION_OCTET_STREAM)
+                       .putHeader(HttpHeaderNames.CONTENT_LENGTH, Long.toString(range.length()))
+                       .sendFile(fileName, range.start(), range.length());
+    }
+
+    /**
+     * @return the remote address for this connection, possibly {@code null} (e.g a server bound on a domain socket).
+     */
+    public SocketAddress remoteAddress()
+    {
+        return request.remoteAddress();
     }
 
-    public void setForbiddenStatus(String msg)
+    /**
+     * @return the request host without the port
+     */
+    public String host()
     {
-        response.setStatusCode(HttpResponseStatus.FORBIDDEN.code()).setStatusMessage(msg).end();
+        return host;
     }
 }
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/FileStreamHandler.java
new file mode 100644 (file)
index 0000000..b0c02b8
--- /dev/null
@@ -0,0 +1,95 @@
+package org.apache.cassandra.sidecar.routes;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import io.vertx.core.Future;
+import io.vertx.core.Handler;
+import io.vertx.core.file.FileProps;
+import io.vertx.core.file.FileSystem;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.models.HttpResponse;
+import org.apache.cassandra.sidecar.utils.FileStreamer;
+
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
+import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
+
+/**
+ * Handler for sending out files.
+ */
+public class FileStreamHandler implements Handler<RoutingContext>
+{
+    private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamHandler.class);
+    public static final String FILE_PATH_CONTEXT_KEY = "fileToTransfer";
+
+    private final FileStreamer fileStreamer;
+
+    @Inject
+    public FileStreamHandler(final FileStreamer fileStreamer)
+    {
+        this.fileStreamer = fileStreamer;
+    }
+
+    @Override
+    public void handle(RoutingContext context)
+    {
+        final String localFile = context.get(FILE_PATH_CONTEXT_KEY);
+        final HttpServerRequest request = context.request();
+        final String host = extractHostAddressWithoutPort(request.host());
+
+        LOGGER.debug("FileStreamHandler handle file transfer '{}' for client: {}. Instance: {}", localFile,
+                     request.remoteAddress(), host);
+
+        FileSystem fs = context.vertx().fileSystem();
+        fs.exists(localFile)
+          .compose(exists -> ensureValidFile(fs, localFile, exists))
+          .compose(fileProps -> fileStreamer.stream(new HttpResponse(request, context.response()), localFile,
+                                                    fileProps.size(), request.getHeader(HttpHeaderNames.RANGE)))
+          .onSuccess(v -> LOGGER.debug("Completed streaming file '{}'", localFile))
+          .onFailure(context::fail);
+    }
+
+    /**
+     * Ensures that the file exists and is a non-empty regular file
+     *
+     * @param fs        The underlying filesystem
+     * @param localFile The path the file in the filesystem
+     * @param exists    Whether the file exists or not
+     * @return a succeeded future with the {@link FileProps}, or a failed future if the file does not exist;
+     * is not a regular file; or if the file is empty
+     */
+    private Future<FileProps> ensureValidFile(FileSystem fs, String localFile, Boolean exists)
+    {
+        if (!exists)
+        {
+            LOGGER.error("The requested file '{}' does not exist", localFile);
+            return Future.failedFuture(new HttpException(NOT_FOUND.code(), "The requested file does not exist"));
+        }
+
+        return fs.props(localFile)
+                 .compose(fileProps ->
+                  {
+                     if (fileProps == null || !fileProps.isRegularFile())
+                     {
+                         // File is not a regular file
+                         LOGGER.error("The requested file '{}' does not exist", localFile);
+                         return Future.failedFuture(new HttpException(NOT_FOUND.code(),
+                                                                      "The requested file does not exist"));
+                     }
+
+                     if (fileProps.size() <= 0)
+                     {
+                         LOGGER.error("The requested file '{}' has 0 size", localFile);
+                         return Future.failedFuture(new HttpException(REQUESTED_RANGE_NOT_SATISFIABLE.code(),
+                                                                      "The requested file is empty"));
+                     }
+
+                     return Future.succeededFuture(fileProps);
+                 });
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponent.java
deleted file mode 100644 (file)
index dc4596e..0000000
+++ /dev/null
@@ -1,131 +0,0 @@
-package org.apache.cassandra.sidecar.routes;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.nio.file.Path;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Set;
-import java.util.regex.Pattern;
-import javax.ws.rs.GET;
-import javax.ws.rs.HeaderParam;
-import javax.ws.rs.PathParam;
-import javax.ws.rs.core.Context;
-
-import com.google.inject.Inject;
-import com.google.inject.Singleton;
-import io.vertx.core.http.HttpServerRequest;
-import io.vertx.core.http.HttpServerResponse;
-import org.apache.cassandra.sidecar.models.ComponentInfo;
-import org.apache.cassandra.sidecar.models.HttpResponse;
-import org.apache.cassandra.sidecar.models.Range;
-import org.apache.cassandra.sidecar.utils.FilePathBuilder;
-import org.apache.cassandra.sidecar.utils.FileStreamer;
-import org.apache.cassandra.sidecar.utils.InstanceMetadataFetcher;
-
-import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
-
-/**
- * Handler for serving SSTable components from snapshot folders
- */
-@Singleton
-@javax.ws.rs.Path("/api")
-public class StreamSSTableComponent
-{
-    private static final Pattern REGEX_DIR = Pattern.compile("[a-zA-Z0-9_-]+");
-    private static final Pattern REGEX_COMPONENT = Pattern.compile("[a-zA-Z0-9_-]+(.db|.cql|.json|.crc32|TOC.txt)");
-    private static final Set<String> FORBIDDEN_DIRS = new HashSet<>(
-            Arrays.asList("system_schema", "system_traces", "system_distributed", "system", "system_auth"));
-
-    private final InstanceMetadataFetcher metadataFetcher;
-    private final FileStreamer fileStreamer;
-
-    @Inject
-    public StreamSSTableComponent(final InstanceMetadataFetcher metadataFetcher, final FileStreamer fileStreamer)
-    {
-        this.metadataFetcher = metadataFetcher;
-        this.fileStreamer = fileStreamer;
-    }
-
-    @GET
-    @javax.ws.rs.Path("/v1/stream/keyspace/{keyspace}/table/{table}/snapshot/{snapshot}/component/{component}")
-    public void streamFromFirstInstance(@PathParam("keyspace") String keyspace, @PathParam("table") String table,
-                                        @PathParam("snapshot") String snapshot,
-                                        @PathParam("component") String component, @HeaderParam("Range") String range,
-                                        @Context HttpServerResponse resp, @Context HttpServerRequest req)
-    {
-        final String host = extractHostAddressWithoutPort(req.host());
-        stream(new ComponentInfo(keyspace, table, snapshot, component), range, null, host, resp);
-    }
-
-    @GET
-    @javax.ws.rs.Path
-    ("/v1/stream/instance/{instanceId}/keyspace/{keyspace}/table/{table}/snapshot/{snapshot}/component/{component}")
-    public void streamFromSpecificInstance(@PathParam("keyspace") String keyspace, @PathParam("table") String table,
-                                           @PathParam("snapshot") String snapshot,
-                                           @PathParam("component") String component,
-                                           @PathParam("instanceId") Integer instanceId,
-                                           @HeaderParam("Range") String range, @Context HttpServerResponse resp)
-    {
-        stream(new ComponentInfo(keyspace, table, snapshot, component), range, instanceId, null, resp);
-    }
-
-    private void stream(ComponentInfo componentInfo, String range, Integer instanceId,
-                        String host, HttpServerResponse resp)
-    {
-        final HttpResponse response = new HttpResponse(resp);
-        if (FORBIDDEN_DIRS.contains(componentInfo.getKeyspace()))
-        {
-            response.setForbiddenStatus(componentInfo.getKeyspace() + " keyspace is forbidden");
-            return;
-        }
-        if (!arePathParamsValid(componentInfo))
-        {
-            response.setBadRequestStatus("Invalid path params found");
-            return;
-        }
-
-        final Path path;
-        final FilePathBuilder pathBuilder = instanceId == null
-                                            ? metadataFetcher.getPathBuilder(host)
-                                            : metadataFetcher.getPathBuilder(instanceId);
-        try
-        {
-            path = pathBuilder.build(componentInfo.getKeyspace(), componentInfo.getTable(),
-                                     componentInfo.getSnapshot(), componentInfo.getComponent());
-        }
-        catch (FileNotFoundException e)
-        {
-            response.setNotFoundStatus(e.getMessage());
-            return;
-        }
-        final File file = path.toFile();
-        final Range r;
-        try
-        {
-            r = parseRangeHeader(range, file.length());
-        }
-        catch (Exception e)
-        {
-            response.setRangeNotSatisfiable(e.getMessage());
-            return;
-        }
-        fileStreamer.stream(response, file, r);
-    }
-
-    private boolean arePathParamsValid(ComponentInfo componentInfo)
-    {
-        return REGEX_DIR.matcher(componentInfo.getKeyspace()).matches()
-               && REGEX_DIR.matcher(componentInfo.getTable()).matches()
-               && REGEX_DIR.matcher(componentInfo.getSnapshot()).matches()
-               && REGEX_COMPONENT.matcher(componentInfo.getComponent()).matches();
-    }
-
-    private Range parseRangeHeader(String rangeHeader, long fileSize)
-    {
-        final Range fileRange = new Range(0, fileSize - 1, fileSize);
-        // sidecar does not support multiple ranges as of now
-        final Range headerRange = Range.parseHeader(rangeHeader, fileSize);
-        return fileRange.intersect(headerRange);
-    }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java b/src/main/java/org/apache/cassandra/sidecar/routes/StreamSSTableComponentHandler.java
new file mode 100644 (file)
index 0000000..517baac
--- /dev/null
@@ -0,0 +1,99 @@
+package org.apache.cassandra.sidecar.routes;
+
+import java.io.FileNotFoundException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.http.HttpServerRequest;
+import io.vertx.core.net.SocketAddress;
+import io.vertx.ext.web.RoutingContext;
+import io.vertx.ext.web.handler.HttpException;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.common.data.StreamSSTableComponentRequest;
+import org.apache.cassandra.sidecar.snapshots.SnapshotPathBuilder;
+
+import static org.apache.cassandra.sidecar.utils.RequestUtils.extractHostAddressWithoutPort;
+
+/**
+ * This handler validates that the component exists in the cluster and sets up the context
+ * for the {@link FileStreamHandler} to stream the component back to the client
+ */
+@Singleton
+public class StreamSSTableComponentHandler
+{
+    private static final Logger logger = LoggerFactory.getLogger(StreamSSTableComponentHandler.class);
+
+    private final SnapshotPathBuilder snapshotPathBuilder;
+    private final InstancesConfig instancesConfig;
+
+    @Inject
+    public StreamSSTableComponentHandler(SnapshotPathBuilder snapshotPathBuilder, InstancesConfig instancesConfig)
+    {
+        this.snapshotPathBuilder = snapshotPathBuilder;
+        this.instancesConfig = instancesConfig;
+    }
+
+    public void handleAllRequests(RoutingContext context)
+    {
+        final HttpServerRequest request = context.request();
+        final String host = extractHostAddressWithoutPort(request.host());
+        streamFilesForHost(host, context);
+    }
+
+    public void handlePerInstanceRequests(RoutingContext context)
+    {
+        final String instanceIdParam = context.request().getParam("InstanceId");
+        if (instanceIdParam == null)
+        {
+            context.fail(new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                           "InstanceId path parameter must be provided"));
+            return;
+        }
+
+        final Integer instanceId = Integer.valueOf(instanceIdParam);
+        final String host = instancesConfig.instanceFromId(instanceId).host();
+        streamFilesForHost(host, context);
+    }
+
+    public void streamFilesForHost(String host, RoutingContext context)
+    {
+        final SocketAddress remoteAddress = context.request().remoteAddress();
+        final StreamSSTableComponentRequest requestParams = extractParamsOrThrow(context);
+        logger.info("StreamSSTableComponentHandler received request: {} from: {}. Instance: {}", requestParams,
+                    remoteAddress, host);
+
+        snapshotPathBuilder.build(host, requestParams)
+                           .onSuccess(path ->
+               {
+                   logger.debug("StreamSSTableComponentHandler handled {} for client {}. Instance: {}", path,
+                                remoteAddress, host);
+                   context.put(FileStreamHandler.FILE_PATH_CONTEXT_KEY, path)
+                          .next();
+               })
+                           .onFailure(cause ->
+               {
+                   if (cause instanceof FileNotFoundException)
+                   {
+                       context.fail(new HttpException(HttpResponseStatus.NOT_FOUND.code(), cause.getMessage()));
+                   }
+                   else
+                   {
+                       context.fail(new HttpException(HttpResponseStatus.BAD_REQUEST.code(),
+                                                      "Invalid request for " + requestParams));
+                   }
+               });
+    }
+
+    private StreamSSTableComponentRequest extractParamsOrThrow(final RoutingContext rc)
+    {
+        return new StreamSSTableComponentRequest(rc.pathParam("keyspace"),
+                                                 rc.pathParam("table"),
+                                                 rc.pathParam("snapshot"),
+                                                 rc.pathParam("component")
+        );
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilder.java
new file mode 100644 (file)
index 0000000..8cb4a55
--- /dev/null
@@ -0,0 +1,279 @@
+package org.apache.cassandra.sidecar.snapshots;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.inject.Inject;
+import com.google.inject.Singleton;
+import io.vertx.core.CompositeFuture;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.file.FileProps;
+import io.vertx.core.file.FileSystem;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.common.data.StreamSSTableComponentRequest;
+import org.apache.cassandra.sidecar.common.utils.ValidationUtils;
+
+/**
+ * This class builds the snapshot path on a given host validating that it exists
+ */
+@Singleton
+public class SnapshotPathBuilder
+{
+    private static final Logger logger = LoggerFactory.getLogger(SnapshotPathBuilder.class);
+    private static final String DATA_SUB_DIR = "/data";
+    public static final String SNAPSHOTS_DIR_NAME = "snapshots";
+    protected final FileSystem fs;
+    protected final InstancesConfig instancesConfig;
+
+    /**
+     * Creates a new SnapshotPathBuilder for snapshots of an instance with the given {@code fs filesystem} and
+     * {@code instancesConfig Cassandra configuration}.
+     *
+     * @param fs              the underlying filesystem
+     * @param instancesConfig the configuration for Cassandra
+     */
+    @Inject
+    public SnapshotPathBuilder(FileSystem fs, InstancesConfig instancesConfig)
+    {
+        this.fs = fs;
+        this.instancesConfig = instancesConfig;
+    }
+
+    /**
+     * Builds the path to the given component given the {@code keyspace}, {@code table}, and {@code snapshot}
+     * inside the specified {@code host}. When a table has been dropped and recreated, the code searches for
+     * the latest modified directory for that table.
+     *
+     * @param host    the name of the host
+     * @param request the request to stream the SSTable component
+     * @return the absolute path of the component
+     */
+    public Future<String> build(String host, StreamSSTableComponentRequest request)
+    {
+        validate(request);
+        // Search for the file
+        return getDataDirectories(host)
+               .compose(dataDirs -> findKeyspaceDirectory(dataDirs, request.getKeyspace()))
+               .compose(keyspaceDirectory -> findTableDirectory(keyspaceDirectory, request.getTableName()))
+               .compose(tableDirectory -> findComponent(tableDirectory, request.getSnapshotName(),
+                                                        request.getComponentName()));
+    }
+
+    /**
+     * Validates that the component name is either {@code *.db} or a {@code *-TOC.txt}
+     * which are the only required components to read SSTables.
+     *
+     * @param request the request to stream the SSTable component
+     */
+    protected void validate(StreamSSTableComponentRequest request)
+    {
+        // Only allow .db and TOC.txt components here
+        ValidationUtils.validateDbOrTOCComponentName(request.getComponentName());
+    }
+
+    /**
+     * @param host the host
+     * @return the data directories for the given {@code host}
+     */
+    protected Future<List<String>> getDataDirectories(String host)
+    {
+        List<String> dataDirs = instancesConfig.instanceFromHost(host).dataDirs();
+        if (dataDirs == null || dataDirs.isEmpty())
+        {
+            logger.error("No data directories are available for host '{}'", host);
+            String errMsg = String.format("No data directories are available for host '%s'", host);
+            return Future.failedFuture(new FileNotFoundException(errMsg));
+        }
+        return Future.succeededFuture(dataDirs);
+    }
+
+    /**
+     * Searches in the list of {@code daraDirs} for the given {@code keyspace} and returns the directory
+     * of the keyspace when it is found, or failure when the {@code keyspace} directory does not exist. If
+     * one of the data directories does not exist, a failure will be reported.
+     *
+     * @param dataDirs the list of data directories for a given host
+     * @param keyspace the name of the Cassandra keyspace
+     * @return the directory of the keyspace when it is found, or failure if not found
+     */
+    protected Future<String> findKeyspaceDirectory(List<String> dataDirs, String keyspace)
+    {
+        List<Future<String>> candidates = buildPotentialKeyspaceDirectoryList(dataDirs, keyspace);
+        // We want to find the first valid directory in this case. If a future fails, we
+        // recover by checking each candidate for existence.
+        // Whenever the first successful future returns, we short-circuit the rest.
+        Future<String> root = candidates.get(0);
+        for (int i = 1; i < candidates.size(); i++)
+        {
+            Future<String> f = candidates.get(i);
+            root = root.recover(v -> f);
+        }
+        String errMsg = String.format("Keyspace '%s' does not exist", keyspace);
+        return root.recover(t -> Future.failedFuture(new FileNotFoundException(errMsg)));
+    }
+
+    /**
+     * Builds a list of potential directory lists for the keyspace
+     *
+     * @param dataDirs the list of directories
+     * @param keyspace the Cassandra keyspace
+     * @return a list of potential directories for the keyspace
+     */
+    private List<Future<String>> buildPotentialKeyspaceDirectoryList(List<String> dataDirs, String keyspace)
+    {
+        List<Future<String>> candidates = new ArrayList<>(dataDirs.size() * 2);
+        for (String baseDirectory : dataDirs)
+        {
+            String dir = StringUtils.removeEnd(baseDirectory, File.separator);
+            candidates.add(isValidDirectory(dir + File.separator + keyspace));
+            candidates.add(isValidDirectory(dir + DATA_SUB_DIR + File.separator + keyspace));
+        }
+        return candidates;
+    }
+
+    /**
+     * Finds the most recent directory for the given {@code tableName} in the {@code baseDirectory}. Cassandra
+     * appends the table UUID when a table is created. When a table is dropped and then recreated, a new directory
+     * with the new table UUID is created. For that reason we need to return the most recent directory for the
+     * given table name.
+     *
+     * @param baseDirectory the base directory where we search the table directory
+     * @param tableName     the name of the table
+     * @return the most recent directory for the given {@code tableName} in the {@code baseDirectory}
+     */
+    protected Future<String> findTableDirectory(String baseDirectory, String tableName)
+    {
+        return fs.readDir(baseDirectory, tableName + "($|-.*)") // match exact table name or table-.*
+                 .compose(list -> getLastModifiedTableDirectory(list, tableName));
+    }
+
+    /**
+     * Constructs the path to the component using the {@code baseDirectory}, {@code snapshotName}, and
+     * {@code componentName} and returns if it is a valid path to the component, or a failure otherwise.
+     *
+     * @param baseDirectory the base directory where we search the table directory
+     * @param snapshotName  the name of the snapshot
+     * @param componentName the name of the component
+     * @return the path to the component if it's valid, a failure otherwise
+     */
+    protected Future<String> findComponent(String baseDirectory, String snapshotName, String componentName)
+    {
+        String componentFilename = StringUtils.removeEnd(baseDirectory, File.separator) +
+                                   File.separator + SNAPSHOTS_DIR_NAME + File.separator + snapshotName +
+                                   File.separator + componentName;
+
+        return isValidFilename(componentFilename)
+               .recover(t ->
+               {
+                   logger.warn("Snapshot directory {} or component {} does not exist in {}", snapshotName,
+                               componentName, componentFilename);
+                   String errMsg = String.format("Component '%s' does not exist for snapshot '%s'",
+                                                 componentName, snapshotName);
+                   return Future.failedFuture(new FileNotFoundException(errMsg));
+               });
+    }
+
+    /**
+     * @param filename the path to the file
+     * @return a future of the {@code filename} if it exists and is a regular file, a failed future otherwise
+     */
+    protected Future<String> isValidFilename(String filename)
+    {
+        return isValidOfType(filename, FileProps::isRegularFile);
+    }
+
+    /**
+     * @param path the path to the directory
+     * @return a future of the {@code path} if it exists and is a directory, a failed future otherwise
+     */
+    protected Future<String> isValidDirectory(String path)
+    {
+        return isValidOfType(path, FileProps::isDirectory);
+    }
+
+    /**
+     * @param filename  the path
+     * @param predicate a predicate that evaluates based on {@link FileProps}
+     * @return a future of the {@code filename} if it exists and {@code predicate} evaluates to true,
+     * a failed future otherwise
+     */
+    protected Future<String> isValidOfType(String filename, Predicate<FileProps> predicate)
+    {
+        return fs.exists(filename)
+                 .compose(exists ->
+                 {
+                     if (!exists)
+                     {
+                         String errMsg = "File '" + filename + "' does not exist";
+                         return Future.failedFuture(new FileNotFoundException(errMsg));
+                     }
+                     return fs.props(filename)
+                              .compose(fileProps ->
+                              {
+                                  if (fileProps == null || !predicate.test(fileProps))
+                                  {
+                                      String errMsg = "File '" + filename + "' does not exist";
+                                      return Future.failedFuture(new FileNotFoundException(errMsg));
+                                  }
+                                  return Future.succeededFuture(filename);
+                              });
+                 });
+    }
+
+    /**
+     * @param fileList  a list of files
+     * @param tableName the name of the Cassandra table
+     * @return a future with the last modified directory from the list, or a failed future when there are no directories
+     */
+    protected Future<String> getLastModifiedTableDirectory(List<String> fileList, String tableName)
+    {
+        if (fileList.size() == 0)
+        {
+            String errMsg = String.format("Table '%s' does not exist", tableName);
+            return Future.failedFuture(new FileNotFoundException(errMsg));
+        }
+
+        //noinspection rawtypes
+        List<Future> futures = fileList.stream()
+                                       .map(fs::props)
+                                       .collect(Collectors.toList());
+
+        Promise<String> promise = Promise.promise();
+        CompositeFuture.all(futures)
+                       .onFailure(promise::fail)
+                       .onSuccess(ar ->
+                       {
+                           String directory = IntStream.range(0, fileList.size())
+                                                       .mapToObj(i -> Pair.of(fileList.get(i),
+                                                                              ar.<FileProps>resultAt(i)))
+                                                       .filter(pair -> pair.getRight().isDirectory())
+                                                       .max(Comparator.comparingLong(pair -> pair.getRight()
+                                                                                                 .lastModifiedTime()))
+                                                       .map(Pair::getLeft)
+                                                       .orElse(null);
+
+                           if (directory == null)
+                           {
+                               String errMsg = String.format("Table '%s' does not exist", tableName);
+                               promise.fail(new FileNotFoundException(errMsg));
+                           }
+                           else
+                           {
+                               promise.complete(directory);
+                           }
+                       });
+        return promise.future();
+    }
+}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/utils/CachedFilePathBuilder.java
deleted file mode 100644 (file)
index 6468f9b..0000000
+++ /dev/null
@@ -1,217 +0,0 @@
-package org.apache.cassandra.sidecar.utils;
-
-import java.io.FileNotFoundException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.security.KeyException;
-import java.util.Collection;
-import java.util.Objects;
-import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
-import com.google.common.cache.LoadingCache;
-import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.inject.Inject;
-
-/**
- * Path builder that caches intermediate paths
- */
-public class CachedFilePathBuilder extends FilePathBuilder
-{
-    private static final Logger logger = LoggerFactory.getLogger(CachedFilePathBuilder.class);
-    private final CacheLoader<Key, String> loader = new PathCacheLoader();
-    private final LoadingCache<Key, String> sstableCache = getCacheBuilder();
-    private final LoadingCache<Key, String> snapshotCache = getCacheBuilder();
-    private final LoadingCache<Key, String> tableCache = getCacheBuilder();
-    private final LoadingCache<Key, String> keyspaceCache = getCacheBuilder();
-
-    private LoadingCache<Key, String> getCacheBuilder()
-    {
-        return CacheBuilder.newBuilder().maximumSize(10000).refreshAfterWrite(5, TimeUnit.MINUTES).build(loader);
-    }
-
-    @Inject
-    public CachedFilePathBuilder(final Collection<String> dataDirs)
-    {
-        super(dataDirs);
-    }
-
-    public Path build(String keyspace, String table, String snapshot, String component) throws FileNotFoundException
-    {
-        try
-        {
-            return Paths.get(sstableCache.get(new Key.Builder().setKeyspace(keyspace).setTable(table)
-                    .setSnapshot(snapshot).setComponent(component).build()));
-        }
-        catch (Throwable t)
-        {
-            if (ExceptionUtils.getRootCause(t) instanceof FileNotFoundException)
-            {
-                throw (FileNotFoundException) ExceptionUtils.getRootCause(t);
-            }
-            else
-            {
-                logger.error("Unexpected error while building path ", t);
-                throw new RuntimeException("Error loading value from path cache");
-            }
-        }
-    }
-
-    /**
-     * Cache Loader for guava cache storing path to files
-     */
-    public class PathCacheLoader extends CacheLoader<Key, String>
-    {
-        @Override
-        public String load(Key key) throws FileNotFoundException, KeyException, ExecutionException
-        {
-            switch (key.type())
-            {
-                case KEYSPACE_TABLE_SNAPSHOT_COMPONENT:
-                    return addSSTableComponentToPath(key.component(), snapshotCache.get(new Key.Builder()
-                            .setKeyspace(key.keyspace()).setTable(key.table()).setSnapshot(key.snapshot()).build()));
-                case KEYSPACE_TABLE_SNAPSHOT:
-                    return addSnapshotToPath(key.snapshot(), tableCache.get(new Key.Builder()
-                            .setKeyspace(key.keyspace()).setTable(key.table()).build()));
-                case KEYSPACE_TABLE:
-                    return addTableToPath(key.table(), keyspaceCache.get(new Key.Builder().setKeyspace(key.keyspace())
-                            .build()));
-                case JUST_KEYSPACE:
-                    return addKeyspaceToPath(key.keyspace());
-                default:
-                    throw new KeyException();
-            }
-        }
-    }
-
-    /**
-     * Key to retrieve path information from cache
-     */
-    public static class Key
-    {
-        private final String keyspace;
-        private final String table;
-        private final String snapshot;
-        private final String component;
-        private final KeyType type;
-
-        private Key(String keyspace, String table, String snapshot, String component, KeyType type)
-        {
-            this.keyspace = keyspace;
-            this.table = table;
-            this.snapshot = snapshot;
-            this.component = component;
-            this.type = type;
-        }
-
-        public String keyspace() throws KeyException
-        {
-            return Optional.ofNullable(keyspace).orElseThrow(KeyException::new);
-        }
-
-        public String table() throws KeyException
-        {
-            return Optional.ofNullable(table).orElseThrow(KeyException::new);
-        }
-
-        public String snapshot() throws KeyException
-        {
-            return Optional.ofNullable(snapshot).orElseThrow(KeyException::new);
-        }
-
-        public String component() throws KeyException
-        {
-            return Optional.ofNullable(component).orElseThrow(KeyException::new);
-        }
-
-        public KeyType type()
-        {
-            return type;
-        }
-
-        @Override
-        public boolean equals(Object o)
-        {
-            if (this == o)
-            {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass())
-            {
-                return false;
-            }
-            Key key = (Key) o;
-            return type == key.type &&
-                   Objects.equals(keyspace, key.keyspace) &&
-                   Objects.equals(table, key.table) &&
-                   Objects.equals(snapshot, key.snapshot) &&
-                   Objects.equals(component, key.component);
-        }
-
-        @Override
-        public int hashCode()
-        {
-            return Objects.hash(keyspace, table, snapshot, component, type);
-        }
-
-        /**
-         * Builder class for Key
-         */
-        public static class Builder
-        {
-            private String keyspace;
-            private String table;
-            private String snapshot;
-            private String component;
-            private int length;
-
-            public Builder setKeyspace(String keyspace)
-            {
-                length++;
-                this.keyspace = keyspace;
-                return this;
-            }
-
-            public Builder setTable(String table)
-            {
-                length++;
-                this.table = table;
-                return this;
-            }
-
-            public Builder setSnapshot(String snapshot)
-            {
-                length++;
-                this.snapshot = snapshot;
-                return this;
-            }
-
-            public Builder setComponent(String component)
-            {
-                length++;
-                this.component = component;
-                return this;
-            }
-
-            public CachedFilePathBuilder.Key build()
-            {
-                return new CachedFilePathBuilder.Key(keyspace, table, snapshot, component,
-                        KeyType.values()[length - 1]);
-            }
-        }
-    }
-
-    /**
-     * Enum to hold types of keys created
-     */
-    public enum KeyType
-    {
-        JUST_KEYSPACE, KEYSPACE_TABLE, KEYSPACE_TABLE_SNAPSHOT, KEYSPACE_TABLE_SNAPSHOT_COMPONENT
-    }
-}
diff --git a/src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java b/src/main/java/org/apache/cassandra/sidecar/utils/FilePathBuilder.java
deleted file mode 100644 (file)
index 334f58e..0000000
+++ /dev/null
@@ -1,145 +0,0 @@
-package org.apache.cassandra.sidecar.utils;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.Collection;
-
-import javax.validation.constraints.NotNull;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Builds file path after verifying it exists
- */
-public abstract class FilePathBuilder
-{
-    private static final Logger logger = LoggerFactory.getLogger(FilePathBuilder.class);
-    private static final String dataSubDir = "/data";
-    private final Collection<String> dataDirs;
-
-    public FilePathBuilder(@NotNull final Collection<String> dataDirs)
-    {
-        this.dataDirs = dataDirs;
-    }
-
-    public abstract Path build(String keyspace, String table, String snapshot, String component)
-            throws FileNotFoundException;
-
-    String addKeyspaceToPath(String keyspace) throws FileNotFoundException
-    {
-        for (String dir : dataDirs)
-        {
-            StringBuilder path = new StringBuilder(dir);
-            if (addFileToPathIfPresent(path, keyspace, true))
-            {
-                return path.toString();
-            }
-
-            if (dir.endsWith(dataSubDir))
-            {
-                continue;
-            }
-
-            // check in "data" sub directory
-            if (addFileToPathIfPresent(path.append(dataSubDir), keyspace, true))
-            {
-                return path.toString();
-            }
-        }
-        throw new FileNotFoundException("Keyspace " + keyspace + " does not exist");
-    }
-
-    String addTableToPath(String table, String path) throws FileNotFoundException
-    {
-        final StringBuilder modifiedPath = new StringBuilder(path);
-        if (addFileToPathIfPresent(modifiedPath, table, false))
-        {
-            return modifiedPath.toString();
-        }
-        throw new FileNotFoundException("Table " + table + " not found, path searched: " + path);
-    }
-
-    String addSnapshotToPath(String snapshot, String path) throws FileNotFoundException
-    {
-        final StringBuilder modifiedPath = new StringBuilder(path);
-        if (addFileToPathIfPresent(modifiedPath.append("/snapshots"), snapshot, true))
-        {
-            return modifiedPath.toString();
-        }
-        throw new FileNotFoundException("Snapshot " + snapshot + " not found, path searched: " + path);
-    }
-
-    String addSSTableComponentToPath(String component, String path) throws FileNotFoundException
-    {
-        final StringBuilder modifiedPath = new StringBuilder(path);
-        if (addFileToPathIfPresent(modifiedPath, component, true))
-        {
-            return modifiedPath.toString();
-        }
-        throw new FileNotFoundException("Component " + component + " not found, path searched: " + path);
-    }
-
-    private boolean addFileToPathIfPresent(StringBuilder path, String file, boolean checkEqual)
-            throws FileNotFoundException
-    {
-        final Path fileDir = Paths.get(path.toString());
-        if (!checkDirExists(fileDir))
-        {
-            throw new FileNotFoundException(fileDir + " directory empty or does not exist!");
-        }
-
-        try
-        {
-            Path finalPath = null;
-            try (final DirectoryStream<Path> dirEntries = Files.newDirectoryStream(fileDir))
-            {
-                for (Path entry : dirEntries)
-                {
-                    final Path filePath = entry.getFileName();
-                    if (filePath == null)
-                    {
-                        continue;
-                    }
-                    final String fileName = filePath.toString();
-                    if (fileName.equals(file) || (!checkEqual && fileName.startsWith(file + "-")))
-                    {
-                        if (finalPath == null
-                                || Files.getLastModifiedTime(entry).compareTo(Files.getLastModifiedTime(finalPath)) > 0)
-                        {
-                            finalPath = entry;
-                        }
-                    }
-                }
-                if (finalPath != null)
-                {
-                    final Path finalFilePath = finalPath.getFileName();
-                    if (finalFilePath == null)
-                    {
-                        return false;
-                    }
-                    final String finalFileName = finalFilePath.toString();
-                    path.append('/').append(finalFileName);
-                    return true;
-                }
-            }
-        }
-        catch (IOException e)
-        {
-            logger.error("Error listing files in path {}, could not add file {} to path", path, file, e);
-            throw new RuntimeException("Failed to list files in path " + path);
-        }
-        return false;
-    }
-
-    private boolean checkDirExists(final Path path)
-    {
-        final File file = new File(path.toString());
-        return file.exists() && file.isDirectory();
-    }
-}
index b90f231af1603cf3df9250c41393f8a83e5fefd3..131320efaf715449bde659fa0dae1399c0395841 100644 (file)
@@ -1,24 +1,27 @@
 package org.apache.cassandra.sidecar.utils;
 
-import java.io.File;
 import java.time.Duration;
 import java.time.Instant;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
 
 import com.google.common.util.concurrent.SidecarRateLimiter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.handler.HttpException;
 import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.exceptions.RangeException;
 import org.apache.cassandra.sidecar.models.HttpResponse;
 import org.apache.cassandra.sidecar.models.Range;
 
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
+import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS;
 
 /**
  * General handler for serving files
@@ -27,87 +30,172 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS;
 public class FileStreamer
 {
     private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamer.class);
-    private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
-            new ThreadFactoryBuilder().setNameFormat("acquirePermit").setDaemon(true).build());
-    private final Duration delay;
-    private final Duration timeout;
+    private static final long DEFAULT_RATE_LIMIT_STREAM_REQUESTS_PER_SECOND = Long.MAX_VALUE;
 
+    private final Vertx vertx;
+    private final Configuration config;
     private final SidecarRateLimiter rateLimiter;
 
     @Inject
-    public FileStreamer(Configuration config, SidecarRateLimiter rateLimiter)
+    public FileStreamer(Vertx vertx, Configuration config, SidecarRateLimiter rateLimiter)
     {
+        this.vertx = vertx;
+        this.config = config;
         this.rateLimiter = rateLimiter;
-        this.delay = Duration.ofSeconds(config.getThrottleDelayInSeconds());
-        this.timeout = Duration.ofSeconds(config.getThrottleTimeoutInSeconds());
     }
 
-    public void stream(final HttpResponse resp, final File file)
+    /**
+     * Streams the {@code filename file} with length {@code fileLength} for the (optionally) requested
+     * {@code rangeHeader} using the provided {@code response}.
+     *
+     * @param response      the response to use
+     * @param filename      the path to the file to serve
+     * @param fileLength    the size of the file to serve
+     * @param rangeHeader   (optional) a string representing the requested range for the file
+     * @return a future with the result of the streaming
+     */
+    public Future<Void> stream(HttpResponse response, String filename, long fileLength, String rangeHeader)
     {
-        stream(resp, file, new Range(0, file.length() - 1, file.length()));
+        return parseRangeHeader(rangeHeader, fileLength)
+               .compose(range -> stream(response, filename, fileLength, range));
     }
 
-    public void stream(final HttpResponse resp, final File file, final Range range)
+    /**
+     * Streams the {@code filename file} with length {@code fileLength} for the requested
+     * {@code range} using the provided {@code response}.
+     *
+     * @param response      the response to use
+     * @param filename      the path to the file to serve
+     * @param fileLength    the size of the file to serve
+     * @param range         the range to stream
+     * @return a future with the result of the streaming
+     */
+    public Future<Void> stream(HttpResponse response, String filename, long fileLength, Range range)
     {
-        if (!file.exists() || !file.isFile())
-        {
-            resp.setNotFoundStatus("File does not exist or it is not a normal file");
-            return;
-        }
-        if (file.length() == 0)
-        {
-            resp.setBadRequestStatus("File is empty");
-            return;
-        }
-        acquireAndSend(resp, file, range);
+        Promise<Void> promise = Promise.promise();
+        acquireAndSend(response, filename, fileLength, range, Instant.now(), promise);
+        return promise.future();
     }
 
-    private void acquireAndSend(HttpResponse response, File file, Range range)
+    /**
+     * Send the file if rate-limiting is disabled or when it successfully acquires a permit from the
+     * {@link SidecarRateLimiter}.
+     *
+     * @param response      the response to use
+     * @param filename      the path to the file to serve
+     * @param fileLength    the size of the file to serve
+     * @param range         the range to stream
+     * @param startTime     the start time of this request
+     * @param promise       a promise for the stream
+     */
+    private void acquireAndSend(HttpResponse response, String filename, long fileLength, Range range, Instant startTime,
+                                Promise<Void> promise)
     {
-        acquireAndSend(response, file, range, Instant.now());
+        if (!isRateLimited() || acquire(response, filename, fileLength, range, startTime, promise))
+        {
+            // Stream data if rate limiting is disabled or if we acquire
+            LOGGER.info("Streaming range {} for file {} to client {}. Instance: {}", range, filename,
+                        response.remoteAddress(), response.host());
+            response.sendFile(filename, fileLength, range)
+                    .onSuccess(v ->
+                    {
+                        LOGGER.debug("Streamed file {} successfully to client {}. Instance: {}", filename,
+                                     response.remoteAddress(), response.host());
+                        promise.complete();
+                    })
+                    .onFailure(promise::fail);
+        }
     }
 
     /**
-     * If permit becomes available within a short time, retry immediately
+     * Acquires a permit from the {@link SidecarRateLimiter} if it can be acquired immediately without
+     * delay. Otherwise, it will retry acquiring the permit later in the future until it exhausts the
+     * retry timeout, in which case it will ask the client to retry later in the future.
+     *
+     * @param response      the response to use
+     * @param filename      the path to the file to serve
+     * @param fileLength    the size of the file to serve
+     * @param range         the range to stream
+     * @param startTime     the start time of this request
+     * @param promise       a promise for the stream
+     * @return {@code true} if the permit was acquired, {@code false} otherwise
      */
-    private void acquireAndSend(HttpResponse response, File file, Range range, Instant startTime)
+    private boolean acquire(HttpResponse response, String filename, long fileLength, Range range, Instant startTime,
+                            Promise<Void> promise)
     {
-        while (!rateLimiter.tryAcquire())
+        if (rateLimiter.tryAcquire())
+            return true;
+
+        long microsToWait;
+        if (checkRetriesExhausted(startTime))
+        {
+            LOGGER.error("Retries for acquiring permit exhausted for client {}. Instance: {}", response.remoteAddress(),
+                         response.host());
+            promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Retry exhausted"));
+        }
+        else if ((microsToWait = rateLimiter.queryEarliestAvailable(0L))
+                 < TimeUnit.SECONDS.toMicros(config.getThrottleDelayInSeconds()))
         {
-            if (checkRetriesExhausted(startTime))
-            {
-                LOGGER.error("Retries for acquiring permit exhausted!");
-                response.setTooManyRequestsStatus();
-                return;
-            }
-
-            final long microsToWait = rateLimiter.queryEarliestAvailable(0L);
-            if (microsToWait <= 0) // immediately retry
-            {
-                continue;
-            }
-
-            if (TimeUnit.MICROSECONDS.toNanos(microsToWait) >= delay.getNano())
-            {
-                response.setRetryAfterHeader(microsToWait);
-            }
-            else
-            {
-                retryStreaming(response, file, range, startTime, microsToWait);
-            }
-            return;
+            microsToWait = Math.max(0, microsToWait);
+
+            LOGGER.debug("Retrying streaming after {} micros for client {}. Instance: {}", microsToWait,
+                         response.remoteAddress(), response.host());
+            vertx.setTimer(MICROSECONDS.toMillis(microsToWait),
+                           t -> acquireAndSend(response, filename, fileLength, range, startTime, promise));
+        }
+        else
+        {
+            LOGGER.debug("Asking client {} to retry after {} micros. Instance: {}", response.remoteAddress(),
+                         microsToWait, response.host());
+            response.setRetryAfterHeader(microsToWait);
+            promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Ask client to retry later"));
         }
-        LOGGER.info("File {} streamed from path {}", file.getName(), file.getAbsolutePath());
-        response.sendFile(file, range);
+        return false;
     }
 
+    /**
+     * @return true if this request is rate-limited, false otherwise
+     */
+    private boolean isRateLimited()
+    {
+        return config.getRateLimitStreamRequestsPerSecond() != DEFAULT_RATE_LIMIT_STREAM_REQUESTS_PER_SECOND;
+    }
+
+    /**
+     * @param startTime the request start time
+     * @return true if we exhausted the retries, false otherwise
+     */
     private boolean checkRetriesExhausted(Instant startTime)
     {
-        return startTime.plus(timeout).isBefore(Instant.now());
+        return startTime.plus(Duration.ofSeconds(config.getThrottleTimeoutInSeconds()))
+                        .isBefore(Instant.now());
     }
 
-    private void retryStreaming(HttpResponse response, File file, Range range, Instant startTime, long microsToSleep)
+    /**
+     * Returns the requested range for the request, or the entire range if {@code rangeHeader} is null
+     *
+     * @param rangeHeader The range header from the request
+     * @param fileLength  The length of the file
+     * @return a succeeded future when the parsing is successful, a failed future when the range parsing fails
+     */
+    private Future<Range> parseRangeHeader(String rangeHeader, long fileLength)
     {
-        SCHEDULER.schedule(() -> acquireAndSend(response, file, range, startTime), microsToSleep, MICROSECONDS);
+        Range fr = new Range(0, fileLength - 1, fileLength);
+        if (rangeHeader == null)
+            return Future.succeededFuture(fr);
+
+        try
+        {
+            // sidecar does not support multiple ranges as of now
+            final Range hr = Range.parseHeader(rangeHeader, fileLength);
+            Range intersect = fr.intersect(hr);
+            LOGGER.debug("Calculated range {} for streaming", intersect);
+            return Future.succeededFuture(intersect);
+        }
+        catch (IllegalArgumentException | RangeException | UnsupportedOperationException e)
+        {
+            LOGGER.error(String.format("Failed to parse header '%s'", rangeHeader), e);
+            return Future.failedFuture(new HttpException(REQUESTED_RANGE_NOT_SATISFIABLE.code()));
+        }
     }
 }
index 0ae7fe14ec4055e392d728ada59c599047bfcaaa..f74d08ea644c6798e15fec4b60ef13f220f6046c 100644 (file)
@@ -35,20 +35,6 @@ public class InstanceMetadataFetcher
                : instancesConfig.instanceFromId(instanceId).delegate();
     }
 
-    public FilePathBuilder getPathBuilder(String host)
-    {
-        return host == null
-               ? getFirstInstance().pathBuilder()
-               : instancesConfig.instanceFromHost(host).pathBuilder();
-    }
-
-    public FilePathBuilder getPathBuilder(Integer instanceId)
-    {
-        return instanceId == null
-               ? getFirstInstance().pathBuilder()
-               : instancesConfig.instanceFromId(instanceId).pathBuilder();
-    }
-
     private InstanceMetadata getFirstInstance()
     {
         if (instancesConfig.instances().isEmpty())
diff --git a/src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java b/src/test/java/org/apache/cassandra/sidecar/FilePathBuilderTest.java
deleted file mode 100644 (file)
index 10ffafd..0000000
+++ /dev/null
@@ -1,126 +0,0 @@
-package org.apache.cassandra.sidecar;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.util.Collections;
-
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Test;
-
-import com.google.inject.Guice;
-import com.google.inject.Injector;
-import com.google.inject.util.Modules;
-import org.apache.cassandra.sidecar.cluster.InstancesConfig;
-import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
-import org.apache.cassandra.sidecar.utils.FilePathBuilder;
-import org.assertj.core.api.Assertions;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertThrows;
-
-/**
- * FilePathBuilderTest
- */
-public class FilePathBuilderTest
-{
-    private static final String expectedFilePath = "src/test/resources/instance1/data/TestKeyspace" +
-            "/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots/TestSnapshot" +
-            "/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-    private static FilePathBuilder pathBuilder;
-
-    @BeforeAll
-    public static void setUp()
-    {
-        Injector injector = Guice.createInjector(Modules.override(new MainModule()).with(new TestModule()));
-        pathBuilder = injector.getInstance(InstancesConfig.class).instances().get(0).pathBuilder();
-    }
-
-    @Test
-    public void testRoute() throws IOException
-    {
-        final String keyspace = "TestKeyspace";
-        final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
-        final String snapshot = "TestSnapshot";
-        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        Path filePath = pathBuilder.build(keyspace, table, snapshot, component);
-        assertEquals(expectedFilePath, filePath.toString());
-    }
-
-    @Test
-    public void testKeyspaceNotFound()
-    {
-        final String keyspace = "random";
-        final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
-        final String snapshot = "TestSnapshot";
-        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
-        {
-            pathBuilder.build(keyspace, table, snapshot, component);
-        });
-        String msg = "Keyspace random does not exist";
-        assertEquals(msg, thrownException.getMessage());
-    }
-
-    @Test
-    public void testTableNotFound()
-    {
-        final String keyspace = "TestKeyspace";
-        final String table = "random";
-        final String snapshot = "TestSnapshot";
-        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
-        {
-            pathBuilder.build(keyspace, table, snapshot, component);
-        });
-        String msg = "Table random not found, path searched: src/test/resources/instance1/data/TestKeyspace";
-        assertEquals(msg, thrownException.getMessage());
-    }
-
-    @Test
-    public void testSnapshotNotFound()
-    {
-        final String keyspace = "TestKeyspace";
-        final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
-        final String snapshot = "random";
-        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
-        {
-            pathBuilder.build(keyspace, table, snapshot, component);
-        });
-        String msg = "Snapshot random not found, path searched: src/test/resources/instance1/data/TestKeyspace" +
-                     "/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
-        assertEquals(msg, thrownException.getMessage());
-    }
-
-    @Test
-    public void testPartialTableName() throws FileNotFoundException
-    {
-        final String keyspace = "TestKeyspace";
-        final String table = "TestTable";
-        final String snapshot = "TestSnapshot";
-        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        Path filePath = pathBuilder.build(keyspace, table, snapshot, component);
-        assertEquals(expectedFilePath, filePath.toString());
-    }
-
-    @Test
-    public void testEmptyDataDir() throws IOException
-    {
-        String dataDir = new File("./").getCanonicalPath() + "/src/test/resources/instance";
-
-        final String keyspace = "TestKeyspace";
-        final String table = "TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b";
-        final String snapshot = "TestSnapshot";
-        final String component = "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-
-        FilePathBuilder pathBuilder = new CachedFilePathBuilder(Collections.singletonList(dataDir));
-        FileNotFoundException thrownException = assertThrows(FileNotFoundException.class, () ->
-        {
-            pathBuilder.build(keyspace, table, snapshot, component);
-        });
-        String msg = "directory empty or does not exist!";
-        Assertions.assertThat(thrownException.getMessage()).contains(msg);
-    }
-}
index 774f880d4544330bb9467ebdeac679f080f4b58a..b31b29ab763c74cc39b4fd6c6fbd2efb3d5dd2c1 100644 (file)
@@ -89,9 +89,12 @@ public class LoggerHandlerInjectionTest
         final CountDownLatch closeLatch = new CountDownLatch(1);
         server.close(res -> closeLatch.countDown());
         vertx.close();
-        if (closeLatch.await(60, TimeUnit.SECONDS)) {
+        if (closeLatch.await(60, TimeUnit.SECONDS))
+        {
             logger.info("Close event received before timeout.");
-        } else {
+        }
+        else
+        {
             logger.error("Close event timed out.");
         }
     }
index fc03af90b0d5134d9125242cd8867651d99deb8a..889baf956062ae3f954af107420406f56695a48f 100644 (file)
@@ -69,9 +69,9 @@ public class StreamSSTableComponentTest
     void testRoute(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .as(BodyCodec.buffer())
                 .send(context.succeeding(response -> context.verify(() ->
                 {
@@ -85,9 +85,9 @@ public class StreamSSTableComponentTest
     void testKeyspaceNotFound(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/random/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/random/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .send(context.succeeding(response -> context.verify(() ->
                 {
                     assertThat(response.statusCode()).isEqualTo(NOT_FOUND.code());
@@ -99,9 +99,9 @@ public class StreamSSTableComponentTest
     void testSnapshotNotFound(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/random/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .send(context.succeeding(response -> context.verify(() ->
                 {
                     assertThat(response.statusCode()).isEqualTo(NOT_FOUND.code());
@@ -113,13 +113,13 @@ public class StreamSSTableComponentTest
     void testForbiddenKeyspace(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/system/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/system/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .send(context.succeeding(response -> context.verify(() ->
                 {
                     assertThat(response.statusCode()).isEqualTo(FORBIDDEN.code());
-                    assertThat(response.statusMessage()).isEqualTo("system keyspace is forbidden");
+                    assertThat(response.statusMessage()).isEqualTo(FORBIDDEN.reasonPhrase());
                     context.completeNow();
                 })));
     }
@@ -128,13 +128,13 @@ public class StreamSSTableComponentTest
     void testIncorrectKeyspaceFormat(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/k*s/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/k*s/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .send(context.succeeding(response -> context.verify(() ->
                 {
                     assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
-                    assertThat(response.statusMessage()).isEqualTo("Invalid path params found");
+                    assertThat(response.statusMessage()).isEqualTo(BAD_REQUEST.reasonPhrase());
                     context.completeNow();
                 })));
     }
@@ -143,13 +143,13 @@ public class StreamSSTableComponentTest
     void testIncorrectComponentFormat(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data...db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .send(context.succeeding(response -> context.verify(() ->
                 {
                     assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
-                    assertThat(response.statusMessage()).isEqualTo("Invalid path params found");
+                    assertThat(response.statusMessage()).isEqualTo(BAD_REQUEST.reasonPhrase());
                     context.completeNow();
                 })));
     }
@@ -158,13 +158,13 @@ public class StreamSSTableComponentTest
     void testAccessDeniedToCertainComponents(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Digest.crc32d";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .send(context.succeeding(response -> context.verify(() ->
                 {
                     assertThat(response.statusCode()).isEqualTo(BAD_REQUEST.code());
-                    assertThat(response.statusMessage()).isEqualTo("Invalid path params found");
+                    assertThat(response.statusMessage()).isEqualTo(BAD_REQUEST.reasonPhrase());
                     context.completeNow();
                 })));
     }
@@ -173,9 +173,9 @@ public class StreamSSTableComponentTest
     void testPartialTableName(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable/snapshot/TestSnapshot/component" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable/snapshots/TestSnapshot/component" +
                 "/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bytes=0-")
                 .as(BodyCodec.buffer())
                 .send(context.succeeding(response -> context.verify(() ->
@@ -190,9 +190,9 @@ public class StreamSSTableComponentTest
     void testInvalidRange(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bytes=4-3")
                 .send(context.succeeding(response -> context.verify(() ->
                 {
@@ -205,9 +205,9 @@ public class StreamSSTableComponentTest
     void testRangeExceeds(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bytes=5-9")
                 .send(context.succeeding(response -> context.verify(() ->
                 {
@@ -220,9 +220,9 @@ public class StreamSSTableComponentTest
     void testPartialRangeExceeds(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bytes=5-")
                 .send(context.succeeding(response -> context.verify(() ->
                 {
@@ -235,9 +235,9 @@ public class StreamSSTableComponentTest
     void testRangeBoundaryExceeds(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bytes=0-999999")
                 .as(BodyCodec.buffer())
                 .send(context.succeeding(response -> context.verify(() ->
@@ -252,9 +252,9 @@ public class StreamSSTableComponentTest
     void testPartialRangeStreamed(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bytes=0-2") // 3 bytes streamed
                 .as(BodyCodec.buffer())
                 .send(context.succeeding(response -> context.verify(() ->
@@ -269,9 +269,9 @@ public class StreamSSTableComponentTest
     void testSuffixRange(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bytes=-2") // last 2 bytes streamed
                 .as(BodyCodec.buffer())
                 .send(context.succeeding(response -> context.verify(() ->
@@ -286,9 +286,9 @@ public class StreamSSTableComponentTest
     void testSuffixRangeExceeds(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bytes=-5")
                 .send(context.succeeding(response -> context.verify(() ->
                 {
@@ -301,9 +301,9 @@ public class StreamSSTableComponentTest
     void testInvalidRangeUnit(VertxTestContext context)
     {
         WebClient client = WebClient.create(vertx);
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1" + testRoute)
                 .putHeader("Range", "bits=0-2")
                 .send(context.succeeding(response -> context.verify(() ->
                 {
@@ -317,9 +317,9 @@ public class StreamSSTableComponentTest
     {
         WebClient client = WebClient.create(vertx);
         String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/" +
-                           "snapshot/TestSnapshot/component/" +
+                           "snapshots/TestSnapshot/component/" +
                            "TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
-        client.get(config.getPort(), "localhost", "/api/v1/stream/instance/2" + testRoute)
+        client.get(config.getPort(), "localhost", "/api/v1/instance/2" + testRoute)
               .as(BodyCodec.buffer())
               .send(context.succeeding(response -> context.verify(() ->
               {
index 892dcd756248acf3ec7a71a755dd738203b2e2a9..708b6849baff573dac4a20eba554e3fa141e5f1e 100644 (file)
@@ -28,13 +28,14 @@ import org.slf4j.LoggerFactory;
 import com.google.inject.AbstractModule;
 import com.google.inject.Provides;
 import com.google.inject.Singleton;
+import io.vertx.core.Vertx;
+import io.vertx.core.file.FileSystem;
 import org.apache.cassandra.sidecar.cluster.InstancesConfig;
 import org.apache.cassandra.sidecar.cluster.InstancesConfigImpl;
 import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
 import org.apache.cassandra.sidecar.common.CassandraAdapterDelegate;
 import org.apache.cassandra.sidecar.common.CassandraVersionProvider;
 import org.apache.cassandra.sidecar.common.MockCassandraFactory;
-import org.apache.cassandra.sidecar.utils.CachedFilePathBuilder;
 
 import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
@@ -100,7 +101,6 @@ public class TestModule extends AbstractModule
         when(instanceMeta.id()).thenReturn(id);
         when(instanceMeta.host()).thenReturn(host);
         when(instanceMeta.port()).thenReturn(6475);
-        when(instanceMeta.pathBuilder()).thenReturn(new CachedFilePathBuilder(Collections.singletonList(dataDir)));
         when(instanceMeta.dataDirs()).thenReturn(Collections.singletonList(dataDir));
 
         CassandraAdapterDelegate delegate = mock(CassandraAdapterDelegate.class);
@@ -110,6 +110,13 @@ public class TestModule extends AbstractModule
         return instanceMeta;
     }
 
+    @Provides
+    @Singleton
+    public FileSystem fileSystem(Vertx vertx)
+    {
+        return vertx.fileSystem();
+    }
+
     /**
      * The Mock factory is used for testing purposes, enabling us to test all failures and possible results
      * @return
index d77c8e558e2720c6892e7c9ef3c4698c75b2ed7f..65f39b936d0c2c45429ecc590e76e7f7fc586cc3 100644 (file)
@@ -67,7 +67,7 @@ public class ThrottleTest
     @Test
     void testStreamRequestsThrottled() throws Exception
     {
-        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshot" +
+        String testRoute = "/keyspace/TestKeyspace/table/TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b/snapshots" +
                 "/TestSnapshot/component/TestKeyspace-TestTable-54ea95ce-bba2-4e0a-a9be-e428e5d7160b-Data.db";
 
         for (int i = 0; i < 20; i++)
@@ -89,7 +89,7 @@ public class ThrottleTest
     private void unblockingClientRequest(String route)
     {
         WebClient client = WebClient.create(vertx);
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + route)
+        client.get(config.getPort(), "localhost", "/api/v1" + route)
                 .as(BodyCodec.buffer())
                 .send(resp ->
                 {
@@ -101,7 +101,7 @@ public class ThrottleTest
     {
         WebClient client = WebClient.create(vertx);
         CompletableFuture<HttpResponse> future = new CompletableFuture<>();
-        client.get(config.getPort(), "localhost", "/api/v1/stream" + route)
+        client.get(config.getPort(), "localhost", "/api/v1" + route)
                 .as(BodyCodec.buffer())
                 .send(resp -> future.complete(resp.result()));
         return future.get();
diff --git a/src/test/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java b/src/test/java/org/apache/cassandra/sidecar/snapshots/AbstractSnapshotPathBuilderTest.java
new file mode 100644 (file)
index 0000000..0aa2e3e
--- /dev/null
@@ -0,0 +1,672 @@
+package org.apache.cassandra.sidecar.snapshots;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.junit.rules.TemporaryFolder;
+
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.vertx.core.Future;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.handler.HttpException;
+import io.vertx.junit5.VertxTestContext;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+import org.apache.cassandra.sidecar.cluster.instance.InstanceMetadata;
+import org.apache.cassandra.sidecar.common.data.StreamSSTableComponentRequest;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.Assertions.from;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+abstract class AbstractSnapshotPathBuilderTest
+{
+    @TempDir
+    File dataDir0;
+
+    @TempDir
+    File dataDir1;
+
+    SnapshotPathBuilder instance;
+    Vertx vertx = Vertx.vertx();
+
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    @BeforeEach
+    void setup() throws IOException
+    {
+        InstancesConfig mockInstancesConfig = mock(InstancesConfig.class);
+        InstanceMetadata mockInstanceMeta = mock(InstanceMetadata.class);
+        InstanceMetadata mockInvalidDataDirInstanceMeta = mock(InstanceMetadata.class);
+        InstanceMetadata mockEmptyDataDirInstanceMeta = mock(InstanceMetadata.class);
+
+        when(mockInstancesConfig.instanceFromHost("localhost")).thenReturn(mockInstanceMeta);
+        when(mockInstanceMeta.dataDirs()).thenReturn(Arrays.asList(dataDir0.getAbsolutePath(),
+                                                                   dataDir1.getAbsolutePath()));
+
+        when(mockInstancesConfig.instanceFromHost("invalidDataDirInstance")).thenReturn(mockInvalidDataDirInstanceMeta);
+        String invalidDirPath = dataDir0.getParentFile().getAbsolutePath() + "/invalid-data-dir";
+        when(mockInvalidDataDirInstanceMeta.dataDirs()).thenReturn(Arrays.asList(invalidDirPath));
+
+        when(mockInstancesConfig.instanceFromHost("emptyDataDirInstance")).thenReturn(mockEmptyDataDirInstanceMeta);
+        when(mockEmptyDataDirInstanceMeta.dataDirs()).thenReturn(Arrays.asList());
+
+        // Create some files and directories
+        assertThat(new File(dataDir0, "not_a_keyspace_dir").createNewFile());
+        assertThat(new File(dataDir0, "ks1/table1/snapshots/backup.2022-03-17-04-PDT/not_a_file.db").mkdirs());
+        assertThat(new File(dataDir0, "ks1/not_a_table_dir").createNewFile());
+        assertThat(new File(dataDir0, "ks1/table1/snapshots/not_a_snapshot_dir").createNewFile());
+        assertThat(new File(dataDir0, "data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd").mkdirs());
+
+        assertThat(new File(dataDir1, "ks3/table3/snapshots/snapshot1").mkdirs());
+
+        // this is a different table with the same "table4" prefix
+        assertThat(new File(dataDir1, "data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1" +
+                                      "/snapshots/this_is_a_valid_snapshot_name_i_❤_u").mkdirs());
+
+        // table && table-<TABLE_UUID>
+        assertThat(new File(dataDir0, "ks1/a_table/snapshots/a_snapshot/").mkdirs());
+        assertThat(new File(dataDir0, "ks1/a_table-a72c8740a57611ec935db766a70c44a1/snapshots/a_snapshot/").mkdirs());
+
+        // create some files inside snapshot backup.2022-03-17-04-PDT
+        assertThat(new File(dataDir0, "ks1/table1/snapshots/backup.2022-03-17-04-PDT/data.db").createNewFile());
+        assertThat(new File(dataDir0, "ks1/table1/snapshots/backup.2022-03-17-04-PDT/index.db").createNewFile());
+        assertThat(new File(dataDir0, "ks1/table1/snapshots/backup.2022-03-17-04-PDT/nb-203-big-TOC.txt")
+                   .createNewFile());
+
+        // create some files inside snapshot ea823202-a62c-4603-bb6a-4e15d79091cd
+        assertThat(new File(dataDir0, "data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/data.db")
+                   .createNewFile());
+        assertThat(new File(dataDir0, "data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/index.db")
+                   .createNewFile());
+        assertThat(
+        new File(dataDir0, "data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/nb-203-big-TOC.txt")
+        .createNewFile());
+
+        // create some files inside snapshot snapshot1 in dataDir1
+        assertThat(new File(dataDir1, "ks3/table3/snapshots/snapshot1/data.db").createNewFile());
+        assertThat(new File(dataDir1, "ks3/table3/snapshots/snapshot1/index.db").createNewFile());
+        assertThat(new File(dataDir1, "ks3/table3/snapshots/snapshot1/nb-203-big-TOC.txt").createNewFile());
+
+        assertThat(new File(dataDir1, "data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1" +
+                                      "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/data.db").createNewFile());
+        assertThat(new File(dataDir1, "data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1" +
+                                      "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/index.db").createNewFile());
+        assertThat(new File(dataDir1, "data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1" +
+                                      "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/nb-203-big-TOC.txt")
+                   .createNewFile());
+
+        vertx = Vertx.vertx();
+        instance = initialize(vertx, mockInstancesConfig);
+    }
+
+    @AfterEach
+    void clear()
+    {
+        assertThat(dataDir0.delete());
+        assertThat(dataDir1.delete());
+    }
+
+    abstract SnapshotPathBuilder initialize(Vertx vertx, InstancesConfig instancesConfig);
+
+    @Test
+    void failsWhenKeyspaceIsNull()
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest(null, "table",
+                                                                                  "snapshot", "component")))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessageContaining("keyspace must not be null");
+    }
+
+    @Test
+    void failsWhenKeyspaceContainsInvalidCharacters()
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("i_❤_u", "table",
+                                                                                  "snapshot", "component")))
+        .isInstanceOf(HttpException.class)
+        .hasMessageContaining("Bad Request")
+        .returns(HttpResponseStatus.BAD_REQUEST.code(), from(t -> ((HttpException) t).getStatusCode()))
+        .returns("Invalid characters in keyspace: i_❤_u", from(t -> ((HttpException) t).getPayload()));
+    }
+
+    @Test
+    void failsWhenKeyspaceContainsPathTraversalAttack()
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("../../../etc/passwd",
+                                                                                  "table",
+                                                                                  "snapshot",
+                                                                                  "component")))
+        .isInstanceOf(HttpException.class)
+        .hasMessageContaining("Bad Request")
+        .returns(HttpResponseStatus.BAD_REQUEST.code(), from(t -> ((HttpException) t).getStatusCode()))
+        .returns("Invalid characters in keyspace: ../../../etc/passwd", from(t -> ((HttpException) t)
+                                                                                  .getPayload()));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = { "system_schema", "system_traces", "system_distributed", "system", "system_auth",
+                             "system_views", "system_virtual_schema" })
+    void failsWhenKeyspaceIsForbidden(String forbiddenKeyspace)
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest(forbiddenKeyspace,
+                                                                                  "table",
+                                                                                  "snapshot",
+                                                                                  "component")))
+        .isInstanceOf(HttpException.class)
+        .hasMessageContaining("Forbidden")
+        .returns(HttpResponseStatus.FORBIDDEN.code(), from(t -> ((HttpException) t).getStatusCode()))
+        .returns("Forbidden keyspace: " + forbiddenKeyspace, from(t -> ((HttpException) t).getPayload()));
+    }
+
+    @Test
+    void failsWhenTableNameIsNull()
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("ks",
+                                                                                  null,
+                                                                                  "snapshot",
+                                                                                  "component")))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessageContaining("tableName must not be null");
+    }
+
+    @Test
+    void failsWhenTableNameContainsInvalidCharacters()
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("ks",
+                                                                                  "i_❤_u",
+                                                                                  "snapshot",
+                                                                                  "component")))
+        .isInstanceOf(HttpException.class)
+        .hasMessageContaining("Bad Request")
+        .returns(HttpResponseStatus.BAD_REQUEST.code(), from(t -> ((HttpException) t).getStatusCode()))
+        .returns("Invalid characters in table name: i_❤_u", from(t -> ((HttpException) t).getPayload()));
+    }
+
+    @Test
+    void failsWhenTableNameContainsPathTraversalAttack()
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("ks",
+                                                                                  "../../../etc/passwd",
+                                                                                  "snapshot",
+                                                                                  "component")))
+        .isInstanceOf(HttpException.class)
+        .hasMessageContaining("Bad Request")
+        .returns(HttpResponseStatus.BAD_REQUEST.code(), from(t -> ((HttpException) t).getStatusCode()))
+        .returns("Invalid characters in table name: ../../../etc/passwd", from(t -> ((HttpException) t)
+                                                                                    .getPayload()));
+    }
+
+    @Test
+    void failsWhenSnapshotNameIsNull()
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("ks",
+                                                                                  "table",
+                                                                                  null,
+                                                                                  "component.db")))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessageContaining("snapshotName must not be null");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = { "slash/is-not-allowed", "null-char\0-is-not-allowed", "../../../etc/passwd" })
+    void failsWhenSnapshotNameContainsInvalidCharacters(String invalidFileName)
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("ks",
+                                                                                  "table",
+                                                                                  invalidFileName,
+                                                                                  "component.db")))
+        .isInstanceOf(HttpException.class)
+        .hasMessageContaining("Bad Request")
+        .returns(HttpResponseStatus.BAD_REQUEST.code(), from(t -> ((HttpException) t).getStatusCode()))
+        .returns("Invalid characters in snapshot name: " + invalidFileName, from(t -> ((HttpException) t)
+                                                                                      .getPayload()));
+    }
+
+    @Test
+    void failsWhenComponentNameIsNull()
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("ks",
+                                                                                  "table",
+                                                                                  "snapshot",
+                                                                                  null)))
+        .isInstanceOf(NullPointerException.class)
+        .hasMessageContaining("componentName must not be null");
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = { "i_❤_u.db", "this-is-not-allowed.jar", "cql-is-not-allowed-here.cql",
+                             "json-is-not-allowed-here.json", "crc32-is-not-allowed-here.crc32",
+                             "../../../etc/passwd.db" })
+    void failsWhenComponentNameContainsInvalidCharacters(String invalidComponentName)
+    {
+        assertThatThrownBy(() -> instance.build("localhost",
+                                                new StreamSSTableComponentRequest("ks",
+                                                                                  "table",
+                                                                                  "snapshot",
+                                                                                  invalidComponentName)))
+        .isInstanceOf(HttpException.class)
+        .hasMessageContaining("Bad Request")
+        .returns(HttpResponseStatus.BAD_REQUEST.code(), from(t -> ((HttpException) t).getStatusCode()))
+        .returns("Invalid component name: " + invalidComponentName, from(t -> ((HttpException) t)
+                                                                              .getPayload()));
+    }
+
+    @Test
+    void failsWhenDataDirsAreEmpty() throws InterruptedException
+    {
+        failsWithFileNotFoundException(instance.build("emptyDataDirInstance",
+                                                      new StreamSSTableComponentRequest("ks",
+                                                                                        "table",
+                                                                                        "snapshot",
+                                                                                        "component.db")),
+                                       "No data directories are available for host 'emptyDataDirInstance'");
+    }
+
+    @Test
+    void failsWhenInvalidDataDirectory() throws InterruptedException
+    {
+        failsWithFileNotFoundException(instance.build("invalidDataDirInstance",
+                                                      new StreamSSTableComponentRequest("ks",
+                                                                                        "table",
+                                                                                        "snapshot",
+                                                                                        "component.db")),
+                                       "Keyspace 'ks' does not exist");
+    }
+
+    @Test
+    void failsWhenKeyspaceDirectoryDoesNotExist() throws InterruptedException
+    {
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("non_existent",
+                                                                                        "table",
+                                                                                        "snapshot",
+                                                                                        "component.db")),
+                                       "Keyspace 'non_existent' does not exist");
+    }
+
+    @Test
+    void failsWhenKeyspaceIsNotADirectory() throws InterruptedException
+    {
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("not_a_keyspace_dir",
+                                                                                        "table",
+                                                                                        "snapshot",
+                                                                                        "component.db")),
+                                       "Keyspace 'not_a_keyspace_dir' does not exist");
+    }
+
+    @Test
+    void failsWhenTableDoesNotExist() throws InterruptedException
+    {
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("ks1",
+                                                                                        "non_existent",
+                                                                                        "snapshot",
+                                                                                        "component.db")),
+                                       "Table 'non_existent' does not exist");
+    }
+
+    @Test
+    void failsWhenTableDoesNotExistWithSimilarPrefix() throws InterruptedException
+    {
+        // In this scenario, we have other tables with the "table" prefix (i.e table4)
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("ks1",
+                                                                                        "table",
+                                                                                        "snapshot",
+                                                                                        "component.db")),
+                                       "Table 'table' does not exist");
+    }
+
+    @Test
+    void failsWhenTableNameIsNotADirectory() throws InterruptedException
+    {
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("ks1",
+                                                                                        "not_a_table_dir",
+                                                                                        "snapshot",
+                                                                                        "component.db")),
+                                       "Table 'not_a_table_dir' does not exist");
+    }
+
+    @Test
+    void failsWhenSnapshotDirectoryDoesNotExist() throws InterruptedException
+    {
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("ks1",
+                                                                                        "table1",
+                                                                                        "non_existent",
+                                                                                        "component.db")),
+                                       "Component 'component.db' does not exist for snapshot 'non_existent'");
+    }
+
+    @Test
+    void failsWhenSnapshotIsNotADirectory() throws InterruptedException
+    {
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("ks1",
+                                                                                        "table1",
+                                                                                        "not_a_snapshot_dir",
+                                                                                        "component.db")),
+                                       "Component 'component.db' does not exist for snapshot 'not_a_snapshot_dir'");
+    }
+
+    @Test
+    void failsWhenComponentFileDoesNotExist() throws InterruptedException
+    {
+        String errMsg = "Component 'does-not-exist-TOC.txt' does not exist for snapshot 'backup.2022-03-17-04-PDT'";
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("ks1",
+                                                                                        "table1",
+                                                                                        "backup.2022-03-17-04-PDT",
+                                                                                        "does-not-exist-TOC.txt")),
+                                       errMsg);
+    }
+
+    @Test
+    void failsWhenComponentIsNotAFile() throws InterruptedException
+    {
+        String errMsg = "Component 'not_a_file.db' does not exist for snapshot 'backup.2022-03-17-04-PDT'";
+        failsWithFileNotFoundException(instance.build("localhost",
+                                                      new StreamSSTableComponentRequest("ks1",
+                                                                                        "table1",
+                                                                                        "backup.2022-03-17-04-PDT",
+                                                                                        "not_a_file.db")),
+                                       errMsg);
+    }
+
+    @Test
+    void succeedsWhenComponentExists() throws Exception
+    {
+        String expectedPath;
+        expectedPath = dataDir0.getAbsolutePath() + "/ks1/table1/snapshots/backup.2022-03-17-04-PDT/data.db";
+        succeedsWhenComponentExists(instance.build("localhost",
+                                                   new StreamSSTableComponentRequest("ks1",
+                                                                                     "table1",
+                                                                                     "backup.2022-03-17-04-PDT",
+                                                                                     "data.db")),
+                                    expectedPath);
+        expectedPath = dataDir0.getAbsolutePath() + "/ks1/table1/snapshots/backup.2022-03-17-04-PDT/index.db";
+        succeedsWhenComponentExists(instance.build("localhost",
+                                                   new StreamSSTableComponentRequest("ks1",
+                                                                                     "table1",
+                                                                                     "backup.2022-03-17-04-PDT",
+                                                                                     "index.db")),
+                                    expectedPath);
+        expectedPath = dataDir0.getAbsolutePath() + "/ks1/table1/snapshots/backup.2022-03-17-04-PDT/nb-203-big-TOC.txt";
+        succeedsWhenComponentExists(instance.build("localhost",
+                                                   new StreamSSTableComponentRequest("ks1",
+                                                                                     "table1",
+                                                                                     "backup.2022-03-17-04-PDT",
+                                                                                     "nb-203-big-TOC.txt")),
+                                    expectedPath);
+        expectedPath = dataDir0.getAbsolutePath()
+                       + "/data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/data.db";
+        succeedsWhenComponentExists(instance
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks2",
+                                                                             "table2",
+                                                                             "ea823202-a62c-4603-bb6a-4e15d79091cd",
+                                                                             "data.db")),
+                                    expectedPath);
+        expectedPath = dataDir0.getAbsolutePath()
+                       + "/data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/index.db";
+        succeedsWhenComponentExists(instance
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks2",
+                                                                             "table2",
+                                                                             "ea823202-a62c-4603-bb6a-4e15d79091cd",
+                                                                             "index.db")),
+                                    expectedPath);
+        expectedPath = dataDir0.getAbsolutePath()
+                       + "/data/ks2/table2/snapshots/ea823202-a62c-4603-bb6a-4e15d79091cd/nb-203-big-TOC.txt";
+        succeedsWhenComponentExists(instance
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks2",
+                                                                             "table2",
+                                                                             "ea823202-a62c-4603-bb6a-4e15d79091cd",
+                                                                             "nb-203-big-TOC.txt")),
+                                    expectedPath);
+        expectedPath = dataDir1.getAbsolutePath() + "/ks3/table3/snapshots/snapshot1/data.db";
+        succeedsWhenComponentExists(instance.build("localhost",
+                                                   new StreamSSTableComponentRequest("ks3",
+                                                                                     "table3",
+                                                                                     "snapshot1",
+                                                                                     "data.db")),
+                                    expectedPath);
+        expectedPath = dataDir1.getAbsolutePath() + "/ks3/table3/snapshots/snapshot1/index.db";
+        succeedsWhenComponentExists(instance.build("localhost",
+                                                   new StreamSSTableComponentRequest("ks3",
+                                                                                     "table3",
+                                                                                     "snapshot1",
+                                                                                     "index.db")),
+                                    expectedPath);
+        expectedPath = dataDir1.getAbsolutePath() + "/ks3/table3/snapshots/snapshot1/nb-203-big-TOC.txt";
+        succeedsWhenComponentExists(instance.build("localhost",
+                                                   new StreamSSTableComponentRequest("ks3",
+                                                                                     "table3",
+                                                                                     "snapshot1",
+                                                                                     "nb-203-big-TOC.txt")),
+                                    expectedPath);
+
+
+
+        // table table4 shares the prefix with table table4abc
+        expectedPath = dataDir1.getAbsolutePath()
+                       + "/data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1"
+                       + "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/data.db";
+        succeedsWhenComponentExists(instance
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks4",
+                                                                             "table4abc",
+                                                                             "this_is_a_valid_snapshot_name_i_❤_u",
+                                                                             "data.db")),
+                                    expectedPath);
+        expectedPath = dataDir1.getAbsolutePath()
+                       + "/data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1"
+                       + "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/index.db";
+        succeedsWhenComponentExists(instance
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks4",
+                                                                             "table4abc",
+                                                                             "this_is_a_valid_snapshot_name_i_❤_u",
+                                                                             "index.db")),
+                                    expectedPath);
+        expectedPath = dataDir1.getAbsolutePath()
+                       + "/data/ks4/table4abc-a72c8740a57611ec935db766a70c44a1"
+                       + "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/nb-203-big-TOC.txt";
+        succeedsWhenComponentExists(instance
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks4",
+                                                                             "table4abc",
+                                                                             "this_is_a_valid_snapshot_name_i_❤_u",
+                                                                             "nb-203-big-TOC.txt")),
+                                    expectedPath);
+
+
+    }
+
+    @Test
+    void testTableWithUUIDPicked() throws IOException, InterruptedException
+    {
+        TemporaryFolder tempFolder = new TemporaryFolder();
+        tempFolder.create();
+        File dataDir = tempFolder.newFolder("data");
+
+        InstancesConfig mockInstancesConfig = mock(InstancesConfig.class);
+        InstanceMetadata mockInstanceMeta = mock(InstanceMetadata.class);
+
+        when(mockInstancesConfig.instanceFromHost("localhost")).thenReturn(mockInstanceMeta);
+        when(mockInstanceMeta.dataDirs()).thenReturn(Arrays.asList(dataDir.getAbsolutePath()));
+
+        File atable = new File(dataDir, "data/ks1/a_table");
+        assertThat(atable.mkdirs());
+        File atableSnapshot = new File(atable, "snapshots/a_snapshot");
+        assertThat(atableSnapshot.mkdirs());
+        assertThat(new File(atable, "snapshots/a_snapshot/data.db").createNewFile());
+        assertThat(new File(atable, "snapshots/a_snapshot/index.db").createNewFile());
+        assertThat(new File(atable, "snapshots/a_snapshot/nb-203-big-TOC.txt").createNewFile());
+
+        File atableWithUUID = new File(dataDir, "data/ks1/a_table-a72c8740a57611ec935db766a70c44a1");
+        assertThat(atableWithUUID.mkdirs());
+        File atableWithUUIDSnapshot = new File(atableWithUUID, "snapshots/a_snapshot");
+        assertThat(atableWithUUIDSnapshot.mkdirs());
+
+        assertThat(new File(atableWithUUID, "snapshots/a_snapshot/data.db").createNewFile());
+        assertThat(new File(atableWithUUID, "snapshots/a_snapshot/index.db").createNewFile());
+        assertThat(new File(atableWithUUID, "snapshots/a_snapshot/nb-203-big-TOC.txt").createNewFile());
+        assertThat(atableWithUUID.setLastModified(System.currentTimeMillis() + 2000000));
+
+        String expectedPath;
+        // a_table and a_table-<TABLE_UUID> - the latter should be picked
+        SnapshotPathBuilder newBuilder = new SnapshotPathBuilder(vertx.fileSystem(), mockInstancesConfig);
+        expectedPath = atableWithUUID.getAbsolutePath() + "/snapshots/a_snapshot/data.db";
+        succeedsWhenComponentExists(newBuilder
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks1",
+                                                                             "a_table",
+                                                                             "a_snapshot",
+                                                                             "data.db")),
+                                    expectedPath);
+        expectedPath = atableWithUUID.getAbsolutePath() + "/snapshots/a_snapshot/index.db";
+        succeedsWhenComponentExists(newBuilder
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks1",
+                                                                             "a_table",
+                                                                             "a_snapshot",
+                                                                             "index.db")),
+                                    expectedPath);
+        expectedPath = atableWithUUID.getAbsolutePath() + "/snapshots/a_snapshot/nb-203-big-TOC.txt";
+        succeedsWhenComponentExists(newBuilder
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks1",
+                                                                             "a_table",
+                                                                             "a_snapshot",
+                                                                             "nb-203-big-TOC.txt")),
+                                    expectedPath);
+    }
+
+    @Test
+    void testLastModifiedTablePicked() throws IOException, InterruptedException
+    {
+        TemporaryFolder tempFolder = new TemporaryFolder();
+        tempFolder.create();
+        File dataDir = tempFolder.newFolder("data");
+
+        InstancesConfig mockInstancesConfig = mock(InstancesConfig.class);
+        InstanceMetadata mockInstanceMeta = mock(InstanceMetadata.class);
+
+        when(mockInstancesConfig.instanceFromHost("localhost")).thenReturn(mockInstanceMeta);
+        when(mockInstanceMeta.dataDirs()).thenReturn(Arrays.asList(dataDir.getAbsolutePath()));
+
+        File table4Old = new File(dataDir, "data/ks4/table4-a6442310a57611ec8b980b0b2009844e1");
+        assertThat(table4Old.mkdirs());
+
+        // table was dropped and recreated. The table gets a new uuid
+        File table4OldSnapshot = new File(table4Old, "snapshots/this_is_a_valid_snapshot_name_i_❤_u");
+        assertThat(table4OldSnapshot.mkdirs());
+        // create some files inside snapshot this_is_a_valid_snapshot_name_i_❤_u in dataDir1
+        assertThat(new File(table4Old, "snapshots/this_is_a_valid_snapshot_name_i_❤_u/data.db").createNewFile());
+        assertThat(new File(table4Old, "snapshots/this_is_a_valid_snapshot_name_i_❤_u/index.db").createNewFile());
+        assertThat(new File(table4Old, "snapshots/this_is_a_valid_snapshot_name_i_❤_u/nb-203-big-TOC.txt")
+                   .createNewFile());
+
+        File table4New = new File(dataDir, "data/ks4/table4-a72c8740a57611ec935db766a70c44a11");
+        assertThat(table4New.mkdirs());
+
+        File table4NewSnapshot = new File(table4New, "snapshots/this_is_a_valid_snapshot_name_i_❤_u");
+        assertThat(table4NewSnapshot.mkdirs());
+
+        assertThat(new File(table4New, "snapshots/this_is_a_valid_snapshot_name_i_❤_u/data.db").createNewFile());
+        assertThat(new File(table4New, "snapshots/this_is_a_valid_snapshot_name_i_❤_u/index.db").createNewFile());
+        assertThat(new File(table4New, "snapshots/this_is_a_valid_snapshot_name_i_❤_u/nb-203-big-TOC.txt")
+                   .createNewFile());
+        assertThat(table4New.setLastModified(System.currentTimeMillis() + 2000000));
+
+        String expectedPath;
+        SnapshotPathBuilder newBuilder = new SnapshotPathBuilder(vertx.fileSystem(), mockInstancesConfig);
+        // table4-a72c8740a57611ec935db766a70c44a1 is the last modified, so it is the correct directory
+        expectedPath = table4New.getAbsolutePath()
+                       + "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/data.db";
+        succeedsWhenComponentExists(newBuilder
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks4",
+                                                                             "table4",
+                                                                             "this_is_a_valid_snapshot_name_i_❤_u",
+                                                                             "data.db")),
+                                    expectedPath);
+        expectedPath = table4New.getAbsolutePath()
+                       + "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/index.db";
+        succeedsWhenComponentExists(newBuilder
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks4",
+                                                                             "table4",
+                                                                             "this_is_a_valid_snapshot_name_i_❤_u",
+                                                                             "index.db")),
+                                    expectedPath);
+        expectedPath = table4New.getAbsolutePath()
+                       + "/snapshots/this_is_a_valid_snapshot_name_i_❤_u/nb-203-big-TOC.txt";
+        succeedsWhenComponentExists(newBuilder
+                                    .build("localhost",
+                                           new StreamSSTableComponentRequest("ks4",
+                                                                             "table4",
+                                                                             "this_is_a_valid_snapshot_name_i_❤_u",
+                                                                             "nb-203-big-TOC.txt")),
+                                    expectedPath);
+    }
+
+    protected void succeedsWhenComponentExists(Future<String> future, String expectedPath)
+    {
+        VertxTestContext testContext = new VertxTestContext();
+        future.onComplete(testContext.succeedingThenComplete());
+        // awaitCompletion has the semantics of a java.util.concurrent.CountDownLatch
+        try
+        {
+            assertThat(testContext.awaitCompletion(5, TimeUnit.SECONDS)).isTrue();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        assertThat(testContext.failed()).isFalse();
+        // we use ends with here, because MacOS prepends the /private path for temporary directories
+        assertThat(future.result()).endsWith(expectedPath);
+    }
+
+    protected void failsWithFileNotFoundException(Future<String> future, String expectedMessage)
+    {
+        VertxTestContext testContext = new VertxTestContext();
+        future.onComplete(testContext.succeedingThenComplete());
+        // awaitCompletion has the semantics of a java.util.concurrent.CountDownLatch
+        try
+        {
+            assertThat(testContext.awaitCompletion(5, TimeUnit.SECONDS)).isTrue();
+        }
+        catch (InterruptedException e)
+        {
+            throw new RuntimeException(e);
+        }
+        assertThat(testContext.failed()).isTrue();
+        assertThat(testContext.causeOfFailure()).isInstanceOf(FileNotFoundException.class)
+                                                .returns(expectedMessage, from(Throwable::getMessage));
+    }
+}
diff --git a/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilderTest.java b/src/test/java/org/apache/cassandra/sidecar/snapshots/SnapshotPathBuilderTest.java
new file mode 100644 (file)
index 0000000..4cf8688
--- /dev/null
@@ -0,0 +1,16 @@
+package org.apache.cassandra.sidecar.snapshots;
+
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import io.vertx.core.Vertx;
+import io.vertx.junit5.VertxExtension;
+import org.apache.cassandra.sidecar.cluster.InstancesConfig;
+
+@ExtendWith(VertxExtension.class)
+class SnapshotPathBuilderTest extends AbstractSnapshotPathBuilderTest
+{
+    SnapshotPathBuilder initialize(Vertx vertx, InstancesConfig instancesConfig)
+    {
+        return new SnapshotPathBuilder(vertx.fileSystem(), instancesConfig);
+    }
+}