*/
inline bool readBoolean() {
_GEODE_CHECK_BUFFER_SIZE(1);
- return *(m_buf++) == 1 ? true : false;
+ return *(buffer_++) == 1 ? true : false;
}
/**
inline void readBytesOnly(uint8_t* buffer, size_t len) {
if (len > 0) {
_GEODE_CHECK_BUFFER_SIZE(len);
- std::memcpy(buffer, m_buf, len);
- m_buf += len;
+ std::memcpy(buffer, buffer_, len);
+ buffer_ += len;
}
}
inline void readBytesOnly(int8_t* buffer, size_t len) {
if (len > 0) {
_GEODE_CHECK_BUFFER_SIZE(len);
- std::memcpy(buffer, m_buf, len);
- m_buf += len;
+ std::memcpy(buffer, buffer_, len);
+ buffer_ += len;
}
}
if (length > 0) {
_GEODE_CHECK_BUFFER_SIZE(length);
_GEODE_NEW(buffer, uint8_t[length]);
- std::memcpy(buffer, m_buf, length);
- m_buf += length;
+ std::memcpy(buffer, buffer_, length);
+ buffer_ += length;
}
*bytes = buffer;
}
if (length > 0) {
_GEODE_CHECK_BUFFER_SIZE(length);
_GEODE_NEW(buffer, int8_t[length]);
- std::memcpy(buffer, m_buf, length);
- m_buf += length;
+ std::memcpy(buffer, buffer_, length);
+ buffer_ += length;
}
*bytes = buffer;
}
*/
inline int32_t readInt32() {
_GEODE_CHECK_BUFFER_SIZE(4);
- int32_t tmp = *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
+ int32_t tmp = *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
return tmp;
}
inline int64_t readInt64() {
_GEODE_CHECK_BUFFER_SIZE(8);
int64_t tmp;
- tmp = *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
- tmp = (tmp << 8) | *(m_buf++);
+ tmp = *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
+ tmp = (tmp << 8) | *(buffer_++);
return tmp;
}
* as readonly and modification of contents using this internal pointer
* has undefined behavior.
*/
- inline const uint8_t* currentBufferPosition() const { return m_buf; }
+ inline const uint8_t* currentBufferPosition() const { return buffer_; }
/** get the number of bytes read in the buffer */
- inline size_t getBytesRead() const { return m_buf - m_bufHead; }
+ inline size_t getBytesRead() const { return buffer_ - bufferHead_; }
/** get the number of bytes remaining to be read in the buffer */
inline size_t getBytesRemaining() const {
- return (m_bufLength - getBytesRead());
+ return (bufferLength_ - getBytesRead());
}
/** advance the cursor by given offset */
- inline void advanceCursor(size_t offset) { m_buf += offset; }
+ inline void advanceCursor(size_t offset) { buffer_ += offset; }
/** rewind the cursor by given offset */
- inline void rewindCursor(size_t offset) { m_buf -= offset; }
+ inline void rewindCursor(size_t offset) { buffer_ -= offset; }
/** reset the cursor to the start of buffer */
- inline void reset() { m_buf = m_bufHead; }
+ inline void reset() { buffer_ = bufferHead_; }
inline void setBuffer() {
- m_buf = currentBufferPosition();
- m_bufLength = getBytesRemaining();
+ buffer_ = currentBufferPosition();
+ bufferLength_ = getBytesRemaining();
}
- inline void resetPdx(size_t offset) { m_buf = m_bufHead + offset; }
+ inline void resetPdx(size_t offset) { buffer_ = bufferHead_ + offset; }
- inline size_t getPdxBytes() const { return m_bufLength; }
+ inline size_t getPdxBytes() const { return bufferLength_; }
static uint8_t* getBufferCopy(const uint8_t* from, size_t length) {
uint8_t* result;
return result;
}
- inline void reset(size_t offset) { m_buf = m_bufHead + offset; }
+ inline void reset(size_t offset) { buffer_ = bufferHead_ + offset; }
uint8_t* getBufferCopyFrom(const uint8_t* from, size_t length) {
uint8_t* result;
DataInput& operator=(DataInput&&) = default;
protected:
+ const uint8_t* buffer_;
+ const uint8_t* bufferHead_;
+ size_t bufferLength_;
+ Pool* pool_;
+ const CacheImpl* cache_;
+
/** constructor given a pre-allocated byte array with size */
DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
Pool* pool);
virtual const SerializationRegistry& getSerializationRegistry() const;
private:
- const uint8_t* m_buf;
- const uint8_t* m_bufHead;
- size_t m_bufLength;
- Pool* m_pool;
- const CacheImpl* m_cache;
-
std::shared_ptr<Serializable> readObjectInternal(int8_t typeId = -1);
template <typename mType>
inline char readPdxChar() { return static_cast<char>(readInt16()); }
- inline void _checkBufferSize(size_t size, int32_t line) {
- if ((m_bufLength - (m_buf - m_bufHead)) < size) {
+ virtual void _checkBufferSize(size_t size, int32_t line) {
+ if ((bufferLength_ - (buffer_ - bufferHead_)) < size) {
throw OutOfRangeException(
"DataInput: attempt to read beyond buffer at line " +
std::to_string(line) + ": available buffer size " +
- std::to_string(m_bufLength - (m_buf - m_bufHead)) +
+ std::to_string(bufferLength_ - (buffer_ - bufferHead_)) +
", attempted read of size " + std::to_string(size));
}
}
- inline int8_t readNoCheck() { return *(m_buf++); }
+ inline int8_t readNoCheck() { return *(buffer_++); }
inline int16_t readInt16NoCheck() {
- int16_t tmp = *(m_buf++);
- tmp = static_cast<int16_t>((tmp << 8) | *(m_buf++));
+ int16_t tmp = *(buffer_++);
+ tmp = static_cast<int16_t>((tmp << 8) | *(buffer_++));
return tmp;
}
value.assign(reinterpret_cast<const wchar_t*>(tmp.data()), tmp.length());
}
- Pool* getPool() const { return m_pool; }
+ Pool* getPool() const { return pool_; }
friend Cache;
friend CacheImpl;
*/
virtual uint16_t getPort() = 0;
+ /**
+ * Returns the remote endpoint for this connection in the form host:port
+ */
+ virtual std::string getRemoteEndpoint() = 0;
+
/**
* Writes an array of a known size to the underlying output stream.
*
DataInput::DataInput(const uint8_t* buffer, size_t len, const CacheImpl* cache,
Pool* pool)
- : m_buf(buffer),
- m_bufHead(buffer),
- m_bufLength(len),
- m_pool(pool),
- m_cache(cache) {}
+ : buffer_(buffer),
+ bufferHead_(buffer),
+ bufferLength_(len),
+ pool_(pool),
+ cache_(cache) {}
std::shared_ptr<Serializable> DataInput::readObjectInternal(int8_t typeId) {
return getSerializationRegistry().deserialize(*this, typeId);
}
const SerializationRegistry& DataInput::getSerializationRegistry() const {
- return *m_cache->getSerializationRegistry();
+ return *cache_->getSerializationRegistry();
}
-Cache* DataInput::getCache() const { return m_cache->getCache(); }
+Cache* DataInput::getCache() const { return cache_->getCache(); }
template <class _Traits, class _Allocator>
void DataInput::readJavaModifiedUtf8(
uint16_t length = readInt16();
_GEODE_CHECK_BUFFER_SIZE(length);
value = internal::JavaModifiedUtf8::decode(
- reinterpret_cast<const char*>(m_buf), length);
+ reinterpret_cast<const char*>(buffer_), length);
advanceCursor(length);
}
template APACHE_GEODE_EXPLICIT_TEMPLATE_EXPORT void
namespace client {
void GetAllServersResponse::toData(DataOutput& output) const {
- int32_t numServers = static_cast<int32_t>(m_servers.size());
- output.writeInt(numServers);
- for (int32_t i = 0; i < numServers; i++) {
- output.writeObject(m_servers.at(i));
+ auto numServers = servers_.size();
+ output.writeInt(static_cast<int32_t>(numServers));
+ for (unsigned int i = 0; i < numServers; i++) {
+ output.writeObject(servers_.at(i));
}
}
void GetAllServersResponse::fromData(DataInput& input) {
for (int i = 0; i < numServers; i++) {
std::shared_ptr<ServerLocation> sLoc = std::make_shared<ServerLocation>();
sLoc->fromData(input);
- m_servers.push_back(sLoc);
+ servers_.push_back(sLoc);
}
}
class GetAllServersResponse : public internal::DataSerializableFixedId_t<
internal::DSFid::GetAllServersResponse> {
- std::vector<std::shared_ptr<ServerLocation> > m_servers;
-
public:
static std::shared_ptr<Serializable> create() {
return std::make_shared<GetAllServersResponse>();
}
GetAllServersResponse() : Serializable() {}
+ explicit GetAllServersResponse(
+ std::vector<std::shared_ptr<ServerLocation> > servers)
+ : Serializable(), servers_(servers) {}
void toData(DataOutput& output) const override;
void fromData(DataInput& input) override;
size_t objectSize() const override {
- return sizeof(GetAllServersResponse) + m_servers.capacity();
+ return sizeof(GetAllServersResponse) + servers_.capacity();
}
std::vector<std::shared_ptr<ServerLocation> > getServers() {
- return m_servers;
+ return servers_;
}
~GetAllServersResponse() override = default;
+
+ private:
+ std::vector<std::shared_ptr<ServerLocation> > servers_;
};
} // namespace client
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StreamDataInput.hpp"
+
+#include <geode/DataInput.hpp>
+
+#include "Utils.hpp"
+#include "util/Log.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+constexpr size_t kBufferSize = 3000;
+
+StreamDataInput::StreamDataInput(std::chrono::milliseconds timeout,
+ std::unique_ptr<Connector> connector,
+ const CacheImpl* cache, Pool* pool)
+ : DataInput(nullptr, 0, cache, pool),
+ connector_(std::move(connector)),
+ remainingTimeBeforeTimeout_(timeout) {}
+
+void StreamDataInput::readDataIfNotAvailable(size_t size) {
+ char buff[kBufferSize];
+ while (getBytesRemaining() < size) {
+ const auto start = std::chrono::system_clock::now();
+
+ const auto receivedLength = connector_->receive_nothrowiftimeout(
+ buff, kBufferSize, remainingTimeBeforeTimeout_);
+
+ const auto timeSpent = std::chrono::system_clock::now() - start;
+
+ remainingTimeBeforeTimeout_ -=
+ std::chrono::duration_cast<decltype(remainingTimeBeforeTimeout_)>(
+ timeSpent);
+
+ LOGDEBUG(
+ "received %d bytes from %s: %s, time spent: "
+ "%ld millisecs, time remaining before timeout: %ld millisecs",
+ receivedLength, connector_->getRemoteEndpoint().c_str(),
+ Utils::convertBytesToString(reinterpret_cast<uint8_t*>(buff),
+ receivedLength)
+ .c_str(),
+ std::chrono::duration_cast<std::chrono::milliseconds>(timeSpent)
+ .count(),
+ remainingTimeBeforeTimeout_.count());
+
+ if (remainingTimeBeforeTimeout_ <= std::chrono::milliseconds::zero()) {
+ throw(TimeoutException(std::string("Timeout when receiving from ")
+ .append(connector_->getRemoteEndpoint())));
+ }
+
+ auto newLength = bufferLength_ + receivedLength;
+ auto currentPosition = getBytesRead();
+ streamBuf_.resize(newLength);
+ memcpy(streamBuf_.data() + bufferLength_, buff, receivedLength);
+
+ bufferHead_ = streamBuf_.data();
+ buffer_ = bufferHead_ + currentPosition;
+ bufferLength_ = newLength;
+ }
+}
+
+} // namespace client
+} // namespace geode
+} // namespace apache
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#ifndef GEODE_STREAMDATAINPUT_H_
+#define GEODE_STREAMDATAINPUT_H_
+
+#include <chrono>
+
+#include "Connector.hpp"
+#include "geode/DataInput.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+
+class Connector;
+
+/**
+ * Provides the same functionality as its parent class but
+ * data is retrieved, instead of from a passed buffer,
+ * from a socket connection.
+ */
+class APACHE_GEODE_EXPORT StreamDataInput : public DataInput {
+ public:
+ StreamDataInput(std::chrono::milliseconds timeout,
+ std::unique_ptr<Connector> connector, const CacheImpl* cache,
+ Pool* pool);
+
+ protected:
+ void _checkBufferSize(size_t size, int32_t /* line */) override {
+ readDataIfNotAvailable(size);
+ }
+
+ void readDataIfNotAvailable(size_t size);
+
+ private:
+ std::unique_ptr<Connector> connector_;
+ std::chrono::milliseconds remainingTimeBeforeTimeout_;
+ std::vector<uint8_t> streamBuf_;
+};
+} // namespace client
+} // namespace geode
+} // namespace apache
+
+#endif // GEODE_STREAMDATAINPUT_H_
#include "TcpConn.hpp"
-#include <iomanip>
#include <iostream>
#include <boost/optional.hpp>
// Return the local port for this TCP connection.
uint16_t TcpConn::getPort() { return socket_.local_endpoint().port(); }
+std::string TcpConn::getRemoteEndpoint() {
+ return socket_.remote_endpoint().address().to_string().append(":").append(
+ std::to_string(socket_.remote_endpoint().port()));
+}
+
void TcpConn::connect(boost::asio::ip::tcp::resolver::results_type r,
std::chrono::microseconds timeout) {
boost::optional<boost::system::error_code> connect_result;
uint16_t getPort() override final;
+ std::string getRemoteEndpoint() override final;
+
protected:
boost::asio::io_context io_context_;
boost::asio::ip::tcp::socket socket_;
#include <boost/thread/lock_types.hpp>
-#include <geode/DataInput.hpp>
-#include <geode/DataOutput.hpp>
#include <geode/SystemProperties.hpp>
#include "CacheImpl.hpp"
#include "LocatorListResponse.hpp"
#include "QueueConnectionRequest.hpp"
#include "QueueConnectionResponse.hpp"
+#include "StreamDataInput.hpp"
#include "TcpConn.hpp"
#include "TcpSslConn.hpp"
#include "TcrConnectionManager.hpp"
namespace geode {
namespace client {
-const size_t BUFF_SIZE = 3000;
const size_t DEFAULT_CONNECTION_RETRIES = 3;
ThinClientLocatorHelper::ThinClientLocatorHelper(
if (sentLength <= 0) {
return nullptr;
}
- char buff[BUFF_SIZE];
- const auto receivedLength = conn->receive(buff, m_poolDM->getReadTimeout());
- if (!receivedLength) {
- return nullptr;
- }
-
- LOGDEBUG("%s(%p): received %d bytes from locator: %s", __GNFN__, this,
- receivedLength,
- Utils::convertBytesToString(reinterpret_cast<uint8_t*>(buff),
- receivedLength)
- .c_str());
- auto di = m_poolDM->getConnectionManager().getCacheImpl()->createDataInput(
- reinterpret_cast<uint8_t*>(buff), receivedLength);
+ StreamDataInput di(m_poolDM->getReadTimeout(), std::move(conn),
+ m_poolDM->getConnectionManager().getCacheImpl(),
+ nullptr);
if (di.read() == REPLY_SSL_ENABLED && !sys_prop.sslEnabled()) {
LOGERROR(
QueueConnectionRequestTest.cpp
RegionAttributesFactoryTest.cpp
SerializableCreateTests.cpp
+ StreamDataInputTest.cpp
StringPrefixPartitionResolverTest.cpp
StructSetTest.cpp
TcrConnectionTest.cpp
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <gmock/gmock-actions.h>
+#include <gmock/gmock-matchers.h>
+
+#include <gtest/gtest.h>
+
+#include "CacheImpl.hpp"
+#include "Connector.hpp"
+#include "GetAllServersResponse.hpp"
+#include "ServerLocation.hpp"
+#include "StreamDataInput.hpp"
+#include "geode/DataOutput.hpp"
+#include "mock/ConnectorMock.hpp"
+
+namespace {
+
+using apache::geode::client::CacheImpl;
+using apache::geode::client::Connector;
+using apache::geode::client::ConnectorMock;
+using apache::geode::client::DataOutput;
+using apache::geode::client::GetAllServersResponse;
+using apache::geode::client::Serializable;
+using apache::geode::client::ServerLocation;
+using apache::geode::client::StreamDataInput;
+using apache::geode::client::TimeoutException;
+
+using ::testing::_;
+using ::testing::DoAll;
+using ::testing::Eq;
+using ::testing::Return;
+using ::testing::SetArrayArgument;
+using ::testing::SizeIs;
+
+constexpr size_t kReadBuffSize = 3000;
+constexpr size_t kStreamBufferSize = 10000;
+
+ACTION_P(WaitMs, milliseconds) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
+ return 0;
+}
+
+TEST(StreamDataInputTest, ObjectSizeGreaterThanReadBufferSize) {
+ std::unique_ptr<ConnectorMock> connector =
+ std::unique_ptr<ConnectorMock>(new ConnectorMock());
+
+ unsigned int numServers = 100;
+ std::vector<std::shared_ptr<ServerLocation> > servers(numServers);
+
+ for (unsigned int i = 0; i < numServers; i++) {
+ servers[i] = std::make_shared<ServerLocation>(
+ std::string("this.is.a.quite.long.hostname.and.the.reason.is.that.it."
+ "is.used.for.testing:") += std::to_string(2000 + i));
+ }
+
+ GetAllServersResponse getAllServersResponse(servers);
+
+ auto cache =
+ std::make_shared<CacheImpl>(nullptr, nullptr, false, false, nullptr);
+
+ auto dataOutput = cache->createDataOutput();
+
+ getAllServersResponse.toData(dataOutput);
+
+ auto buffer = dataOutput.getBuffer();
+ auto dataOutputBufferLength = dataOutput.getBufferLength();
+
+ // Gossip header
+ uint8_t streamBuffer[kStreamBufferSize];
+ streamBuffer[0] = 1;
+ streamBuffer[1] = 0xd6;
+ memcpy(streamBuffer + 2, buffer, dataOutputBufferLength);
+
+ auto streamBufferLength = dataOutputBufferLength + 2;
+
+ auto timeout = std::chrono::milliseconds(1000);
+ EXPECT_CALL(*connector, getRemoteEndpoint())
+ .WillRepeatedly(Return("locator:9999"));
+
+ EXPECT_CALL(*connector, receive_nothrowiftimeout(_, _, _))
+ .WillOnce(
+ DoAll(SetArrayArgument<0>(streamBuffer, streamBuffer + kReadBuffSize),
+ Return(kReadBuffSize)))
+ .WillOnce(DoAll(SetArrayArgument<0>(streamBuffer + kReadBuffSize,
+ streamBuffer + 2 * kReadBuffSize),
+ Return(kReadBuffSize)))
+ .WillOnce(DoAll(SetArrayArgument<0>(streamBuffer + 2 * kReadBuffSize,
+ &streamBuffer[streamBufferLength]),
+ Return(streamBufferLength - (2 * kReadBuffSize))));
+
+ StreamDataInput streamDataInput(timeout, std::move(connector), cache.get(),
+ nullptr);
+
+ auto object = streamDataInput.readObject();
+
+ auto response = std::dynamic_pointer_cast<GetAllServersResponse>(object);
+
+ ASSERT_THAT(response->getServers(), SizeIs(servers.size()));
+ for (unsigned int i = 0; i < servers.size(); i++) {
+ ASSERT_THAT(response->getServers()[i]->getEpString(),
+ Eq(servers[i]->getEpString()));
+ }
+}
+
+TEST(StreamDataInputTest, TimeoutWhenReading) {
+ auto connector = std::unique_ptr<ConnectorMock>(new ConnectorMock());
+
+ auto cache =
+ std::make_shared<CacheImpl>(nullptr, nullptr, false, false, nullptr);
+
+ EXPECT_CALL(*connector, getRemoteEndpoint())
+ .WillRepeatedly(Return("locator:9999"));
+
+ EXPECT_CALL(*connector, receive_nothrowiftimeout(_, _, _))
+ .WillOnce(WaitMs(2));
+
+ auto timeout = std::chrono::milliseconds(1);
+ StreamDataInput streamDataInput(timeout, std::move(connector), cache.get(),
+ nullptr);
+
+ ASSERT_THROW(streamDataInput.readObject(), TimeoutException);
+}
+
+} // namespace
--- /dev/null
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef GEODE_CONNECTORMOCK_H_
+#define GEODE_CONNECTORMOCK_H_
+
+#include <gmock/gmock.h>
+
+#include "Connector.hpp"
+
+namespace apache {
+namespace geode {
+namespace client {
+class ConnectorMock : public Connector {
+ public:
+ ConnectorMock(){
+ }
+
+ MOCK_METHOD3(receive, size_t(char *b, size_t len,
+ std::chrono::milliseconds timeout));
+
+ MOCK_METHOD0(getPort, uint16_t());
+
+ MOCK_METHOD0(getRemoteEndpoint, std::string());
+
+ MOCK_METHOD3(send, size_t(const char *b, size_t len,
+ std::chrono::milliseconds timeout));
+
+ MOCK_METHOD3(receive_nothrowiftimeout, size_t(
+ char *b, size_t len, std::chrono::milliseconds timeout));
+
+
+};
+} // namespace client
+} // namespace geode
+} // namespace apache
+
+#endif // GEODE_CONNECTORMOCK_H_