[SSHD-882] Added hooks to allow users to capture and handle extended (STDERR) data...
authorLyor Goldstein <lgoldstein@apache.org>
Mon, 4 Feb 2019 13:25:05 +0000 (15:25 +0200)
committerLyor Goldstein <lgoldstein@apache.org>
Sun, 10 Feb 2019 06:31:13 +0000 (08:31 +0200)
CHANGES.md
README.md
sshd-common/src/main/java/org/apache/sshd/common/util/io/IoUtils.java
sshd-contrib/src/main/java/org/apache/sshd/common/util/io/LineOutputStream.java [new file with mode: 0644]
sshd-contrib/src/test/java/org/apache/sshd/common/util/io/LineOutputStreamTest.java [new file with mode: 0644]
sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelDataReceiver.java
sshd-core/src/main/java/org/apache/sshd/server/channel/ChannelSession.java
sshd-sftp/src/main/java/org/apache/sshd/server/subsystem/sftp/SftpSubsystem.java

index c1a02e4..e2221a6 100644 (file)
@@ -184,3 +184,8 @@ can be enabled/disabled via the `progress` command:
     ... set the progress marker indicator  ...
 
 ```
+
+# Since version 2.2.0
+
+* [SSHD-882](https://issues.apache.org/jira/browse/SSHD-882) - Provide hooks to allow users to register a consumer
+for STDERR data sent via the `ChannelSession` - especially for the SFTP subsystem.
index c5237a3..e193475 100644 (file)
--- a/README.md
+++ b/README.md
@@ -1267,6 +1267,20 @@ using a registered `SftpErrorStatusDataHandler`. The default implementation prov
 exception type. However, users may override it when creating the `SftpSubsystemFactory` and provide their own codes and/or messages - e.g.,
 for debugging one can register a `DetailedSftpErrorStatusDataHandler` (see `sshd-contrib`) that "leaks" more information in the generated message.
 
+If the registered handler implements `ChannelSessionAware` then it will also be informed of the registered `ChannelSession` when it is
+provided to the `SftpSubsystem` itself. This can be used to register an extended data writer that can handle data sent via the STDERR
+channel. **Note:** this feature is allowed according to [SFTP version 4 - section 3.1](https://tools.ietf.org/html/draft-ietf-secsh-filexfer-04#section-3.1):
+
+>> Packets are sent and received on stdout and stdin. Data sent on stderr by the server SHOULD be considered debug
+>> or supplemental error information, and MAY be displayed to the user.
+
+however, the current code provides no built-in support for this feature.
+
+If registering an extended data writer then one should take care of any race conditions that may occur where (extended) data
+may arrive before the handler is informed of the existence of the `ChannelSession`. For this purpose one should configure a
+reasonable buffer size by setting the `channel-session-max-extdata-bufsize` property. This way, if any data arrives before the
+extended data handler is registered it will be buffered (up to the specified max. size). **Note:** if a buffer size is configured
+but no extended data handler is registered when channel is spawning the command then an exception will occur.
 
 ## Port forwarding
 
index 40ef9b0..910a6a3 100644 (file)
@@ -149,6 +149,27 @@ public final class IoUtils {
     }
 
     /**
+     * Closes the specified {@link Closeable} resource
+     *
+     * @param c The resource to close - ignored if {@code null}
+     * @return The thrown {@link IOException} when {@code close()} was
+     * called - {@code null} if no exception was thrown (or no resource
+     * to close to begin with)
+     */
+    @SuppressWarnings("ThrowableResultOfMethodCallIgnored")
+    public static IOException closeQuietly(Closeable c) {
+        if (c != null) {
+            try {
+                c.close();
+            } catch (IOException e) {
+                return e;
+            }
+        }
+
+        return null;
+    }
+
+    /**
      * Closes a bunch of resources suppressing any {@link IOException}s their
      * {@link Closeable#close()} method may have thrown
      *
diff --git a/sshd-contrib/src/main/java/org/apache/sshd/common/util/io/LineOutputStream.java b/sshd-contrib/src/main/java/org/apache/sshd/common/util/io/LineOutputStream.java
new file mode 100644 (file)
index 0000000..b2e45d9
--- /dev/null
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.util.io;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+/**
+ * Calls the actual writing method only when LF detected in the written stream.
+ * <B>Note:</B> it strips CR if found before the LF
+ *
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+public abstract class LineOutputStream extends OutputStream {
+    protected final byte[] oneByte = new byte[1];
+    protected byte[] lineBuf;
+    protected int usedLen;
+
+    protected LineOutputStream() {
+        super();
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        oneByte[0] = (byte) (b & 0xff);
+        write(oneByte, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] b) throws IOException {
+        write(b, 0, b.length);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        int lastOffset = off;
+        int maxOffset = off + len;
+        for (int curOffset = off; curOffset < maxOffset; curOffset++) {
+            byte ch = b[curOffset];
+            if (ch != 0x0a) {
+                continue;
+            }
+
+            // Any previous line segment ?
+            if (usedLen > 0) {
+                accumulateLineData(b, lastOffset, curOffset - lastOffset);
+
+                // Strip CR
+                if (lineBuf[usedLen - 1] == 0x0d) {
+                    usedLen--;
+                }
+                handleLine(lineBuf, 0, usedLen);
+                usedLen = 0;
+            } else {
+                int lineLen = curOffset - lastOffset;
+                // Strip CR
+                if ((lineLen > 0) && (b[curOffset - 1] == 0x0d)) {
+                    lineLen--;
+                }
+                handleLine(b, lastOffset, lineLen);
+            }
+
+            lastOffset = curOffset + 1;
+        }
+
+        // any leftovers ?
+        if (lastOffset < maxOffset) {
+            accumulateLineData(b, lastOffset, maxOffset - lastOffset);
+        }
+    }
+
+    protected void accumulateLineData(byte[] b, int off, int len) throws IOException {
+        if (len <= 0) {
+            return;
+        }
+
+        int reqLen = usedLen + len;
+        if ((lineBuf == null) || (reqLen >= lineBuf.length)) {
+            byte[] tmp = new byte[reqLen + Byte.SIZE /* a bit extra to avoid frequent re-sizing */];
+            if (usedLen > 0) {
+                System.arraycopy(lineBuf, 0, tmp, 0, usedLen);
+            }
+            lineBuf = tmp;
+        }
+
+        System.arraycopy(b, off, lineBuf, usedLen, len);
+        usedLen += len;
+    }
+
+    protected abstract void handleLine(byte[] buf, int offset, int len) throws IOException;
+
+    @Override
+    public void close() throws IOException {
+        // Last line might not be LF terminated
+        if (usedLen > 0) {
+            // Strip CR
+            if (lineBuf[usedLen - 1] == 0x0d) {
+                usedLen--;
+            }
+
+            handleLine(lineBuf, 0, usedLen);
+            usedLen = 0;
+        }
+    }
+}
diff --git a/sshd-contrib/src/test/java/org/apache/sshd/common/util/io/LineOutputStreamTest.java b/sshd-contrib/src/test/java/org/apache/sshd/common/util/io/LineOutputStreamTest.java
new file mode 100644 (file)
index 0000000..c49f608
--- /dev/null
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.sshd.common.util.io;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.StreamCorruptedException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.sshd.util.test.JUnit4ClassRunnerWithParametersFactory;
+import org.apache.sshd.util.test.JUnitTestSupport;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.MethodSorters;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.junit.runners.Parameterized.UseParametersRunnerFactory;
+
+/**
+ * @author <a href="mailto:dev@mina.apache.org">Apache MINA SSHD Project</a>
+ */
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+@RunWith(Parameterized.class)   // see https://github.com/junit-team/junit/wiki/Parameterized-tests
+@UseParametersRunnerFactory(JUnit4ClassRunnerWithParametersFactory.class)
+public class LineOutputStreamTest extends JUnitTestSupport {
+    private final boolean withCR;
+
+    public LineOutputStreamTest(boolean withCR) {
+        this.withCR = withCR;
+    }
+
+    @Parameters(name = "CR={0}")
+    public static List<Object[]> parameters() {
+        return Arrays.asList(new Object[] {Boolean.TRUE}, new Object[] {Boolean.FALSE});
+    }
+
+    @Test
+    public void testLineParsing() throws IOException {
+        List<String> expected = new ArrayList<>();
+        String prefix = getClass().getName() + "#" + getCurrentTestName() + "-";
+        for (int index = 1; index < Byte.MAX_VALUE; index++) {
+            expected.add(prefix + index);
+        }
+
+        Path targetFile = getTargetRelativeFile(
+            getClass().getSimpleName(), getCurrentTestName() + "-" + (withCR ? "CR" : "LF") + ".txt");
+        Files.createDirectories(targetFile.getParent());
+        try (OutputStream fout = Files.newOutputStream(targetFile)) {
+            int lineCount = 0;
+            for (String l : expected) {
+                byte[] b = l.getBytes(StandardCharsets.UTF_8);
+                fout.write(b);
+                if (withCR) {
+                    fout.write(0x0d);
+                }
+                fout.write(0x0a);
+
+                lineCount++;
+                if ((lineCount & 0x03) == 0) {
+                    if (withCR) {
+                        fout.write(0x0d);
+                    }
+                    fout.write(0x0a);
+                }
+            }
+        }
+
+        List<String> actual = new ArrayList<>(expected.size());
+        try (InputStream fin = Files.newInputStream(targetFile);
+             OutputStream lout = new LineOutputStream() {
+                 private int lineCount;
+
+                 @Override
+                 protected void handleLine(byte[] buf, int offset, int len) throws IOException {
+                     lineCount++;
+                     if (len == 0) {
+                         return; // ignore empty lines
+                     }
+
+                     byte lastChar = buf[offset + len - 1];
+                     if ((lastChar == 0x0a) || (lastChar == 0x0d)) {
+                         throw new StreamCorruptedException("Invalid line ending at line #" + lineCount);
+                     }
+
+                     String l = new String(buf, offset, len, StandardCharsets.UTF_8);
+                     actual.add(l);
+                 }
+             }) {
+            IoUtils.copy(fin, lout);
+        }
+
+        assertListEquals(getCurrentTestName(), expected, actual);
+    }
+
+    @Override
+    public String toString() {
+        return getClass().getSimpleName() + "[withCR=" + withCR + "]";
+    }
+}
index 59ade13..f1cb21c 100644 (file)
@@ -43,7 +43,8 @@ import java.io.IOException;
 public interface ChannelDataReceiver extends Closeable {
     /**
      * <p>
-     * Called when the server receives additional bytes from the client.
+     * Called when the server receives additional bytes from the client. When {@link #close()}-d
+     * then indicates EOF - The client will no longer send us any more data.
      * </p>
      *
      * <p>
@@ -104,12 +105,4 @@ public interface ChannelDataReceiver extends Closeable {
      * @throws IOException if failed to consume the data
      */
     int data(ChannelSession channel, byte[] buf, int start, int len) throws IOException;
-
-    /**
-     * Called to indicate EOF. The client will no longer send us any more data.
-     *
-     * @throws IOException if failed
-     */
-    @Override
-    void close() throws IOException;
 }
index e54e0e4..ab57cb1 100644 (file)
@@ -83,7 +83,14 @@ import org.apache.sshd.server.x11.X11ForwardSupport;
  */
 public class ChannelSession extends AbstractServerChannel {
     public static final List<ChannelRequestHandler> DEFAULT_HANDLERS =
-            Collections.<ChannelRequestHandler>singletonList(PuttyRequestHandler.INSTANCE);
+        Collections.<ChannelRequestHandler>singletonList(PuttyRequestHandler.INSTANCE);
+
+    /**
+     * Maximum amount of extended (a.k.a. STDERR) data allowed to be accumulated
+     * until a {@link ChannelDataReceiver} for the data is registered
+     */
+    public static final String MAX_EXTDATA_BUFSIZE = "channel-session-max-extdata-bufsize";
+    public static final int DEFAULT_MAX_EXTDATA_BUFSIZE = 0;
 
     protected String type;
     protected ChannelAsyncOutputStream asyncOut;
@@ -92,7 +99,9 @@ public class ChannelSession extends AbstractServerChannel {
     protected OutputStream err;
     protected Command commandInstance;
     protected ChannelDataReceiver receiver;
-    protected Buffer tempBuffer;
+    protected ChannelDataReceiver extendedDataWriter;
+    protected Buffer receiverBuffer;
+    protected Buffer extendedDataBuffer;
     protected final AtomicBoolean commandStarted = new AtomicBoolean(false);
     protected final StandardEnvironment env = new StandardEnvironment();
     protected final CloseFuture commandExitFuture;
@@ -159,11 +168,11 @@ public class ChannelSession extends AbstractServerChannel {
             if (immediately || (commandInstance == null)) {
                 commandExitFuture.setClosed();
             } else if (!commandExitFuture.isClosed()) {
-                IOException e = IoUtils.closeQuietly(receiver);
+                IOException e = IoUtils.closeQuietly(receiver, extendedDataWriter);
                 boolean debugEnabled = log.isDebugEnabled();
                 if (e != null) {
                     if (debugEnabled) {
-                        log.debug("close({})[immediately={}] failed ({}) to close receiver: {}",
+                        log.debug("close({})[immediately={}] failed ({}) to close receiver(s): {}",
                               this, immediately, e.getClass().getSimpleName(), e.getMessage());
                     }
                 }
@@ -208,7 +217,7 @@ public class ChannelSession extends AbstractServerChannel {
             }
         }
 
-        IOException e = IoUtils.closeQuietly(getRemoteWindow(), out, err, receiver);
+        IOException e = IoUtils.closeQuietly(getRemoteWindow(), out, err, receiver, extendedDataWriter);
         if (e != null) {
             if (debugEnabled) {
                 log.debug("doCloseImmediately({}) failed ({}) to close resources: {}",
@@ -230,10 +239,10 @@ public class ChannelSession extends AbstractServerChannel {
     public void handleEof() throws IOException {
         super.handleEof();
 
-        IOException e = IoUtils.closeQuietly(receiver);
+        IOException e = IoUtils.closeQuietly(receiver, extendedDataWriter);
         if (e != null) {
             if (log.isDebugEnabled()) {
-                log.debug("handleEof({}) failed ({}) to close receiver: {}",
+                log.debug("handleEof({}) failed ({}) to close receiver(s): {}",
                       this, e.getClass().getSimpleName(), e.getMessage());
             }
 
@@ -251,24 +260,47 @@ public class ChannelSession extends AbstractServerChannel {
         }
         ValidateUtils.checkTrue(len <= Integer.MAX_VALUE, "Data length exceeds int boundaries: %d", len);
 
+        int reqLen = (int) len;
         if (receiver != null) {
-            int r = receiver.data(this, data, off, (int) len);
+            int r = receiver.data(this, data, off, reqLen);
             if (r > 0) {
                 Window wLocal = getLocalWindow();
                 wLocal.consumeAndCheck(r);
             }
         } else {
             ValidateUtils.checkTrue(len <= (Integer.MAX_VALUE - Long.SIZE), "Temporary data length exceeds int boundaries: %d", len);
-            if (tempBuffer == null) {
-                tempBuffer = new ByteArrayBuffer((int) len + Long.SIZE, false);
+            if (receiverBuffer == null) {
+                receiverBuffer = new ByteArrayBuffer(reqLen + Long.SIZE, false);
             }
-            tempBuffer.putRawBytes(data, off, (int) len);
+            receiverBuffer.putRawBytes(data, off, reqLen);
         }
     }
 
     @Override
     protected void doWriteExtendedData(byte[] data, int off, long len) throws IOException {
-        throw new UnsupportedOperationException("Server channel does not support extended data");
+        ValidateUtils.checkTrue(len <= (Integer.MAX_VALUE - Long.SIZE), "Extended data length exceeds int boundaries: %d", len);
+
+        if (extendedDataWriter != null) {
+            extendedDataWriter.data(this, data, off, (int) len);
+            return;
+        }
+
+        int reqSize = (int) len;
+        int maxBufSize = PropertyResolverUtils.getIntProperty(this, MAX_EXTDATA_BUFSIZE, DEFAULT_MAX_EXTDATA_BUFSIZE);
+        int curBufSize = (extendedDataBuffer == null) ? 0 : extendedDataBuffer.available();
+        int totalSize = curBufSize + reqSize;
+        if (totalSize > maxBufSize) {
+            if ((curBufSize <= 0) && (maxBufSize <= 0)) {
+                throw new UnsupportedOperationException("Session channel does not support extended data");
+            }
+
+            throw new IndexOutOfBoundsException("Extended data buffer size (" + maxBufSize + ") exceeded");
+        }
+
+        if (extendedDataBuffer == null) {
+            extendedDataBuffer = new ByteArrayBuffer(totalSize + Long.SIZE, false);
+        }
+        extendedDataBuffer.putRawBytes(data, off, reqSize);
     }
 
     @Override
@@ -616,6 +648,21 @@ public class ChannelSession extends AbstractServerChannel {
     }
 
     /**
+     * A special {@link ChannelDataReceiver} that can be used to receive
+     * data sent as &quot;extended&quot; - usually STDERR. <B>Note:</B> by
+     * default any such data sent to the channel session causes an exception,
+     * but specific implementations may choose to register such a receiver
+     * (e.g., for custom usage of the STDERR stream). A good place in the
+     * code to register such a writer would be in commands that also
+     * implement {@code ChannelSessionAware}.
+     *
+     * @param extendedDataWriter The {@link ChannelDataReceiver}.
+     */
+    public void setExtendedDataWriter(ChannelDataReceiver extendedDataWriter) {
+        this.extendedDataWriter = extendedDataWriter;
+    }
+
+    /**
      * Called by {@link #prepareChannelCommand(String, Command)} in order to set
      * up the command's streams, session, file-system, exit callback, etc..
      *
@@ -666,6 +713,7 @@ public class ChannelSession extends AbstractServerChannel {
             command.setOutputStream(out);
             command.setErrorStream(err);
         }
+
         if (this.receiver == null) {
             // if the command hasn't installed any ChannelDataReceiver, install the default
             // and give the command an InputStream
@@ -679,11 +727,23 @@ public class ChannelSession extends AbstractServerChannel {
                 command.setInputStream(recv.getIn());
             }
         }
-        if (tempBuffer != null) {
-            Buffer buffer = tempBuffer;
-            tempBuffer = null;
+
+        if (receiverBuffer != null) {
+            Buffer buffer = receiverBuffer;
+            receiverBuffer = null;
             doWriteData(buffer.array(), buffer.rpos(), buffer.available());
         }
+
+        if (extendedDataBuffer != null) {
+            if (extendedDataWriter == null) {
+                throw new UnsupportedOperationException("No extended data writer available though " + extendedDataBuffer.available() + " bytes accumulated");
+            }
+
+            Buffer buffer = extendedDataBuffer;
+            extendedDataBuffer = null;
+            doWriteExtendedData(buffer.array(), buffer.rpos(), buffer.available());
+        }
+
         command.setExitCallback((exitValue, exitMessage) -> {
             try {
                 closeShell(exitValue);
index e7f1538..802b368 100644 (file)
@@ -219,6 +219,11 @@ public class SftpSubsystem
     public void setChannelSession(ChannelSession session) {
         this.channelSession = session;
         session.setDataReceiver(this);
+
+        SftpErrorStatusDataHandler errHandler = getErrorStatusDataHandler();
+        if (errHandler instanceof ChannelSessionAware) {
+            ((ChannelSessionAware) errHandler).setChannelSession(session);
+        }
     }
 
     @Override