Load balancing transfer requests across over agents master
authorDimuthu Wannipurage <dimuthu.upeksha2@gmail.com>
Tue, 17 May 2022 15:36:16 +0000 (11:36 -0400)
committerDimuthu Wannipurage <dimuthu.upeksha2@gmail.com>
Tue, 17 May 2022 15:36:16 +0000 (11:36 -0400)
agent/src/main/java/org/apache/airavata/mft/agent/MFTAgent.java
common/common-clients/src/main/java/org/apache/airavata/mft/admin/MFTConsulClient.java
common/common-clients/src/main/java/org/apache/airavata/mft/admin/models/AgentInfo.java
controller/src/main/java/org/apache/airavata/mft/controller/MFTController.java

index 18e7ba6f004b18848c63f6041e39f40c5be03010..9490225dd218624f8afb471d9d70418223108a46 100644 (file)
@@ -366,6 +366,7 @@ public class MFTAgent implements CommandLineRunner {
                     .setId(agentId)
                     .setHost(agentHost)
                     .setUser(agentUser)
+                    .setSessionId(this.session)
                     .setSupportedProtocols(Arrays.asList(supportedProtocols.split(",")))
                     .setLocalStorages(new ArrayList<>()));
         }
index e46fc9b4d4ae07ccbbd200ea4001404c2a112288..8fdeaaf044b183bedc16637cb0b5e7d920eb8bcd 100644 (file)
@@ -207,6 +207,27 @@ public class MFTConsulClient {
         }
     }
 
+    /**
+     * Lists all currently processing transfer id for the given agent
+     *
+     * @param agentInfo
+     * @return
+     * @throws MFTConsulClientException
+     */
+    public List<String> getAgentActiveTransferIds(AgentInfo agentInfo) throws MFTConsulClientException {
+        try {
+            List<String> keys = kvClient.getKeys(MFTConsulClient.AGENTS_SCHEDULED_PATH + agentInfo.getId() + "/" + agentInfo.getSessionId());
+            return keys.stream().map(key -> key.substring(key.lastIndexOf("/") + 1)).collect(Collectors.toList());
+        } catch (ConsulException e) {
+            if (e.getCode() == 404) {
+                return Collections.emptyList();
+            }
+            throw new MFTConsulClientException("Error in fetching active transfers for agent " + agentInfo.getId(), e);
+        } catch (Exception e) {
+            throw new MFTConsulClientException("Error in fetching active transfers for agent " + agentInfo.getId(), e);
+        }
+    }
+
     /**
      * Agents should call this method to submit {@link TransferState}. These status are received by the controller and reorder
      * status messages and put in the final status array.
@@ -237,12 +258,8 @@ public class MFTConsulClient {
      */
     public void saveTransferState(String transferId, TransferState transferState) throws MFTConsulClientException {
         try {
-            List<TransferState> allStates = getTransferStates(transferId);
-            // TODO implement sequence consistency
-            allStates.add(transferState);
-            String asStr = mapper.writeValueAsString(allStates);
-            kvClient.putValue(TRANSFER_STATE_PATH + transferId, asStr);
-
+            String asStr = mapper.writeValueAsString(transferState);
+            kvClient.putValue(TRANSFER_STATE_PATH + transferId + "/" + UUID.randomUUID().toString(), asStr);
             logger.info("Saved transfer status " + asStr);
 
         } catch (Exception e) {
@@ -287,15 +304,20 @@ public class MFTConsulClient {
      * @throws IOException
      */
     public List<TransferState> getTransferStates(String transferId) throws IOException {
-        Optional<Value> valueOp = kvClient.getValue(TRANSFER_STATE_PATH + transferId);
-        List<TransferState> allStates;
-        if (valueOp.isPresent()) {
-            String prevStates = valueOp.get().getValueAsString().get();
-            allStates = new ArrayList<>(Arrays.asList(mapper.readValue(prevStates, TransferState[].class)));
-        } else {
-            allStates = new ArrayList<>();
+        List<String> keys = kvClient.getKeys(TRANSFER_STATE_PATH + transferId);
+
+        List<TransferState> allStates = new ArrayList<>();
+
+        for (String key: keys) {
+            Optional<Value> valueOp = kvClient.getValue(key);
+            String stateAsStr = valueOp.get().getValueAsString().get();
+            TransferState transferState = mapper.readValue(stateAsStr, TransferState.class);
+            allStates.add(transferState);
         }
-        return allStates;
+        List<TransferState> sortedStates = allStates.stream().sorted((o1, o2) ->
+                (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) < 0 ? -1 :
+                (o1.getUpdateTimeMils() - o2.getUpdateTimeMils()) == 0 ? 0 : 1).collect(Collectors.toList());
+        return sortedStates;
     }
 
     public List<AgentInfo> getLiveAgentInfos() throws MFTConsulClientException {
index db350f72f6c859d6d63b35ff759321cbd0195fe4..e015702ed5534c1ad3e5ab15b8dbc22fd00c2af4 100644 (file)
@@ -24,6 +24,7 @@ public class AgentInfo {
     private String host;
     private String user;
     private boolean sudo;
+    private String sessionId;
     private List<String> supportedProtocols;
     private List<String> localStorages;
 
@@ -80,4 +81,13 @@ public class AgentInfo {
         this.localStorages = localStorages;
         return this;
     }
+
+    public String getSessionId() {
+        return sessionId;
+    }
+
+    public AgentInfo setSessionId(String sessionId) {
+        this.sessionId = sessionId;
+        return this;
+    }
 }
index 2576cee84c1458c6c4d7f204170a8eae600efbf1..77fb1231ca1136261536424e8abe1319a63f5b9f 100644 (file)
@@ -25,6 +25,7 @@ import com.orbitz.consul.cache.KVCache;
 import com.orbitz.consul.model.kv.Value;
 import org.apache.airavata.mft.admin.MFTConsulClient;
 import org.apache.airavata.mft.admin.MFTConsulClientException;
+import org.apache.airavata.mft.admin.models.AgentInfo;
 import org.apache.airavata.mft.admin.models.TransferState;
 import org.apache.airavata.mft.api.service.TransferApiRequest;
 import org.slf4j.Logger;
@@ -40,10 +41,12 @@ import javax.annotation.PreDestroy;
 import java.io.IOException;
 import java.util.List;
 import java.util.Optional;
+import java.util.Random;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 @SpringBootApplication()
 @ComponentScan(basePackages = {"org.apache.airavata.mft"})
@@ -247,7 +250,19 @@ public class MFTController implements CommandLineRunner {
                 selectedAgent = possibleAgent.get();
             }
         } else if (!transferRequest.getAffinityTransfer()){
-            selectedAgent = liveAgentIds.get(0);
+            List<Optional<AgentInfo>> agentInfos = liveAgentIds.stream().map(id -> mftConsulClient.getAgentInfo(id)).collect(Collectors.toList());
+            int transferCount = -1;
+            for (Optional<AgentInfo> agentInfo : agentInfos) {
+                if (agentInfo.isPresent()) {
+                    if (transferCount == -1) {
+                        transferCount = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
+                        selectedAgent = agentInfo.get().getId();
+                    } else if (transferCount > mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size()) {
+                        transferCount = mftConsulClient.getAgentActiveTransferIds(agentInfo.get()).size();
+                        selectedAgent = agentInfo.get().getId();
+                    }
+                }
+            }
         }
 
         if (selectedAgent == null) {