Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Refactor] Download for workflow #553

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@
</properties>

<dependencies>
<!-- https://mvnrepository.com/artifact/com.google.code.gson/gson -->
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>


<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
Expand Down
12 changes: 10 additions & 2 deletions src/main/java/com/gw/database/HistoryRepository.java
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
package com.gw.database;

import com.gw.jpa.History;
import com.gw.jpa.HistoryDTO;

import java.util.Collection;
import java.util.List;
import java.util.Set;

import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.query.Param;

import javax.transaction.Transactional;

Expand Down Expand Up @@ -163,4 +164,11 @@ public interface HistoryRepository extends JpaRepository<History, String> {
@Query(value = "SELECT * FROM history, gwprocess WHERE history.history_id = ?1 AND history.history_process = gwprocess.id", nativeQuery = true)
List<Object[]> findOneHistoryofProcess(String history_id);


/**
 * Fetch history rows for several processes with one native query.
 *
 * Selects every row of the history table whose history_process column matches
 * one of the supplied values.
 *
 * NOTE(review): behaviour for an empty list depends on the database / JPA provider
 * (an empty IN list may be rejected) - confirm that callers never pass one.
 *
 * @param processIds values compared against the history_process column
 * @return the matching history rows
 */
@Query(
value = "SELECT * FROM history WHERE history_process IN (:processIds)",
nativeQuery = true)
List<History> findByProcessIds(@Param("processIds") List<String> processIds);


}
9 changes: 9 additions & 0 deletions src/main/java/com/gw/database/ProcessRepository.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

import com.gw.jpa.GWProcess;
import java.util.Collection;
import java.util.List;
import java.util.Set;

import org.springframework.data.jpa.repository.Query;
import org.springframework.data.repository.CrudRepository;
import org.springframework.data.repository.query.Param;
Expand Down Expand Up @@ -77,4 +80,10 @@ public interface ProcessRepository extends CrudRepository<GWProcess, String> {
*/
@Query(value = "select * from gwprocess where lang = 'jupyter'", nativeQuery = true)
Collection<GWProcess> findNotebookProcess();


/**
 * Fetch several processes with one native query.
 *
 * Selects every row of the gwprocess table whose id column matches one of the
 * supplied values. The list parameter is wrapped in parentheses so the expanded
 * SQL is a valid IN list regardless of whether the persistence provider adds
 * them, and so the query matches the style used by the other IN-list queries.
 *
 * NOTE(review): an empty list may still be rejected by the database / provider;
 * callers should guard against it.
 *
 * @param ids values compared against the gwprocess id column
 * @return the matching processes
 */
@Query(
    value = "SELECT * FROM gwprocess WHERE id IN (:ids)",
    nativeQuery = true)
List<GWProcess> findProcessesByIds(@Param("ids") List<String> ids);
}
12 changes: 6 additions & 6 deletions src/main/java/com/gw/tools/ProcessTool.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,7 @@

import java.io.File;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.*;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -744,6 +741,9 @@ public String all_history(String pid, String mode) {
return all_history(pid, false, mode);

}




/**
 * Look up several processes in a single repository call.
 *
 * @param processIds ids of the processes to fetch; may be null or empty
 * @return the processes returned by the repository for the given ids, or an
 *     empty list when no ids were supplied
 */
public List<GWProcess> getProcessesByIds(List<String> processIds) {
  // Skip the query entirely when there is nothing to look up: binding a null or
  // empty collection to a native IN clause can fail or produce invalid SQL.
  if (processIds == null || processIds.isEmpty()) {
    return new ArrayList<>();
  }
  return processrepository.findProcessesByIds(processIds);
}
}
227 changes: 60 additions & 167 deletions src/main/java/com/gw/tools/WorkflowTool.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.gw.tools;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import com.gw.database.CheckpointRepository;
import com.gw.database.HistoryRepository;
import com.gw.database.ProcessRepository;
import com.gw.database.WorkflowRepository;
import com.gw.jpa.ExecutionStatus;
import com.gw.jpa.GWProcess;
Expand All @@ -14,13 +16,9 @@
import com.gw.utils.RandomString;
import java.io.File;
import java.nio.file.FileSystems;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.*;
import java.util.stream.Collectors;

import org.apache.log4j.Logger;
import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
Expand All @@ -43,6 +41,9 @@ public class WorkflowTool {

@Autowired WorkflowRepository workflowrepository;

@Autowired
ProcessRepository processRepository;

@Autowired HistoryRepository historyrepository;

@Autowired CheckpointRepository checkpointrepository;
Expand Down Expand Up @@ -433,7 +434,7 @@ public String all_active_process() {
/**
* show the history of every execution of the workflow
*
* @param string
* @param workflow_id
* @return
*/
public String all_history(String workflow_id) {
Expand Down Expand Up @@ -587,198 +588,90 @@ public String getExportModeById(int mode_no) {
* @return
* @throws ParseException
*/
public String download(String wid, String option) throws ParseException {

public String download(String wid, String option) throws ParseException {
Workflow wf = this.getById(wid);

String fileurl = "download/temp/" + wf.getId() + ".zip";

String savefilepath =
bt.getFileTransferFolder() + wf.getId() + FileSystems.getDefault().getSeparator();
String savefilepath = bt.getFileTransferFolder() + wf.getId() + FileSystems.getDefault().getSeparator();

File tf = new File(savefilepath);
if (tf.exists()) {
bt.deleteDirectory(tf);
}
tf.mkdirs();

bt.deleteDirectory(tf);

if (!tf.exists()) tf.mkdirs();

String workflowstring = bt.toJSON(wf);
Gson gson = new Gson();

bt.writeString2File(workflowstring, savefilepath + "workflow.json");
// Write workflow to JSON
String workflowJson = gson.toJson(wf);
bt.writeString2File(workflowJson, savefilepath + "workflow.json");

if (option.contains("processcode")) {

JSONParser jsonParser = new JSONParser();

JSONArray arrayobj = (JSONArray) jsonParser.parse(wf.getNodes());

String codesavefile = savefilepath + "code" + FileSystems.getDefault().getSeparator();

File codef = new File(codesavefile);
if (!codef.exists()) {
codef.mkdirs();
}

if (!codef.exists()) codef.mkdirs();

StringBuffer processjson = new StringBuffer("[");

String prefix = "";

for (int i = 0; i < arrayobj.size(); i++) {

try {

JSONObject jsonObj = (JSONObject) arrayobj.get(i);

String process_workflow_id = (String) jsonObj.get("id");

String process_id = process_workflow_id.split("-")[0];

String targetsourcefile = codesavefile + pt.getProcessFileName(process_id);

if (new File(targetsourcefile).exists()) continue;

GWProcess p = pt.getProcessById(process_id);

bt.writeString2File(p.getCode(), targetsourcefile);

processjson.append(prefix);

prefix = ",";

processjson.append(pt.toJSON(p));

} catch (Exception e) {

e.printStackTrace();
}
List<String> processIds = new ArrayList<>();
for (Object obj : arrayobj) {
JSONObject jsonObj = (JSONObject) obj;
String process_workflow_id = (String) jsonObj.get("id");
String process_id = process_workflow_id.split("-")[0];
processIds.add(process_id);
}

processjson.append("]");
// Fetch all processes at once
List<GWProcess> processes = pt.getProcessesByIds(processIds);
String processJson = gson.toJson(processes);

bt.writeString2File(processJson, codesavefile + "process.json");

bt.writeString2File(processjson.toString(), codesavefile + "process.json");
// Write individual process code files
processes.parallelStream().forEach(process -> {
String targetSourceFile = codesavefile + pt.getProcessFileName(process.getId());
bt.writeString2File(process.getCode(), targetSourceFile);
});
}

if (option.contains("history")) {
String wfhistorysavefile = savefilepath + "history" + FileSystems.getDefault().getSeparator() + wid + ".json";

String wfhistorysavefile =
savefilepath + "history" + FileSystems.getDefault().getSeparator() + wid + ".json";

// first save all history of the workflow

// Fetch workflow history
List<History> histlist = historyrepository.findByWorkflowId(wid);
String workflowHistoryJson = gson.toJson(histlist);
bt.writeString2File(workflowHistoryJson, wfhistorysavefile);

StringBuffer workflowhistory = new StringBuffer("[");

String prefix = "";

for (History h : histlist) {

if ("workflowwithprocesscodegoodhistory".equals(option)
&& !ExecutionStatus.DONE.equals(h.getIndicator())) {

continue;
}

String historystr = bt.toJSON(h);

workflowhistory.append(prefix);

prefix = ",";

workflowhistory.append(historystr);
}
;

workflowhistory.append("]");

bt.writeString2File(workflowhistory.toString(), wfhistorysavefile);

// second, save process history of one workflow execution into a file
HashSet<String> process_id_set = new HashSet<>();

// Collect process IDs and fetch process histories
HashSet<String> processIdSet = new HashSet<>();
for (History h : histlist) {

if ("workflowwithprocesscodegoodhistory".equals(option)
&& !ExecutionStatus.DONE.equals(h.getIndicator())) {

if ("workflowwithprocesscodegoodhistory".equals(option) && !ExecutionStatus.DONE.equals(h.getIndicator())) {
continue;
}

String[] processhistorylist = h.getHistory_output().split(";");

prefix = "";

String processhistorysavefile =
savefilepath
+ "history"
+ FileSystems.getDefault().getSeparator()
+ h.getHistory_id()
+ ".json"; // all the process history of one workflow run

StringBuffer processhistorybuffer = new StringBuffer("[");

for (String processhitoryid : processhistorylist) {

Optional<History> hisop = historyrepository.findById(processhitoryid);

if (hisop.isPresent()) {

History hist = hisop.get();

if ("workflowwithprocesscodegoodhistory".equals(option)
&& !ExecutionStatus.DONE.equals(hist.getIndicator())) {

continue;
}

processhistorybuffer.append(prefix);

prefix = ",";

processhistorybuffer.append(bt.toJSON(hist));

if (!process_id_set.contains(hist.getHistory_process()))
process_id_set.add(hist.getHistory_process());
}
}

processhistorybuffer.append("]");

bt.writeString2File(processhistorybuffer.toString(), processhistorysavefile);
String[] processHistoryList = h.getHistory_output().split(";");
Collections.addAll(processIdSet, processHistoryList);
}

// if need all the history of the involved processes, go into this if
if (option.contains("allhistory") || "workflowwithprocesscodegoodhistory".equals(option)) {

for (String history_process_id : process_id_set) {

histlist = historyrepository.findByProcessIdFull(history_process_id);
if (!processIdSet.isEmpty()) {
// Fetch all process histories at once
List<String> processIdList = new ArrayList<>(processIdSet);
List<History> allProcessHistories = historyrepository.findByProcessIds(processIdList);
Map<String, List<History>> processHistoriesMap = allProcessHistories.stream()
.collect(Collectors.groupingBy(History::getHistory_process));

StringBuffer allprocesshistorybuffer = new StringBuffer("[");
// Write each process history to its respective file
processIdSet.parallelStream().forEach(processId -> {
List<History> processHistories = processHistoriesMap.get(processId);
String processHistoryJson = gson.toJson(processHistories);

// every process has a history file
String allprocesshistorysavefile =
savefilepath
+ "history"
+ FileSystems.getDefault().getSeparator()
+ "process_"
+ history_process_id
+ ".json";

for (History hist : histlist) {

if ("workflowwithprocesscodegoodhistory".equals(option)
&& !ExecutionStatus.DONE.equals(hist.getIndicator())) {

continue;
}

allprocesshistorybuffer.append(bt.toJSON(hist)).append(",");
}

allprocesshistorybuffer.append("]");

bt.writeString2File(allprocesshistorybuffer.toString(), allprocesshistorysavefile);
}
String processHistorySaveFile = savefilepath + "history" + FileSystems.getDefault().getSeparator() + "process_" + processId + ".json";
bt.writeString2File(processHistoryJson, processHistorySaveFile);
});
}
}

Expand Down
Loading