[IOTDB-4116] Merge SenderService and ReceiverService into SyncService (#6966) master
authorChen YZ <43774645+Cpaulyz@users.noreply.github.com>
Fri, 12 Aug 2022 15:46:21 +0000 (23:46 +0800)
committerGitHub <noreply@github.com>
Fri, 12 Aug 2022 15:46:21 +0000 (23:46 +0800)
15 files changed:
integration/src/test/java/org/apache/iotdb/db/integration/sync/IoTDBSyncReceiverIT.java
node-commons/src/main/java/org/apache/iotdb/commons/service/ServiceType.java
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
server/src/main/java/org/apache/iotdb/db/service/DataNode.java
server/src/main/java/org/apache/iotdb/db/service/IoTDB.java
server/src/main/java/org/apache/iotdb/db/service/NewIoTDB.java
server/src/main/java/org/apache/iotdb/db/sync/SyncService.java [moved from server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java with 82% similarity]
server/src/main/java/org/apache/iotdb/db/sync/common/ISyncInfoFetcher.java
server/src/main/java/org/apache/iotdb/db/sync/common/LocalSyncInfoFetcher.java
server/src/main/java/org/apache/iotdb/db/sync/common/SyncInfo.java
server/src/main/java/org/apache/iotdb/db/sync/common/persistence/SyncLogReader.java
server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java [deleted file]
server/src/main/java/org/apache/iotdb/db/sync/sender/pipe/Pipe.java
server/src/main/java/org/apache/iotdb/db/sync/transport/client/IoTDBSInkTransportClient.java
server/src/main/java/org/apache/iotdb/db/sync/transport/server/TransportServerManager.java

index 2614dd9d2b5f2e256e26b236e6874f10a316c1dc..016ccb84c7df04a9ecda9a6e1fb3ba792de41814 100644 (file)
@@ -26,11 +26,11 @@ import org.apache.iotdb.db.qp.physical.PhysicalPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
 import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
 import org.apache.iotdb.db.sync.transport.client.IoTDBSInkTransportClient;
@@ -95,7 +95,7 @@ public class IoTDBSyncReceiverIT {
     EnvironmentUtils.cleanEnv();
     EnvironmentUtils.envSetUp();
     try {
-      ReceiverService.getInstance().startPipeServer(true);
+      SyncService.getInstance().startPipeServer(true);
       new Socket("localhost", 6670).close();
     } catch (Exception e) {
       Assert.fail("Failed to start pipe server because " + e.getMessage());
@@ -124,7 +124,7 @@ public class IoTDBSyncReceiverIT {
   public void testStopPipeServer() {
     logger.info("testStopPipeServerCheck");
     try {
-      ReceiverService.getInstance().stopPipeServer();
+      SyncService.getInstance().stopPipeServer();
     } catch (PipeServerException e) {
       Assert.fail("Can not stop pipe server");
     }
index 904562f6190109194b39e5d80b293bd81d0cc258..ae930eea60a1fd035cf82318017ae1588a72035d 100644 (file)
@@ -33,11 +33,10 @@ public enum ServiceType {
   JVM_MEM_CONTROL_SERVICE("Memory Controller", ""),
   AUTHORIZATION_SERVICE("Authorization ServerService", ""),
   FILE_READER_MANAGER_SERVICE("File reader manager ServerService", ""),
-  SYNC_SERVICE("SYNC ServerService", ""),
   UPGRADE_SERVICE("UPGRADE DataService", ""),
   SETTLE_SERVICE("SETTLE DataService", ""),
-  SENDER_SERVICE("Sync Sender service", ""),
-  RECEIVER_SERVICE("Sync Receiver service", ""),
+  SYNC_RPC_SERVICE("Sync RPC ServerService", ""),
+  SYNC_SERVICE("Sync Service", ""),
   MERGE_SERVICE("Merge Manager", "Merge Manager"),
   COMPACTION_SERVICE("Compaction Manager", "Compaction Manager"),
   PERFORMANCE_STATISTIC_SERVICE("PERFORMANCE_STATISTIC_SERVICE", "PERFORMANCE_STATISTIC_SERVICE"),
index ce944941e0888cd9003896f66152fcb309bc06cb..70c4883dcf2235fdb3d5456c371268757284b652 100644 (file)
@@ -155,9 +155,8 @@ import org.apache.iotdb.db.query.executor.IQueryRouter;
 import org.apache.iotdb.db.query.executor.QueryRouter;
 import org.apache.iotdb.db.service.IoTDB;
 import org.apache.iotdb.db.service.SettleService;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.db.tools.TsFileSplitByPartitionTool;
 import org.apache.iotdb.db.utils.FileLoaderUtils;
 import org.apache.iotdb.db.utils.TypeInferenceUtils;
@@ -453,7 +452,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   private boolean operateStopPipeServer() throws QueryProcessException {
     try {
-      ReceiverService.getInstance().stopPipeServer();
+      SyncService.getInstance().stopPipeServer();
     } catch (PipeServerException e) {
       throw new QueryProcessException(e);
     }
@@ -462,7 +461,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   private boolean operateStartPipeServer() throws QueryProcessException {
     try {
-      ReceiverService.getInstance().startPipeServer(false);
+      SyncService.getInstance().startPipeServer(false);
     } catch (PipeServerException e) {
       throw new QueryProcessException(e);
     }
@@ -772,7 +771,7 @@ public class PlanExecutor implements IPlanExecutor {
   }
 
   private QueryDataSet processShowPipeServer(ShowPipeServerPlan plan) {
-    return ReceiverService.getInstance().showPipeServer(plan);
+    return SyncService.getInstance().showPipeServer(plan);
   }
 
   private QueryDataSet processCountNodes(CountPlan countPlan) throws MetadataException {
@@ -1295,7 +1294,7 @@ public class PlanExecutor implements IPlanExecutor {
                 new PartialPath(COLUMN_PIPESINK_ATTRIBUTES, false)),
             Arrays.asList(TSDataType.TEXT, TSDataType.TEXT, TSDataType.TEXT));
     boolean showAll = "".equals(plan.getPipeSinkName());
-    for (PipeSink pipeSink : SenderService.getInstance().getAllPipeSink()) {
+    for (PipeSink pipeSink : SyncService.getInstance().getAllPipeSink()) {
       if (showAll || plan.getPipeSinkName().equals(pipeSink.getPipeSinkName())) {
         RowRecord record = new RowRecord(0);
         record.addField(Binary.valueOf(pipeSink.getPipeSinkName()), TSDataType.TEXT);
@@ -1341,8 +1340,8 @@ public class PlanExecutor implements IPlanExecutor {
                 TSDataType.TEXT,
                 TSDataType.TEXT,
                 TSDataType.TEXT));
-    SenderService.getInstance().showPipe(plan, listDataSet);
-    ReceiverService.getInstance().showPipe(plan, listDataSet);
+    SyncService.getInstance().showPipe(plan, listDataSet);
+    SyncService.getInstance().showPipe(plan, listDataSet);
     // sort by create time
     listDataSet.sort(Comparator.comparing(o -> o.getFields().get(0).getStringValue()));
     return listDataSet;
@@ -2520,7 +2519,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   private void createPipeSink(CreatePipeSinkPlan plan) throws QueryProcessException {
     try {
-      SenderService.getInstance().addPipeSink(plan);
+      SyncService.getInstance().addPipeSink(plan);
     } catch (PipeSinkException e) {
       throw new QueryProcessException("Create pipeSink error.", e); // e will override the message
     } catch (IllegalArgumentException e) {
@@ -2531,7 +2530,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   private void dropPipeSink(DropPipeSinkPlan plan) throws QueryProcessException {
     try {
-      SenderService.getInstance().dropPipeSink(plan.getPipeSinkName());
+      SyncService.getInstance().dropPipeSink(plan.getPipeSinkName());
     } catch (PipeSinkException e) {
       throw new QueryProcessException("Can not drop pipeSink.", e);
     }
@@ -2539,7 +2538,7 @@ public class PlanExecutor implements IPlanExecutor {
 
   private void createPipe(CreatePipePlan plan) throws QueryProcessException {
     try {
-      SenderService.getInstance().addPipe(plan);
+      SyncService.getInstance().addPipe(plan);
     } catch (PipeException e) {
       throw new QueryProcessException("Create pipe error.", e);
     }
@@ -2548,11 +2547,11 @@ public class PlanExecutor implements IPlanExecutor {
   private void operatePipe(OperatePipePlan plan) throws QueryProcessException {
     try {
       if (Operator.OperatorType.STOP_PIPE.equals(plan.getOperatorType())) {
-        SenderService.getInstance().stopPipe(plan.getPipeName());
+        SyncService.getInstance().stopPipe(plan.getPipeName());
       } else if (Operator.OperatorType.START_PIPE.equals(plan.getOperatorType())) {
-        SenderService.getInstance().startPipe(plan.getPipeName());
+        SyncService.getInstance().startPipe(plan.getPipeName());
       } else if (Operator.OperatorType.DROP_PIPE.equals(plan.getOperatorType())) {
-        SenderService.getInstance().dropPipe(plan.getPipeName());
+        SyncService.getInstance().dropPipe(plan.getPipeName());
       } else {
         throw new QueryProcessException(
             String.format("Error operator type %s.", plan.getOperatorType()),
index 5dd30de66fcb09cff3bc5345d61dc5f15a2a1020..c5f27b081318c7dcde3873846d4ec68ba7fc3e15 100644 (file)
@@ -61,8 +61,7 @@ import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.wal.WALManager;
 import org.apache.iotdb.db.wal.utils.WALMode;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -282,8 +281,6 @@ public class DataNode implements DataNodeMBean {
 
     registerUdfServices();
 
-    registerManager.register(ReceiverService.getInstance());
-
     logger.info(
         "IoTDB DataNode is setting up, some storage groups may not be ready now, please wait several seconds...");
 
@@ -297,7 +294,7 @@ public class DataNode implements DataNodeMBean {
       }
     }
 
-    registerManager.register(SenderService.getInstance());
+    registerManager.register(SyncService.getInstance());
     registerManager.register(UpgradeSevice.getINSTANCE());
     // in mpp mode we temporarily don't start settle service because it uses StorageEngine directly
     // in itself, but currently we need to use StorageEngineV2 instead of StorageEngine in mpp mode.
index 56ecdccb5b0d000e0ad8d8be2efe925cee256ae3..f262c33a8cec43fea7fcc146a09dc37c330b7291 100644 (file)
@@ -48,8 +48,7 @@ import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.basic.ServiceProvider;
 import org.apache.iotdb.db.service.basic.StandaloneServiceProvider;
 import org.apache.iotdb.db.service.metrics.MetricsService;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.wal.WALManager;
 
 import org.slf4j.Logger;
@@ -150,7 +149,7 @@ public class IoTDB implements IoTDBMBean {
     registerManager.register(CacheHitRatioMonitor.getInstance());
     registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
-    registerManager.register(SenderService.getInstance());
+    registerManager.register(SyncService.getInstance());
     registerManager.register(WALManager.getInstance());
 
     registerManager.register(StorageEngine.getInstance());
@@ -165,7 +164,6 @@ public class IoTDB implements IoTDBMBean {
                 + File.separator
                 + "udf"
                 + File.separator));
-    registerManager.register(ReceiverService.getInstance());
 
     // in cluster mode, RPC service is not enabled.
     if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
index 5eb6db49a76d99b96f4acf12f5927d10ca912a41..a317ee9802265b40ff1f24efea14ef9862bc2392 100644 (file)
@@ -48,8 +48,7 @@ import org.apache.iotdb.db.rescon.PrimitiveArrayManager;
 import org.apache.iotdb.db.rescon.SystemInfo;
 import org.apache.iotdb.db.service.metrics.MetricsService;
 import org.apache.iotdb.db.service.thrift.impl.ClientRPCServiceImpl;
-import org.apache.iotdb.db.sync.receiver.ReceiverService;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.wal.WALManager;
 
 import org.slf4j.Logger;
@@ -129,7 +128,7 @@ public class NewIoTDB implements NewIoTDBMBean {
     registerManager.register(CacheHitRatioMonitor.getInstance());
     registerManager.register(CompactionTaskManager.getInstance());
     JMXService.registerMBean(getInstance(), mbeanName);
-    registerManager.register(SenderService.getInstance());
+    registerManager.register(SyncService.getInstance());
     registerManager.register(WALManager.getInstance());
 
     registerManager.register(StorageEngineV2.getInstance());
@@ -145,7 +144,6 @@ public class NewIoTDB implements NewIoTDBMBean {
                 + File.separator
                 + "udf"
                 + File.separator));
-    registerManager.register(ReceiverService.getInstance());
 
     // in cluster mode, RPC service is not enabled.
     if (IoTDBDescriptor.getInstance().getConfig().isEnableRpcService()) {
similarity index 82%
rename from server/src/main/java/org/apache/iotdb/db/sync/sender/service/SenderService.java
rename to server/src/main/java/org/apache/iotdb/db/sync/SyncService.java
index dd6e8c7dbb650fa5755787ef774faa7c81c173b9..f78978855dc57927af7351a94272bcbf6295a6e7 100644 (file)
  * under the License.
  *
  */
-package org.apache.iotdb.db.sync.sender.service;
+package org.apache.iotdb.db.sync;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
 import org.apache.iotdb.commons.exception.ShutdownException;
 import org.apache.iotdb.commons.exception.StartupException;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.commons.service.IService;
 import org.apache.iotdb.commons.service.ServiceType;
 import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.commons.sync.SyncPathUtil;
 import org.apache.iotdb.db.exception.sync.PipeException;
+import org.apache.iotdb.db.exception.sync.PipeServerException;
 import org.apache.iotdb.db.exception.sync.PipeSinkException;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipePlan;
 import org.apache.iotdb.db.qp.physical.sys.CreatePipeSinkPlan;
 import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
 import org.apache.iotdb.db.qp.utils.DatetimeUtils;
 import org.apache.iotdb.db.query.dataset.ListDataSet;
 import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
 import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
-import org.apache.iotdb.db.sync.common.persistence.SyncLogReader;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginManager;
 import org.apache.iotdb.db.sync.externalpipe.ExtPipePluginRegister;
 import org.apache.iotdb.db.sync.externalpipe.ExternalPipeStatus;
@@ -47,11 +49,15 @@ import org.apache.iotdb.db.sync.sender.pipe.Pipe;
 import org.apache.iotdb.db.sync.sender.pipe.PipeInfo;
 import org.apache.iotdb.db.sync.sender.pipe.PipeSink;
 import org.apache.iotdb.db.sync.sender.pipe.TsFilePipe;
+import org.apache.iotdb.db.sync.sender.service.TransportHandler;
+import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
 import org.apache.iotdb.db.utils.sync.SyncPipeUtil;
 import org.apache.iotdb.pipe.external.api.IExternalPipeSinkWriterFactory;
 import org.apache.iotdb.rpc.TSStatusCode;
 import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.read.common.Field;
 import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
 import org.apache.iotdb.tsfile.utils.Binary;
 
 import org.slf4j.Logger;
@@ -59,11 +65,14 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
-public class SenderService implements IService {
-  private static final Logger logger = LoggerFactory.getLogger(SenderService.class);
+import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESERVER_STATUS;
+
+public class SyncService implements IService {
+  private static final Logger logger = LoggerFactory.getLogger(SyncService.class);
 
   private Pipe runningPipe;
 
@@ -74,16 +83,16 @@ public class SenderService implements IService {
 
   private ISyncInfoFetcher syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
 
-  private SenderService() {}
+  private SyncService() {}
 
-  private static class SenderServiceHolder {
-    private static final SenderService INSTANCE = new SenderService();
+  private static class SyncServiceHolder {
+    private static final SyncService INSTANCE = new SyncService();
 
-    private SenderServiceHolder() {}
+    private SyncServiceHolder() {}
   }
 
-  public static SenderService getInstance() {
-    return SenderService.SenderServiceHolder.INSTANCE;
+  public static SyncService getInstance() {
+    return SyncServiceHolder.INSTANCE;
   }
 
   // region Interfaces and Implementation of PipeSink
@@ -107,7 +116,61 @@ public class SenderService implements IService {
   }
 
   public List<PipeSink> getAllPipeSink() {
-    return syncInfoFetcher.getAllPipeSink();
+    return syncInfoFetcher.getAllPipeSinks();
+  }
+
+  // endregion
+
+  // region Interfaces and Implementation of PipeServer
+
+  /**
+   * start receiver service
+   *
+   * @param isRecovery if isRecovery, it will ignore check and force a start
+   */
+  public synchronized void startPipeServer(boolean isRecovery) throws PipeServerException {
+    if (syncInfoFetcher.isPipeServerEnable() && !isRecovery) {
+      return;
+    }
+    try {
+      TransportServerManager.getInstance().startService();
+      TSStatus status = syncInfoFetcher.startPipeServer();
+      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+        throw new PipeServerException("Failed to start pipe server because " + status.getMessage());
+      }
+    } catch (StartupException e) {
+      throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
+    }
+  }
+
+  /** stop receiver service */
+  public synchronized void stopPipeServer() throws PipeServerException {
+    if (!syncInfoFetcher.isPipeServerEnable()) {
+      return;
+    }
+    TransportServerManager.getInstance().stopService();
+    TSStatus status = syncInfoFetcher.stopPipeServer();
+    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
+      throw new PipeServerException("Failed to stop pipe server because " + status.getMessage());
+    }
+  }
+
+  /**
+   * query by sql SHOW PIPESERVER STATUS
+   *
+   * @return QueryDataSet contained one column: enable
+   */
+  public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
+    ListDataSet dataSet =
+        new ListDataSet(
+            Collections.singletonList(new PartialPath(COLUMN_PIPESERVER_STATUS, false)),
+            Collections.singletonList(TSDataType.BOOLEAN));
+    RowRecord rowRecord = new RowRecord(0);
+    Field status = new Field(TSDataType.BOOLEAN);
+    status.setBoolV(syncInfoFetcher.isPipeServerEnable());
+    rowRecord.addField(status);
+    dataSet.putRecord(rowRecord);
+    return dataSet;
   }
 
   // endregion
@@ -241,56 +304,6 @@ public class SenderService implements IService {
     }
   }
 
-  // endregion
-
-  // region Interfaces and Implementation of External-Pipe
-
-  /** Start ExternalPipeProcessor who handle externalPipe */
-  private void startExternalPipeManager(boolean startExtPipe) throws PipeException {
-    if (!(runningPipe instanceof TsFilePipe)) {
-      logger.error("startExternalPipeManager(), runningPipe is not TsFilePipe. " + runningPipe);
-      return;
-    }
-
-    PipeSink pipeSink = runningPipe.getPipeSink();
-    if (!(pipeSink instanceof ExternalPipeSink)) {
-      logger.error("startExternalPipeManager(), pipeSink is not ExternalPipeSink." + pipeSink);
-      return;
-    }
-
-    String extPipeSinkTypeName = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
-    IExternalPipeSinkWriterFactory externalPipeSinkWriterFactory =
-        ExtPipePluginRegister.getInstance().getWriteFactory(extPipeSinkTypeName);
-    if (externalPipeSinkWriterFactory == null) {
-      logger.error(
-          String.format(
-              "startExternalPipeManager(), can not found ExternalPipe plugin for {}.",
-              extPipeSinkTypeName));
-      throw new PipeException("Can not found ExternalPipe plugin for " + extPipeSinkTypeName + ".");
-    }
-
-    if (extPipePluginManager == null) {
-      extPipePluginManager = new ExtPipePluginManager((TsFilePipe) this.runningPipe);
-    }
-
-    if (startExtPipe) {
-      try {
-        extPipePluginManager.startExtPipe(
-            extPipeSinkTypeName, ((ExternalPipeSink) pipeSink).getSinkParams());
-      } catch (IOException e) {
-        logger.error("Failed to start External Pipe: {}.", extPipeSinkTypeName, e);
-        throw new PipeException(
-            "Failed to start External Pipe: " + extPipeSinkTypeName + ". " + e.getMessage());
-      }
-    }
-  }
-
-  public ExtPipePluginManager getExternalPipeManager() {
-    return extPipePluginManager;
-  }
-
-  // endregion
-
   public synchronized void receiveMsg(PipeMessage.MsgType type, String message) {
     if (runningPipe == null || runningPipe.getStatus() == Pipe.PipeStatus.DROP) {
       logger.info(String.format("No running pipe for receiving msg %s.", message));
@@ -321,7 +334,7 @@ public class SenderService implements IService {
 
   public void showPipe(ShowPipePlan plan, ListDataSet listDataSet) {
     boolean showAll = "".equals(plan.getPipeName());
-    for (PipeInfo pipe : SenderService.getInstance().getAllPipeInfos()) {
+    for (PipeInfo pipe : SyncService.getInstance().getAllPipeInfos()) {
       if (showAll || plan.getPipeName().equals(pipe.getPipeName())) {
         RowRecord record = new RowRecord(0);
         record.addField(
@@ -337,7 +350,7 @@ public class SenderService implements IService {
         PipeSink pipeSink = syncInfoFetcher.getPipeSink(pipe.getPipeSinkName());
         if (pipeSink.getType() == PipeSink.PipeSinkType.ExternalPipe) { // for external pipe
           ExtPipePluginManager extPipePluginManager =
-              SenderService.getInstance().getExternalPipeManager();
+              SyncService.getInstance().getExternalPipeManager();
 
           if (extPipePluginManager != null) {
             String extPipeType = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
@@ -363,11 +376,71 @@ public class SenderService implements IService {
         listDataSet.putRecord(record);
       }
     }
+    // TODO: implement show pipe in receiver
   }
 
+  // endregion
+
+  // region Interfaces and Implementation of External-Pipe
+
+  /** Start ExternalPipeProcessor who handle externalPipe */
+  private void startExternalPipeManager(boolean startExtPipe) throws PipeException {
+    if (!(runningPipe instanceof TsFilePipe)) {
+      logger.error("startExternalPipeManager(), runningPipe is not TsFilePipe. " + runningPipe);
+      return;
+    }
+
+    PipeSink pipeSink = runningPipe.getPipeSink();
+    if (!(pipeSink instanceof ExternalPipeSink)) {
+      logger.error("startExternalPipeManager(), pipeSink is not ExternalPipeSink." + pipeSink);
+      return;
+    }
+
+    String extPipeSinkTypeName = ((ExternalPipeSink) pipeSink).getExtPipeSinkTypeName();
+    IExternalPipeSinkWriterFactory externalPipeSinkWriterFactory =
+        ExtPipePluginRegister.getInstance().getWriteFactory(extPipeSinkTypeName);
+    if (externalPipeSinkWriterFactory == null) {
+      logger.error(
+          String.format(
+              "startExternalPipeManager(), can not found ExternalPipe plugin for {}.",
+              extPipeSinkTypeName));
+      throw new PipeException("Can not found ExternalPipe plugin for " + extPipeSinkTypeName + ".");
+    }
+
+    if (extPipePluginManager == null) {
+      extPipePluginManager = new ExtPipePluginManager((TsFilePipe) this.runningPipe);
+    }
+
+    if (startExtPipe) {
+      try {
+        extPipePluginManager.startExtPipe(
+            extPipeSinkTypeName, ((ExternalPipeSink) pipeSink).getSinkParams());
+      } catch (IOException e) {
+        logger.error("Failed to start External Pipe: {}.", extPipeSinkTypeName, e);
+        throw new PipeException(
+            "Failed to start External Pipe: " + extPipeSinkTypeName + ". " + e.getMessage());
+      }
+    }
+  }
+
+  public ExtPipePluginManager getExternalPipeManager() {
+    return extPipePluginManager;
+  }
+
+  // endregion
+
   /** IService * */
   @Override
   public void start() throws StartupException {
+    // recover receiver
+    if (syncInfoFetcher.isPipeServerEnable()) {
+      try {
+        startPipeServer(true);
+      } catch (PipeServerException e) {
+        throw new StartupException(e.getMessage());
+      }
+    }
+    // recover sender
     // == Check whether loading extPipe plugin successfully.
     ExtPipePluginRegister extPipePluginRegister = ExtPipePluginRegister.getInstance();
     if (extPipePluginRegister == null) {
@@ -422,19 +495,17 @@ public class SenderService implements IService {
 
   @Override
   public ServiceType getID() {
-    return ServiceType.SENDER_SERVICE;
+    return ServiceType.SYNC_SERVICE;
   }
 
   private void recover() throws IOException, PipeException, StartupException {
-    SyncLogReader analyzer = new SyncLogReader();
-    analyzer.recover();
-    PipeInfo runningPipeInfo = analyzer.getRunningPipeInfo();
-    this.runningPipe =
-        SyncPipeUtil.parsePipeInfoAsPipe(
-            runningPipeInfo, analyzer.getAllPipeSinks().get(runningPipeInfo.getPipeSinkName()));
-    if (runningPipe == null || Pipe.PipeStatus.DROP.equals(runningPipeInfo.getStatus())) {
+    PipeInfo runningPipeInfo = syncInfoFetcher.getRunningPipeInfo();
+    if (runningPipeInfo == null || Pipe.PipeStatus.DROP.equals(runningPipeInfo.getStatus())) {
       return;
     } else {
+      this.runningPipe =
+          SyncPipeUtil.parsePipeInfoAsPipe(
+              runningPipeInfo, syncInfoFetcher.getPipeSink(runningPipeInfo.getPipeSinkName()));
       switch (runningPipeInfo.getStatus()) {
         case RUNNING:
           runningPipe.start();
index 72902febeef36807b8a4099f064c2ff9b8fa8cbd..b1bf9e1c902910450d49228ec6d1626e3df32517 100644 (file)
@@ -45,7 +45,7 @@ public interface ISyncInfoFetcher {
 
   PipeSink getPipeSink(String name);
 
-  List<PipeSink> getAllPipeSink();
+  List<PipeSink> getAllPipeSinks();
   // endregion
 
   // region Interfaces of Pipe
@@ -61,6 +61,8 @@ public interface ISyncInfoFetcher {
 
   List<PipeInfo> getAllPipeInfos();
 
+  PipeInfo getRunningPipeInfo();
+
   // endregion
 
   String getPipeMsg(String pipeName, long createTime);
index e479082e36a655348a25863fe41351e59e268bec..93e12d4c3818ad9bdc775e9c2113fa6a643af4be 100644 (file)
@@ -101,7 +101,7 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
   }
 
   @Override
-  public List<PipeSink> getAllPipeSink() {
+  public List<PipeSink> getAllPipeSinks() {
     return syncInfo.getAllPipeSink();
   }
 
@@ -154,6 +154,11 @@ public class LocalSyncInfoFetcher implements ISyncInfoFetcher {
     return syncInfo.getAllPipeInfos();
   }
 
+  @Override
+  public PipeInfo getRunningPipeInfo() {
+    return syncInfo.getRunningPipeInfo();
+  }
+
   @Override
   public String getPipeMsg(String pipeName, long createTime) {
     return syncInfo.getPipeMessage(pipeName, createTime, false).getMsg();
index 1aa87001d9cfc0c11d04695af3570ac2bbb118a7..8f00ca3ec9064dc5e121615f795d227b8cf11f6f 100644 (file)
@@ -186,6 +186,11 @@ public class SyncInfo {
     return pipes;
   }
 
+  /** @return null if no pipe has been created */
+  public PipeInfo getRunningPipeInfo() {
+    return runningPipe;
+  }
+
   private void checkIfPipeExistAndRunning(String pipeName) throws PipeException {
     if (runningPipe == null || runningPipe.getStatus() == Pipe.PipeStatus.DROP) {
       throw new PipeException("There is no existing pipe.");
index 361e9e8d57f9bc9403879ae65a829e6b036fcc82..66f8fe3a0383bca44fe41dbac9cf6a9bc3b563d4 100644 (file)
@@ -77,7 +77,7 @@ public class SyncLogReader {
           logger.error("Sync msg log recovery error: log file parse error at line " + lineNum);
           logger.error(e.getMessage());
           throw new StartupException(
-              ServiceType.RECEIVER_SERVICE.getName(),
+              ServiceType.SYNC_SERVICE.getName(),
               "Sync msg log file recover error at line " + lineNum);
         }
       }
diff --git a/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java b/server/src/main/java/org/apache/iotdb/db/sync/receiver/ReceiverService.java
deleted file mode 100644 (file)
index f781932..0000000
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.iotdb.db.sync.receiver;
-
-import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.exception.StartupException;
-import org.apache.iotdb.commons.path.PartialPath;
-import org.apache.iotdb.commons.service.IService;
-import org.apache.iotdb.commons.service.ServiceType;
-import org.apache.iotdb.commons.sync.SyncPathUtil;
-import org.apache.iotdb.db.exception.sync.PipeServerException;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipePlan;
-import org.apache.iotdb.db.qp.physical.sys.ShowPipeServerPlan;
-import org.apache.iotdb.db.query.dataset.ListDataSet;
-import org.apache.iotdb.db.sync.common.ISyncInfoFetcher;
-import org.apache.iotdb.db.sync.common.LocalSyncInfoFetcher;
-import org.apache.iotdb.db.sync.transport.server.TransportServerManager;
-import org.apache.iotdb.rpc.TSStatusCode;
-import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
-import org.apache.iotdb.tsfile.read.common.Field;
-import org.apache.iotdb.tsfile.read.common.RowRecord;
-import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.Collections;
-
-import static org.apache.iotdb.commons.conf.IoTDBConstant.COLUMN_PIPESERVER_STATUS;
-
-public class ReceiverService implements IService {
-  private static final Logger logger = LoggerFactory.getLogger(ReceiverService.class);
-
-  private ISyncInfoFetcher syncInfoFetcher = LocalSyncInfoFetcher.getInstance();
-
-  /**
-   * start receiver service
-   *
-   * @param isRecovery if isRecovery, it will ignore check and force a start
-   */
-  public synchronized void startPipeServer(boolean isRecovery) throws PipeServerException {
-    if (syncInfoFetcher.isPipeServerEnable() && !isRecovery) {
-      return;
-    }
-    try {
-      TransportServerManager.getInstance().startService();
-      TSStatus status = syncInfoFetcher.startPipeServer();
-      if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-        throw new PipeServerException("Failed to start pipe server because " + status.getMessage());
-      }
-    } catch (StartupException e) {
-      throw new PipeServerException("Failed to start pipe server because " + e.getMessage());
-    }
-  }
-
-  /** stop receiver service */
-  public synchronized void stopPipeServer() throws PipeServerException {
-    if (!syncInfoFetcher.isPipeServerEnable()) {
-      return;
-    }
-    TransportServerManager.getInstance().stopService();
-    TSStatus status = syncInfoFetcher.stopPipeServer();
-    if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
-      throw new PipeServerException("Failed to stop pipe server because " + status.getMessage());
-    }
-  }
-
-  private void createDir(String pipeName, String remoteIp, long createTime) {
-    File f = new File(SyncPathUtil.getReceiverFileDataDir(pipeName, remoteIp, createTime));
-    if (!f.exists()) {
-      f.mkdirs();
-    }
-    f = new File(SyncPathUtil.getReceiverPipeLogDir(pipeName, remoteIp, createTime));
-    if (!f.exists()) {
-      f.mkdirs();
-    }
-  }
-
-  /**
-   * query by sql SHOW PIPESERVER STATUS
-   *
-   * @return QueryDataSet contained one column: enable
-   */
-  public QueryDataSet showPipeServer(ShowPipeServerPlan plan) {
-    ListDataSet dataSet =
-        new ListDataSet(
-            Collections.singletonList(new PartialPath(COLUMN_PIPESERVER_STATUS, false)),
-            Collections.singletonList(TSDataType.BOOLEAN));
-    RowRecord rowRecord = new RowRecord(0);
-    Field status = new Field(TSDataType.BOOLEAN);
-    status.setBoolV(syncInfoFetcher.isPipeServerEnable());
-    rowRecord.addField(status);
-    dataSet.putRecord(rowRecord);
-    return dataSet;
-  }
-
-  /** query by sql SHOW PIPE */
-  public QueryDataSet showPipe(ShowPipePlan plan, ListDataSet dataSet) {
-    // TODO: implement show pipe in receiver
-    return dataSet;
-  }
-
-  private ReceiverService() {}
-
-  public static ReceiverService getInstance() {
-    return ReceiverServiceHolder.INSTANCE;
-  }
-
-  /** IService * */
-  @Override
-  public void start() throws StartupException {
-    if (syncInfoFetcher.isPipeServerEnable()) {
-      try {
-        startPipeServer(true);
-      } catch (PipeServerException e) {
-        throw new StartupException(e.getMessage());
-      }
-    }
-  }
-
-  @Override
-  public void stop() {}
-
-  @Override
-  public ServiceType getID() {
-    return ServiceType.RECEIVER_SERVICE;
-  }
-
-  private static class ReceiverServiceHolder {
-    private static final ReceiverService INSTANCE = new ReceiverService();
-
-    private ReceiverServiceHolder() {}
-  }
-}
index 94aae7b40bfdc1a579bb2eea48d6c05cc8c5ef9f..caefd130eb47965326534a78cc9f470e92fed360 100644 (file)
@@ -21,8 +21,8 @@ package org.apache.iotdb.db.sync.sender.pipe;
 
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.exception.sync.PipeException;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.db.sync.transport.client.ITransportClient;
 
 /**
@@ -55,7 +55,7 @@ public interface Pipe {
 
   /**
    * Close this pipe, stop collecting data from IoTDB, but do not delete information about this pipe
-   * on disk. Used for {@linkplain SenderService#shutdown(long)}. Do not change the status of this
+   * on disk. Used for {@linkplain SyncService#shutdown(long)}. Do not change the status of this
    * pipe.
    *
    * @throws PipeException Some inside error happens(such as IOException about disk).
index e15ea9e8cc943e0dd1a91e8568509b65e5d38c00..b68cd44cf4c619d4206b7c0fd616dcce94cd3e63 100644 (file)
@@ -23,11 +23,11 @@ import org.apache.iotdb.commons.sync.SyncConstant;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.exception.SyncConnectionException;
+import org.apache.iotdb.db.sync.SyncService;
 import org.apache.iotdb.db.sync.pipedata.PipeData;
 import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
 import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
 import org.apache.iotdb.db.sync.sender.pipe.Pipe;
-import org.apache.iotdb.db.sync.sender.service.SenderService;
 import org.apache.iotdb.rpc.RpcTransportFactory;
 import org.apache.iotdb.service.transport.thrift.MetaInfo;
 import org.apache.iotdb.service.transport.thrift.TransportStatus;
@@ -427,7 +427,7 @@ public class IoTDBSInkTransportClient implements ITransportClient {
       while (!Thread.currentThread().isInterrupted()) {
         try {
           if (!handshake()) {
-            SenderService.getInstance()
+            SyncService.getInstance()
                 .receiveMsg(
                     PipeMessage.MsgType.ERROR,
                     String.format("Can not handshake with %s:%d.", ipAddress, port));
@@ -437,7 +437,7 @@ public class IoTDBSInkTransportClient implements ITransportClient {
             if (!senderTransport(pipeData)) {
               logger.error(String.format("Can not transfer pipedata %s, skip it.", pipeData));
               // can do something.
-              SenderService.getInstance()
+              SyncService.getInstance()
                   .receiveMsg(
                       PipeMessage.MsgType.WARN,
                       String.format(
index 4bf16d20cfc6c38381c44948a034ee523f515e40..05c8ce1dc7cf57cf997184e24b58f768bbd54844 100644 (file)
@@ -64,7 +64,7 @@ public class TransportServerManager extends ThriftService
 
   @Override
   public ServiceType getID() {
-    return ServiceType.SYNC_SERVICE;
+    return ServiceType.SYNC_RPC_SERVICE;
   }
 
   @Override