+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.cassandra.sidecar.utils;
-import java.io.File;
import java.time.Duration;
import java.time.Instant;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.MICROSECONDS;
import com.google.common.util.concurrent.SidecarRateLimiter;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.Inject;
import com.google.inject.Singleton;
+import io.vertx.core.Future;
+import io.vertx.core.Promise;
+import io.vertx.core.Vertx;
+import io.vertx.ext.web.handler.HttpException;
import org.apache.cassandra.sidecar.Configuration;
+import org.apache.cassandra.sidecar.exceptions.RangeException;
import org.apache.cassandra.sidecar.models.HttpResponse;
import org.apache.cassandra.sidecar.models.Range;
-import static java.util.concurrent.TimeUnit.MICROSECONDS;
+import static io.netty.handler.codec.http.HttpResponseStatus.REQUESTED_RANGE_NOT_SATISFIABLE;
+import static io.netty.handler.codec.http.HttpResponseStatus.TOO_MANY_REQUESTS;
/**
* General handler for serving files
public class FileStreamer
{
private static final Logger LOGGER = LoggerFactory.getLogger(FileStreamer.class);
- private static final ScheduledExecutorService SCHEDULER = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder().setNameFormat("acquirePermit").setDaemon(true).build());
- private final Duration delay;
- private final Duration timeout;
+ private static final long DEFAULT_RATE_LIMIT_STREAM_REQUESTS_PER_SECOND = Long.MAX_VALUE;
+ private final Vertx vertx;
+ private final Configuration config;
private final SidecarRateLimiter rateLimiter;
@Inject
- public FileStreamer(Configuration config, SidecarRateLimiter rateLimiter)
+ public FileStreamer(Vertx vertx, Configuration config, SidecarRateLimiter rateLimiter)
{
+ this.vertx = vertx;
+ this.config = config;
this.rateLimiter = rateLimiter;
- this.delay = Duration.ofSeconds(config.getThrottleDelayInSeconds());
- this.timeout = Duration.ofSeconds(config.getThrottleTimeoutInSeconds());
}
- public void stream(final HttpResponse resp, final File file)
+ /**
+ * Streams the {@code filename file} with length {@code fileLength} for the (optionally) requested
+ * {@code rangeHeader} using the provided {@code response}.
+ *
+ * @param response the response to use
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param rangeHeader (optional) a string representing the requested range for the file
+ * @return a future with the result of the streaming
+ */
+ public Future<Void> stream(HttpResponse response, String filename, long fileLength, String rangeHeader)
{
- stream(resp, file, new Range(0, file.length() - 1, file.length()));
+ return parseRangeHeader(rangeHeader, fileLength)
+ .compose(range -> stream(response, filename, fileLength, range));
}
- public void stream(final HttpResponse resp, final File file, final Range range)
+ /**
+ * Streams the {@code filename file} with length {@code fileLength} for the requested
+ * {@code range} using the provided {@code response}.
+ *
+ * @param response the response to use
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param range the range to stream
+ * @return a future with the result of the streaming
+ */
+ public Future<Void> stream(HttpResponse response, String filename, long fileLength, Range range)
{
- if (!file.exists() || !file.isFile())
- {
- resp.setNotFoundStatus("File does not exist or it is not a normal file");
- return;
- }
- if (file.length() == 0)
- {
- resp.setBadRequestStatus("File is empty");
- return;
- }
- acquireAndSend(resp, file, range);
+ Promise<Void> promise = Promise.promise();
+ acquireAndSend(response, filename, fileLength, range, Instant.now(), promise);
+ return promise.future();
}
- private void acquireAndSend(HttpResponse response, File file, Range range)
+ /**
+ * Send the file if rate-limiting is disabled or when it successfully acquires a permit from the
+ * {@link SidecarRateLimiter}.
+ *
+ * @param response the response to use
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param range the range to stream
+ * @param startTime the start time of this request
+ * @param promise a promise for the stream
+ */
+ private void acquireAndSend(HttpResponse response, String filename, long fileLength, Range range, Instant startTime,
+ Promise<Void> promise)
{
- acquireAndSend(response, file, range, Instant.now());
+ if (!isRateLimited() || acquire(response, filename, fileLength, range, startTime, promise))
+ {
+ // Stream data if rate limiting is disabled or if we acquire
+ LOGGER.info("Streaming range {} for file {} to client {}. Instance: {}", range, filename,
+ response.remoteAddress(), response.host());
+ response.sendFile(filename, fileLength, range)
+ .onSuccess(v ->
+ {
+ LOGGER.debug("Streamed file {} successfully to client {}. Instance: {}", filename,
+ response.remoteAddress(), response.host());
+ promise.complete();
+ })
+ .onFailure(promise::fail);
+ }
}
/**
- * If permit becomes available within a short time, retry immediately
+ * Acquires a permit from the {@link SidecarRateLimiter} if it can be acquired immediately without
+ * delay. Otherwise, it will retry acquiring the permit later in the future until it exhausts the
+ * retry timeout, in which case it will ask the client to retry later in the future.
+ *
+ * @param response the response to use
+ * @param filename the path to the file to serve
+ * @param fileLength the size of the file to serve
+ * @param range the range to stream
+ * @param startTime the start time of this request
+ * @param promise a promise for the stream
+ * @return {@code true} if the permit was acquired, {@code false} otherwise
*/
- private void acquireAndSend(HttpResponse response, File file, Range range, Instant startTime)
+ private boolean acquire(HttpResponse response, String filename, long fileLength, Range range, Instant startTime,
+ Promise<Void> promise)
{
- while (!rateLimiter.tryAcquire())
+ if (rateLimiter.tryAcquire())
+ return true;
+
+ long microsToWait;
+ if (checkRetriesExhausted(startTime))
+ {
+ LOGGER.error("Retries for acquiring permit exhausted for client {}. Instance: {}", response.remoteAddress(),
+ response.host());
+ promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Retry exhausted"));
+ }
+ else if ((microsToWait = rateLimiter.queryEarliestAvailable(0L))
+ < TimeUnit.SECONDS.toMicros(config.getThrottleDelayInSeconds()))
{
- if (checkRetriesExhausted(startTime))
- {
- LOGGER.error("Retries for acquiring permit exhausted!");
- response.setTooManyRequestsStatus();
- return;
- }
-
- final long microsToWait = rateLimiter.queryEarliestAvailable(0L);
- if (microsToWait <= 0) // immediately retry
- {
- continue;
- }
-
- if (TimeUnit.MICROSECONDS.toNanos(microsToWait) >= delay.getNano())
- {
- response.setRetryAfterHeader(microsToWait);
- }
- else
- {
- retryStreaming(response, file, range, startTime, microsToWait);
- }
- return;
+ microsToWait = Math.max(0, microsToWait);
+
+ LOGGER.debug("Retrying streaming after {} micros for client {}. Instance: {}", microsToWait,
+ response.remoteAddress(), response.host());
+ vertx.setTimer(MICROSECONDS.toMillis(microsToWait),
+ t -> acquireAndSend(response, filename, fileLength, range, startTime, promise));
}
- LOGGER.info("File {} streamed from path {}", file.getName(), file.getAbsolutePath());
- response.sendFile(file, range);
+ else
+ {
+ LOGGER.debug("Asking client {} to retry after {} micros. Instance: {}", response.remoteAddress(),
+ microsToWait, response.host());
+ response.setRetryAfterHeader(microsToWait);
+ promise.fail(new HttpException(TOO_MANY_REQUESTS.code(), "Ask client to retry later"));
+ }
+ return false;
+ }
+
+ /**
+ * @return true if this request is rate-limited, false otherwise
+ */
+ private boolean isRateLimited()
+ {
+ return config.getRateLimitStreamRequestsPerSecond() != DEFAULT_RATE_LIMIT_STREAM_REQUESTS_PER_SECOND;
}
+ /**
+ * @param startTime the request start time
+ * @return true if we exhausted the retries, false otherwise
+ */
private boolean checkRetriesExhausted(Instant startTime)
{
- return startTime.plus(timeout).isBefore(Instant.now());
+ return startTime.plus(Duration.ofSeconds(config.getThrottleTimeoutInSeconds()))
+ .isBefore(Instant.now());
}
- private void retryStreaming(HttpResponse response, File file, Range range, Instant startTime, long microsToSleep)
+ /**
+ * Returns the requested range for the request, or the entire range if {@code rangeHeader} is null
+ *
+ * @param rangeHeader The range header from the request
+ * @param fileLength The length of the file
+ * @return a succeeded future when the parsing is successful, a failed future when the range parsing fails
+ */
+ private Future<Range> parseRangeHeader(String rangeHeader, long fileLength)
{
- SCHEDULER.schedule(() -> acquireAndSend(response, file, range, startTime), microsToSleep, MICROSECONDS);
+ Range fr = Range.of(0, fileLength - 1);
+ if (rangeHeader == null)
+ return Future.succeededFuture(fr);
+
+ try
+ {
+ // sidecar does not support multiple ranges as of now
+ final Range hr = Range.parseHeader(rangeHeader, fileLength);
+ Range intersect = fr.intersect(hr);
+ LOGGER.debug("Calculated range {} for streaming", intersect);
+ return Future.succeededFuture(intersect);
+ }
+ catch (IllegalArgumentException | RangeException | UnsupportedOperationException e)
+ {
+ LOGGER.error(String.format("Failed to parse header '%s'", rangeHeader), e);
+ return Future.failedFuture(new HttpException(REQUESTED_RANGE_NOT_SATISFIABLE.code()));
+ }
}
}