GEODE-10300: C++ native client: Allow locator responses greater than … (#970) develop
authorAlberto Gomez <alberto.gomez@est.tech>
Thu, 18 Aug 2022 07:39:47 +0000 (09:39 +0200)
committerGitHub <noreply@github.com>
Thu, 18 Aug 2022 07:39:47 +0000 (09:39 +0200)
* GEODE-10300: Fix locator response size limit in C++ client

If a response message from the locator to the C++ native client
is longer than 3000 bytes the C++ native client will only
read the first 3000 bytes.

* GEODE-10300: Updated after review

* GEODE-10300: Updated after review

* GEODE-10300: Updated after another review

* GEODE-10300: Updated after some more reviews

* GEODE-10300: Some more changes after review.

* GEODE-10300: Small change after review

* GEODE-10300: Remove unneeded space

13 files changed:
cppcache/include/geode/DataInput.hpp
cppcache/src/Connector.hpp
cppcache/src/DataInput.cpp
cppcache/src/GetAllServersResponse.cpp
cppcache/src/GetAllServersResponse.hpp
cppcache/src/StreamDataInput.cpp [new file with mode: 0644]
cppcache/src/StreamDataInput.hpp [new file with mode: 0644]
cppcache/src/TcpConn.cpp
cppcache/src/TcpConn.hpp
cppcache/src/ThinClientLocatorHelper.cpp
cppcache/test/CMakeLists.txt
cppcache/test/StreamDataInputTest.cpp [new file with mode: 0644]
cppcache/test/mock/ConnectorMock.hpp [new file with mode: 0644]

index 23a771af8d57192ad314d9b04add0a4ca2f1e3db..fde6c3aea13ac885d44fb692345790455e090a03 100644 (file)
@@ -73,7 +73,7 @@ class APACHE_GEODE_EXPORT DataInput {
    */
   inline bool readBoolean() {
     _GEODE_CHECK_BUFFER_SIZE(1);
-    return *(m_buf++) == 1 ? true : false;
+    return *(buffer_++) == 1 ? true : false;
   }
 
   /**
@@ -89,8 +89,8 @@ class APACHE_GEODE_EXPORT DataInput {
   inline void readBytesOnly(uint8_t* buffer, size_t len) {
     if (len > 0) {
       _GEODE_CHECK_BUFFER_SIZE(len);
-      std::memcpy(buffer, m_buf, len);
-      m_buf += len;
+      std::memcpy(buffer, buffer_, len);
+      buffer_ += len;
     }
   }
 
@@ -107,8 +107,8 @@ class APACHE_GEODE_EXPORT DataInput {
   inline void readBytesOnly(int8_t* buffer, size_t len) {
     if (len > 0) {
       _GEODE_CHECK_BUFFER_SIZE(len);
-      std::memcpy(buffer, m_buf, len);
-      m_buf += len;
+      std::memcpy(buffer, buffer_, len);
+      buffer_ += len;
     }
   }
 
@@ -129,8 +129,8 @@ class APACHE_GEODE_EXPORT DataInput {
     if (length > 0) {
       _GEODE_CHECK_BUFFER_SIZE(length);
       _GEODE_NEW(buffer, uint8_t[length]);
-      std::memcpy(buffer, m_buf, length);
-      m_buf += length;
+      std::memcpy(buffer, buffer_, length);
+      buffer_ += length;
     }
     *bytes = buffer;
   }
@@ -152,8 +152,8 @@ class APACHE_GEODE_EXPORT DataInput {
     if (length > 0) {
       _GEODE_CHECK_BUFFER_SIZE(length);
       _GEODE_NEW(buffer, int8_t[length]);
-      std::memcpy(buffer, m_buf, length);
-      m_buf += length;
+      std::memcpy(buffer, buffer_, length);
+      buffer_ += length;
     }
     *bytes = buffer;
   }
@@ -173,10 +173,10 @@ class APACHE_GEODE_EXPORT DataInput {
    */
   inline int32_t readInt32() {
     _GEODE_CHECK_BUFFER_SIZE(4);
-    int32_t tmp = *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
+    int32_t tmp = *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
     return tmp;
   }
 
@@ -186,14 +186,14 @@ class APACHE_GEODE_EXPORT DataInput {
   inline int64_t readInt64() {
     _GEODE_CHECK_BUFFER_SIZE(8);
     int64_t tmp;
-    tmp = *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
-    tmp = (tmp << 8) | *(m_buf++);
+    tmp = *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
+    tmp = (tmp << 8) | *(buffer_++);
     return tmp;
   }
 
@@ -396,33 +396,33 @@ class APACHE_GEODE_EXPORT DataInput {
    * as readonly and modification of contents using this internal pointer
    * has undefined behavior.
    */
-  inline const uint8_t* currentBufferPosition() const { return m_buf; }
+  inline const uint8_t* currentBufferPosition() const { return buffer_; }
 
   /** get the number of bytes read in the buffer */
-  inline size_t getBytesRead() const { return m_buf - m_bufHead; }
+  inline size_t getBytesRead() const { return buffer_ - bufferHead_; }
 
   /** get the number of bytes remaining to be read in the buffer */
   inline size_t getBytesRemaining() const {
-    return (m_bufLength - getBytesRead());
+    return (bufferLength_ - getBytesRead());
   }
 
   /** advance the cursor by given offset */
-  inline void advanceCursor(size_t offset) { m_buf += offset; }
+  inline void advanceCursor(size_t offset) { buffer_ += offset; }
 
   /** rewind the cursor by given offset */
-  inline void rewindCursor(size_t offset) { m_buf -= offset; }
+  inline void rewindCursor(size_t offset) { buffer_ -= offset; }
 
   /** reset the cursor to the start of buffer */
-  inline void reset() { m_buf = m_bufHead; }
+  inline void reset() { buffer_ = bufferHead_; }
 
   inline void setBuffer() {
-    m_buf = currentBufferPosition();
-    m_bufLength = getBytesRemaining();
+    buffer_ = currentBufferPosition();
+    bufferLength_ = getBytesRemaining();
   }
 
-  inline void resetPdx(size_t offset) { m_buf = m_bufHead + offset; }
+  inline void resetPdx(size_t offset) { buffer_ = bufferHead_ + offset; }
 
-  inline size_t getPdxBytes() const { return m_bufLength; }
+  inline size_t getPdxBytes() const { return bufferLength_; }
 
   static uint8_t* getBufferCopy(const uint8_t* from, size_t length) {
     uint8_t* result;
@@ -432,7 +432,7 @@ class APACHE_GEODE_EXPORT DataInput {
     return result;
   }
 
-  inline void reset(size_t offset) { m_buf = m_bufHead + offset; }
+  inline void reset(size_t offset) { buffer_ = bufferHead_ + offset; }
 
   uint8_t* getBufferCopyFrom(const uint8_t* from, size_t length) {
     uint8_t* result;
@@ -452,6 +452,12 @@ class APACHE_GEODE_EXPORT DataInput {
   DataInput& operator=(DataInput&&) = default;
 
  protected:
+  const uint8_t* buffer_;
+  const uint8_t* bufferHead_;
+  size_t bufferLength_;
+  Pool* pool_;
+  const CacheImpl* cache_;
+
   /** constructor given a pre-allocated byte array with size */
   DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
             Pool* pool);
@@ -459,12 +465,6 @@ class APACHE_GEODE_EXPORT DataInput {
   virtual const SerializationRegistry& getSerializationRegistry() const;
 
  private:
-  const uint8_t* m_buf;
-  const uint8_t* m_bufHead;
-  size_t m_bufLength;
-  Pool* m_pool;
-  const CacheImpl* m_cache;
-
   std::shared_ptr<Serializable> readObjectInternal(int8_t typeId = -1);
 
   template <typename mType>
@@ -502,21 +502,21 @@ class APACHE_GEODE_EXPORT DataInput {
 
   inline char readPdxChar() { return static_cast<char>(readInt16()); }
 
-  inline void _checkBufferSize(size_t size, int32_t line) {
-    if ((m_bufLength - (m_buf - m_bufHead)) < size) {
+  virtual void _checkBufferSize(size_t size, int32_t line) {
+    if ((bufferLength_ - (buffer_ - bufferHead_)) < size) {
       throw OutOfRangeException(
           "DataInput: attempt to read beyond buffer at line " +
           std::to_string(line) + ": available buffer size " +
-          std::to_string(m_bufLength - (m_buf - m_bufHead)) +
+          std::to_string(bufferLength_ - (buffer_ - bufferHead_)) +
           ", attempted read of size " + std::to_string(size));
     }
   }
 
-  inline int8_t readNoCheck() { return *(m_buf++); }
+  inline int8_t readNoCheck() { return *(buffer_++); }
 
   inline int16_t readInt16NoCheck() {
-    int16_t tmp = *(m_buf++);
-    tmp = static_cast<int16_t>((tmp << 8) | *(m_buf++));
+    int16_t tmp = *(buffer_++);
+    tmp = static_cast<int16_t>((tmp << 8) | *(buffer_++));
     return tmp;
   }
 
@@ -605,7 +605,7 @@ class APACHE_GEODE_EXPORT DataInput {
     value.assign(reinterpret_cast<const wchar_t*>(tmp.data()), tmp.length());
   }
 
-  Pool* getPool() const { return m_pool; }
+  Pool* getPool() const { return pool_; }
 
   friend Cache;
   friend CacheImpl;
index 98ce3d46941ba4f047b2254dda75e9724b7a98f1..f13042b3cde4ab7c8c68f597e55bba2fff4ff810 100644 (file)
@@ -116,6 +116,11 @@ class Connector {
    */
   virtual uint16_t getPort() = 0;
 
+  /**
+   * Returns the remote endpoint for this connection in the form host:port
+   */
+  virtual std::string getRemoteEndpoint() = 0;
+
   /**
    * Writes an array of a known size to the underlying output stream.
    *
index 3590dd917e3424ea63ff8b8fde1e91b612b8ce7d..0a1a07061c924f24eb9a11cf5752eab0ebcd7878 100644 (file)
@@ -30,21 +30,21 @@ namespace client {
 
 DataInput::DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
                      Pool* pool)
-    : m_buf(buffer),
-      m_bufHead(buffer),
-      m_bufLength(len),
-      m_pool(pool),
-      m_cache(cache) {}
+    : buffer_(buffer),
+      bufferHead_(buffer),
+      bufferLength_(len),
+      pool_(pool),
+      cache_(cache) {}
 
 std::shared_ptr<Serializable> DataInput::readObjectInternal(int8_t typeId) {
   return getSerializationRegistry().deserialize(*this, typeId);
 }
 
 const SerializationRegistry& DataInput::getSerializationRegistry() const {
-  return *m_cache->getSerializationRegistry();
+  return *cache_->getSerializationRegistry();
 }
 
-Cache* DataInput::getCache() const { return m_cache->getCache(); }
+Cache* DataInput::getCache() const { return cache_->getCache(); }
 
 template <class _Traits, class _Allocator>
 void DataInput::readJavaModifiedUtf8(
@@ -63,7 +63,7 @@ void DataInput::readJavaModifiedUtf8(
   uint16_t length = readInt16();
   _GEODE_CHECK_BUFFER_SIZE(length);
   value = internal::JavaModifiedUtf8::decode(
-      reinterpret_cast<const char*>(m_buf), length);
+      reinterpret_cast<const char*>(buffer_), length);
   advanceCursor(length);
 }
 template APACHE_GEODE_EXPLICIT_TEMPLATE_EXPORT void
index 44476348814f5d241c797ec88404ddacb0975634..8a7597e2377d82eb2122d98bd91461a799047af9 100644 (file)
@@ -21,10 +21,10 @@ namespace geode {
 namespace client {
 
 void GetAllServersResponse::toData(DataOutput& output) const {
-  int32_t numServers = static_cast<int32_t>(m_servers.size());
-  output.writeInt(numServers);
-  for (int32_t i = 0; i < numServers; i++) {
-    output.writeObject(m_servers.at(i));
+  auto numServers = servers_.size();
+  output.writeInt(static_cast<int32_t>(numServers));
+  for (unsigned int i = 0; i < numServers; i++) {
+    output.writeObject(servers_.at(i));
   }
 }
 void GetAllServersResponse::fromData(DataInput& input) {
@@ -33,7 +33,7 @@ void GetAllServersResponse::fromData(DataInput& input) {
   for (int i = 0; i < numServers; i++) {
     std::shared_ptr<ServerLocation> sLoc = std::make_shared<ServerLocation>();
     sLoc->fromData(input);
-    m_servers.push_back(sLoc);
+    servers_.push_back(sLoc);
   }
 }
 
index 73bf83e12d0f4996880167b580208f4e98eff6dc..c010716077a21238563f5c0abf8ec7c974bf696c 100644 (file)
@@ -35,23 +35,27 @@ namespace client {
 
 class GetAllServersResponse : public internal::DataSerializableFixedId_t<
                                   internal::DSFid::GetAllServersResponse> {
-  std::vector<std::shared_ptr<ServerLocation> > m_servers;
-
  public:
   static std::shared_ptr<Serializable> create() {
     return std::make_shared<GetAllServersResponse>();
   }
   GetAllServersResponse() : Serializable() {}
+  explicit GetAllServersResponse(
+      std::vector<std::shared_ptr<ServerLocation> > servers)
+      : Serializable(), servers_(servers) {}
   void toData(DataOutput& output) const override;
   void fromData(DataInput& input) override;
 
   size_t objectSize() const override {
-    return sizeof(GetAllServersResponse) + m_servers.capacity();
+    return sizeof(GetAllServersResponse) + servers_.capacity();
   }
   std::vector<std::shared_ptr<ServerLocation> > getServers() {
-    return m_servers;
+    return servers_;
   }
   ~GetAllServersResponse() override = default;
+
+ private:
+  std::vector<std::shared_ptr<ServerLocation> > servers_;
 };
 
 }  // namespace client
diff --git a/cppcache/src/StreamDataInput.cpp b/cppcache/src/StreamDataInput.cpp
new file mode 100644 (file)
index 0000000..3944689
--- /dev/null
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamDataInput.hpp"
+
+#include <geode/DataInput.hpp>
+
+#include "Utils.hpp"
+#include "util/Log.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+constexpr size_t kBufferSize = 3000;
+
+StreamDataInput::StreamDataInput(std::chrono::milliseconds timeout,
+                                 std::unique_ptr<Connector> connector,
+                                 const CacheImpl* cache, Pool* pool)
+    : DataInput(nullptr, 0, cache, pool),
+      connector_(std::move(connector)),
+      remainingTimeBeforeTimeout_(timeout) {}
+
+void StreamDataInput::readDataIfNotAvailable(size_t size) {
+  char buff[kBufferSize];
+  while (getBytesRemaining() < size) {
+    const auto start = std::chrono::system_clock::now();
+
+    const auto receivedLength = connector_->receive_nothrowiftimeout(
+        buff, kBufferSize, remainingTimeBeforeTimeout_);
+
+    const auto timeSpent = std::chrono::system_clock::now() - start;
+
+    remainingTimeBeforeTimeout_ -=
+        std::chrono::duration_cast<decltype(remainingTimeBeforeTimeout_)>(
+            timeSpent);
+
+    LOGDEBUG(
+        "received %d bytes from %s: %s, time spent: "
+        "%ld millisecs, time remaining before timeout: %ld millisecs",
+        receivedLength, connector_->getRemoteEndpoint().c_str(),
+        Utils::convertBytesToString(reinterpret_cast<uint8_t*>(buff),
+                                    receivedLength)
+            .c_str(),
+        std::chrono::duration_cast<std::chrono::milliseconds>(timeSpent)
+            .count(),
+        remainingTimeBeforeTimeout_.count());
+
+    if (remainingTimeBeforeTimeout_ <= std::chrono::milliseconds::zero()) {
+      throw(TimeoutException(std::string("Timeout when receiving from ")
+                                 .append(connector_->getRemoteEndpoint())));
+    }
+
+    auto newLength = bufferLength_ + receivedLength;
+    auto currentPosition = getBytesRead();
+    streamBuf_.resize(newLength);
+    memcpy(streamBuf_.data() + bufferLength_, buff, receivedLength);
+
+    bufferHead_ = streamBuf_.data();
+    buffer_ = bufferHead_ + currentPosition;
+    bufferLength_ = newLength;
+  }
+}
+
+}  // namespace client
+}  // namespace geode
+}  // namespace apache
diff --git a/cppcache/src/StreamDataInput.hpp b/cppcache/src/StreamDataInput.hpp
new file mode 100644 (file)
index 0000000..e926697
--- /dev/null
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_STREAMDATAINPUT_H_
+#define GEODE_STREAMDATAINPUT_H_
+
+#include <chrono>
+
+#include "Connector.hpp"
+#include "geode/DataInput.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+class Connector;
+
+/**
+ * Provides the same functionality as its parent class but
+ * data is retrieved, instead of from a passed buffer,
+ * from a socket connection.
+ */
+class APACHE_GEODE_EXPORT StreamDataInput : public DataInput {
+ public:
+  StreamDataInput(std::chrono::milliseconds timeout,
+                  std::unique_ptr<Connector> connector, const CacheImpl* cache,
+                  Pool* pool);
+
+ protected:
+  void _checkBufferSize(size_t size, int32_t /* line */) override {
+    readDataIfNotAvailable(size);
+  }
+
+  void readDataIfNotAvailable(size_t size);
+
+ private:
+  std::unique_ptr<Connector> connector_;
+  std::chrono::milliseconds remainingTimeBeforeTimeout_;
+  std::vector<uint8_t> streamBuf_;
+};
+}  // namespace client
+}  // namespace geode
+}  // namespace apache
+
+#endif  // GEODE_STREAMDATAINPUT_H_
index e2798705ebb97e357e3fcc8141a6fd3de0d1fceb..1c0e916e57b392b21be48548db0e6298e53a188d 100644 (file)
@@ -17,7 +17,6 @@
 
 #include "TcpConn.hpp"
 
-#include <iomanip>
 #include <iostream>
 
 #include <boost/optional.hpp>
@@ -293,6 +292,11 @@ size_t TcpConn::send(const char *buff, const size_t len,
 //  Return the local port for this TCP connection.
 uint16_t TcpConn::getPort() { return socket_.local_endpoint().port(); }
 
+std::string TcpConn::getRemoteEndpoint() {
+  return socket_.remote_endpoint().address().to_string().append(":").append(
+      std::to_string(socket_.remote_endpoint().port()));
+}
+
 void TcpConn::connect(boost::asio::ip::tcp::resolver::results_type r,
                       std::chrono::microseconds timeout) {
   boost::optional<boost::system::error_code> connect_result;
index 028214fa9b9ee0d83cb3549d29180e9f2e77e56a..9cbec7bbc496c6be192af597f572f5d26d3a355e 100644 (file)
@@ -38,6 +38,8 @@ class TcpConn : public Connector {
 
   uint16_t getPort() override final;
 
+  std::string getRemoteEndpoint() override final;
+
  protected:
   boost::asio::io_context io_context_;
   boost::asio::ip::tcp::socket socket_;
index d896d6561eaf28fabf8864ea78e0e5e7c2d04729..4f6bb3963ee270b2520ec564b54e26d9a2cea441 100644 (file)
@@ -22,8 +22,6 @@
 
 #include <boost/thread/lock_types.hpp>
 
-#include <geode/DataInput.hpp>
-#include <geode/DataOutput.hpp>
 #include <geode/SystemProperties.hpp>
 
 #include "CacheImpl.hpp"
@@ -35,6 +33,7 @@
 #include "LocatorListResponse.hpp"
 #include "QueueConnectionRequest.hpp"
 #include "QueueConnectionResponse.hpp"
+#include "StreamDataInput.hpp"
 #include "TcpConn.hpp"
 #include "TcpSslConn.hpp"
 #include "TcrConnectionManager.hpp"
@@ -47,7 +46,6 @@ namespace apache {
 namespace geode {
 namespace client {
 
-const size_t BUFF_SIZE = 3000;
 const size_t DEFAULT_CONNECTION_RETRIES = 3;
 
 ThinClientLocatorHelper::ThinClientLocatorHelper(
@@ -143,20 +141,10 @@ std::shared_ptr<Serializable> ThinClientLocatorHelper::sendRequest(
     if (sentLength <= 0) {
       return nullptr;
     }
-    char buff[BUFF_SIZE];
-    const auto receivedLength = conn->receive(buff, m_poolDM->getReadTimeout());
-    if (!receivedLength) {
-      return nullptr;
-    }
-
-    LOGDEBUG("%s(%p): received %d bytes from locator: %s", __GNFN__, this,
-             receivedLength,
-             Utils::convertBytesToString(reinterpret_cast<uint8_t*>(buff),
-                                         receivedLength)
-                 .c_str());
 
-    auto di = m_poolDM->getConnectionManager().getCacheImpl()->createDataInput(
-        reinterpret_cast<uint8_t*>(buff), receivedLength);
+    StreamDataInput di(m_poolDM->getReadTimeout(), std::move(conn),
+                       m_poolDM->getConnectionManager().getCacheImpl(),
+                       nullptr);
 
     if (di.read() == REPLY_SSL_ENABLED && !sys_prop.sslEnabled()) {
       LOGERROR(
index dd1577d174b5723fb069c78c8688f2e2355d4fbb..501af1c2f5b96eee2e136a2da8c12ef78f300ce2 100644 (file)
@@ -53,6 +53,7 @@ add_executable(apache-geode_unittests
   QueueConnectionRequestTest.cpp
   RegionAttributesFactoryTest.cpp
   SerializableCreateTests.cpp
+  StreamDataInputTest.cpp
   StringPrefixPartitionResolverTest.cpp
   StructSetTest.cpp
   TcrConnectionTest.cpp
diff --git a/cppcache/test/StreamDataInputTest.cpp b/cppcache/test/StreamDataInputTest.cpp
new file mode 100644 (file)
index 0000000..c115e53
--- /dev/null
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-matchers.h>
+
+#include <gtest/gtest.h>
+
+#include "CacheImpl.hpp"
+#include "Connector.hpp"
+#include "GetAllServersResponse.hpp"
+#include "ServerLocation.hpp"
+#include "StreamDataInput.hpp"
+#include "geode/DataOutput.hpp"
+#include "mock/ConnectorMock.hpp"
+
+namespace {
+
+using apache::geode::client::CacheImpl;
+using apache::geode::client::Connector;
+using apache::geode::client::ConnectorMock;
+using apache::geode::client::DataOutput;
+using apache::geode::client::GetAllServersResponse;
+using apache::geode::client::Serializable;
+using apache::geode::client::ServerLocation;
+using apache::geode::client::StreamDataInput;
+using apache::geode::client::TimeoutException;
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::Eq;
+using ::testing::Return;
+using ::testing::SetArrayArgument;
+using ::testing::SizeIs;
+
+constexpr size_t kReadBuffSize = 3000;
+constexpr size_t kStreamBufferSize = 10000;
+
+ACTION_P(WaitMs, milliseconds) {
+  std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
+  return 0;
+}
+
+TEST(StreamDataInputTest, ObjectSizeGreaterThanReadBufferSize) {
+  std::unique_ptr<ConnectorMock> connector =
+      std::unique_ptr<ConnectorMock>(new ConnectorMock());
+
+  unsigned int numServers = 100;
+  std::vector<std::shared_ptr<ServerLocation> > servers(numServers);
+
+  for (unsigned int i = 0; i < numServers; i++) {
+    servers[i] = std::make_shared<ServerLocation>(
+        std::string("this.is.a.quite.long.hostname.and.the.reason.is.that.it."
+                    "is.used.for.testing:") += std::to_string(2000 + i));
+  }
+
+  GetAllServersResponse getAllServersResponse(servers);
+
+  auto cache =
+      std::make_shared<CacheImpl>(nullptr, nullptr, false, false, nullptr);
+
+  auto dataOutput = cache->createDataOutput();
+
+  getAllServersResponse.toData(dataOutput);
+
+  auto buffer = dataOutput.getBuffer();
+  auto dataOutputBufferLength = dataOutput.getBufferLength();
+
+  // Gossip header
+  uint8_t streamBuffer[kStreamBufferSize];
+  streamBuffer[0] = 1;
+  streamBuffer[1] = 0xd6;
+  memcpy(streamBuffer + 2, buffer, dataOutputBufferLength);
+
+  auto streamBufferLength = dataOutputBufferLength + 2;
+
+  auto timeout = std::chrono::milliseconds(1000);
+  EXPECT_CALL(*connector, getRemoteEndpoint())
+      .WillRepeatedly(Return("locator:9999"));
+
+  EXPECT_CALL(*connector, receive_nothrowiftimeout(_, _, _))
+      .WillOnce(
+          DoAll(SetArrayArgument<0>(streamBuffer, streamBuffer + kReadBuffSize),
+                Return(kReadBuffSize)))
+      .WillOnce(DoAll(SetArrayArgument<0>(streamBuffer + kReadBuffSize,
+                                          streamBuffer + 2 * kReadBuffSize),
+                      Return(kReadBuffSize)))
+      .WillOnce(DoAll(SetArrayArgument<0>(streamBuffer + 2 * kReadBuffSize,
+                                          &streamBuffer[streamBufferLength]),
+                      Return(streamBufferLength - (2 * kReadBuffSize))));
+
+  StreamDataInput streamDataInput(timeout, std::move(connector), cache.get(),
+                                  nullptr);
+
+  auto object = streamDataInput.readObject();
+
+  auto response = std::dynamic_pointer_cast<GetAllServersResponse>(object);
+
+  ASSERT_THAT(response->getServers(), SizeIs(servers.size()));
+  for (unsigned int i = 0; i < servers.size(); i++) {
+    ASSERT_THAT(response->getServers()[i]->getEpString(),
+                Eq(servers[i]->getEpString()));
+  }
+}
+
+TEST(StreamDataInputTest, TimeoutWhenReading) {
+  auto connector = std::unique_ptr<ConnectorMock>(new ConnectorMock());
+
+  auto cache =
+      std::make_shared<CacheImpl>(nullptr, nullptr, false, false, nullptr);
+
+  EXPECT_CALL(*connector, getRemoteEndpoint())
+      .WillRepeatedly(Return("locator:9999"));
+
+  EXPECT_CALL(*connector, receive_nothrowiftimeout(_, _, _))
+      .WillOnce(WaitMs(2));
+
+  auto timeout = std::chrono::milliseconds(1);
+  StreamDataInput streamDataInput(timeout, std::move(connector), cache.get(),
+                                  nullptr);
+
+  ASSERT_THROW(streamDataInput.readObject(), TimeoutException);
+}
+
+}  // namespace
diff --git a/cppcache/test/mock/ConnectorMock.hpp b/cppcache/test/mock/ConnectorMock.hpp
new file mode 100644 (file)
index 0000000..7525699
--- /dev/null
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef GEODE_CONNECTORMOCK_H_
+#define GEODE_CONNECTORMOCK_H_
+
+#include <gmock/gmock.h>
+
+#include "Connector.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+class ConnectorMock : public Connector {
+ public:
+  ConnectorMock(){
+  }
+
+  MOCK_METHOD3(receive, size_t(char *b, size_t len,
+                         std::chrono::milliseconds timeout));
+
+  MOCK_METHOD0(getPort, uint16_t());
+
+  MOCK_METHOD0(getRemoteEndpoint, std::string());
+
+  MOCK_METHOD3(send, size_t(const char *b, size_t len,
+                            std::chrono::milliseconds timeout));
+
+  MOCK_METHOD3(receive_nothrowiftimeout, size_t(
+      char *b, size_t len, std::chrono::milliseconds timeout));
+
+
+};
+}  // namespace client
+}  // namespace geode
+}  // namespace apache
+
+#endif  // GEODE_CONNECTORMOCK_H_