Skip to content

Commit f6df3e7

Browse files
authored
Merge pull request #2686 from akto-api-security/feature/support_for_event_stream
add support for event stream response in api executor
2 parents e2f6d20 + d2ab257 commit f6df3e7

File tree

4 files changed

+112
-15
lines changed

4 files changed

+112
-15
lines changed

apps/testing/src/main/java/com/akto/test_editor/execution/Executor.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -492,6 +492,14 @@ public ExecutorSingleOperationResp invokeOperation(String operationType, Object
492492
queryData.put(TestExecutorModifier._OPERATION, operation);
493493
BasicDBObject generatedData = new TestExecutorModifier().handle(queryData);
494494
generatedOperationKeyValuePairs = parseGeneratedKeyValues(generatedData, operationTypeLower, value);
495+
496+
if (generatedOperationKeyValuePairs != null) {
497+
loggerMaker.infoAndAddToDb("Generated data in invokeOperation: operation:" + operation
498+
+ " output: " + generatedOperationKeyValuePairs.toString());
499+
} else {
500+
loggerMaker.errorAndAddToDb("Generated data is null for operation: " + operation);
501+
}
502+
495503
}
496504
}
497505

@@ -500,7 +508,7 @@ public ExecutorSingleOperationResp invokeOperation(String operationType, Object
500508
}
501509

502510
try {
503-
if(!generatedOperationKeyValuePairs.isEmpty()){
511+
if (generatedOperationKeyValuePairs != null && !generatedOperationKeyValuePairs.isEmpty()) {
504512
ExecutorSingleOperationResp resp = new ExecutorSingleOperationResp(false, "AI generated operation key value pairs, executing them");
505513
for (BasicDBObject generatedPair : generatedOperationKeyValuePairs) {
506514
String generatedKey = generatedPair.keySet().iterator().next();

libs/dao/src/main/java/com/akto/util/HttpRequestResponseUtils.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ public class HttpRequestResponseUtils {
3737

3838
public static final String FORM_URL_ENCODED_CONTENT_TYPE = "application/x-www-form-urlencoded";
3939
public static final String GRPC_CONTENT_TYPE = "application/grpc";
40+
public static final String TEXT_EVENT_STREAM_CONTENT_TYPE = "text/event-stream";
41+
public static final String CONTENT_TYPE = "CONTENT-TYPE";
4042

4143
public static List<SingleTypeInfo> generateSTIsFromPayload(int apiCollectionId, String url, String method,String body, int responseCode) {
4244
int now = Context.now();

libs/utils/src/main/java/com/akto/test_editor/filter/Filter.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public Filter() {
3939
public static Object generateQuerySet(FilterActionRequest filterActionRequest) {
4040
Object querySet = filterActionRequest.getQuerySet();
4141
String operationTypeLower = filterActionRequest.getOperand().toLowerCase();
42+
String operation = "";
4243
Object newQuerySet = querySet;
4344
try {
4445
int accountId = Context.accountId.get();
@@ -64,7 +65,7 @@ public static Object generateQuerySet(FilterActionRequest filterActionRequest) {
6465

6566
if(!operationPrompt.isEmpty()){
6667

67-
String operation = operationTypeLower + ": " + operationPrompt;
68+
operation = operationTypeLower + ": " + operationPrompt;
6869
if(filterActionRequest.getConcernedProperty() != null && !filterActionRequest.getConcernedProperty().isEmpty()) {
6970
operation = operation + " in " + filterActionRequest.getConcernedProperty();
7071
}
@@ -99,6 +100,16 @@ public static Object generateQuerySet(FilterActionRequest filterActionRequest) {
99100
} catch (Exception e) {
100101
loggerMaker.errorAndAddToDb(e, "error invoking operation " + operationTypeLower + " " + e.getMessage());
101102
}
103+
104+
if(newQuerySet == null) {
105+
loggerMaker.infoAndAddToDb("Generated query set: operation:" + operation + " output: null");
106+
} else if (newQuerySet instanceof List) {
107+
loggerMaker.infoAndAddToDb(
108+
"Generated query set: operation:" + operation + " output: " + newQuerySet.toString());
109+
} else {
110+
loggerMaker.infoAndAddToDb("Generated query set: operation:" + operation + " output: " + newQuerySet);
111+
}
112+
102113
return newQuerySet;
103114
}
104115

libs/utils/src/main/java/com/akto/testing/ApiExecutor.java

Lines changed: 89 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ private static OriginalHttpResponse common(Request request, boolean followRedire
4646
while (RateLimitHandler.getInstance(accountId).shouldWait(request)) {
4747
if(rateLimitHit){
4848
if (!(request.url().toString().contains("insertRuntimeLog") || request.url().toString().contains("insertTestingLog") || request.url().toString().contains("insertProtectionLog"))) {
49-
loggerMaker.infoAndAddToDb("Rate limit hit, sleeping", LogDb.TESTING);
49+
loggerMaker.infoAndAddToDb("Rate limit hit, sleeping");
5050
}else {
5151
System.out.println("Rate limit hit, sleeping");
5252
}
@@ -57,7 +57,7 @@ private static OriginalHttpResponse common(Request request, boolean followRedire
5757

5858
if (i%30 == 0) {
5959
if (!(request.url().toString().contains("insertRuntimeLog") || request.url().toString().contains("insertTestingLog") || request.url().toString().contains("insertProtectionLog"))) {
60-
loggerMaker.infoAndAddToDb("waiting for rate limit availability", LogDb.TESTING);
60+
loggerMaker.infoAndAddToDb("waiting for rate limit availability");
6161
}else{
6262
System.out.println("waiting for rate limit availability");
6363
}
@@ -81,13 +81,27 @@ private static OriginalHttpResponse common(Request request, boolean followRedire
8181

8282
Call call = client.newCall(request);
8383
Response response = null;
84-
String body;
84+
String body = null;
8585
byte[] grpcBody = null;
86+
Map<String, List<String>> responseHeaders = new HashMap<>();
8687
try {
8788
response = call.execute();
88-
ResponseBody responseBody = response.peekBody(MAX_RESPONSE_SIZE);
89-
if (responseBody == null) {
90-
throw new Exception("Couldn't read response body");
89+
Headers headers = response.headers();
90+
responseHeaders = generateHeadersMapFromHeadersObject(headers);
91+
92+
String contentTypeHeader = getHeaderValue(responseHeaders, HttpRequestResponseUtils.CONTENT_TYPE);
93+
94+
if (contentTypeHeader != null && !contentTypeHeader.isEmpty() &&
95+
contentTypeHeader.equalsIgnoreCase(HttpRequestResponseUtils.TEXT_EVENT_STREAM_CONTENT_TYPE)) {
96+
body = getEventStreamResponseBodyWithTimeout(response, 20000); // 20 seconds timeout
97+
}
98+
99+
ResponseBody responseBody = null;
100+
if (body == null) {
101+
responseBody = response.peekBody(MAX_RESPONSE_SIZE);
102+
if (responseBody == null) {
103+
throw new Exception("Couldn't read response body");
104+
}
91105
}
92106
try {
93107
if (requestProtocol != null && requestProtocol.contains(HttpRequestResponseUtils.GRPC_CONTENT_TYPE)) {//GRPC request
@@ -98,13 +112,13 @@ private static OriginalHttpResponse common(Request request, boolean followRedire
98112
builder.append(b).append(",");
99113
}
100114
if (!(request.url().toString().contains("insertRuntimeLog") || request.url().toString().contains("insertTestingLog") || request.url().toString().contains("insertProtectionLog"))) {
101-
loggerMaker.infoAndAddToDb(builder.toString(), LogDb.TESTING);
115+
loggerMaker.infoAndAddToDb(builder.toString());
102116
}else {
103117
System.out.println(builder.toString());
104118
}
105119
String responseBase64Encoded = Base64.getEncoder().encodeToString(grpcBody);
106120
if (!(request.url().toString().contains("insertRuntimeLog") || request.url().toString().contains("insertTestingLog") || request.url().toString().contains("insertProtectionLog"))) {
107-
loggerMaker.infoAndAddToDb("grpc response base64 encoded:" + responseBase64Encoded, LogDb.TESTING);
121+
loggerMaker.infoAndAddToDb("grpc response base64 encoded:" + responseBase64Encoded);
108122
}else {
109123
System.out.println("grpc response base64 encoded:" + responseBase64Encoded);
110124
}
@@ -115,7 +129,13 @@ private static OriginalHttpResponse common(Request request, boolean followRedire
115129
body = HttpRequestResponseUtils.convertXmlToJson(responseBody.string());
116130
}
117131
else {
118-
body = responseBody.string();
132+
if (body == null) {
133+
if(responseBody != null){
134+
body = responseBody.string();
135+
} else {
136+
body = "{}"; // default to empty json if response body is null
137+
}
138+
}
119139
}
120140
} catch (IOException e) {
121141
if (!(request.url().toString().contains("insertRuntimeLog") || request.url().toString().contains("insertTestingLog") || request.url().toString().contains("insertProtectionLog"))) {
@@ -138,14 +158,36 @@ private static OriginalHttpResponse common(Request request, boolean followRedire
138158
}
139159
}
140160

141-
int statusCode = response.code();
142-
Headers headers = response.headers();
143-
144-
Map<String, List<String>> responseHeaders = generateHeadersMapFromHeadersObject(headers);
161+
int statusCode = -1;
162+
if (response != null) {
163+
statusCode = response.code();
164+
} else {
165+
loggerMaker.errorAndAddToDb("Response is null when trying to access status code and headers", LogDb.TESTING);
166+
}
145167
return new OriginalHttpResponse(body, responseHeaders, statusCode);
146168
}
147169

170+
private static String getHeaderValue(Map<String, List<String>> responseHeaders, String headerName) {
171+
if (responseHeaders == null || responseHeaders.isEmpty()) {
172+
return null;
173+
}
174+
175+
for (String key : responseHeaders.keySet()) {
176+
if (key.equalsIgnoreCase(headerName)) {
177+
List<String> values = responseHeaders.get(key);
178+
if (values != null && !values.isEmpty()) {
179+
return values.get(0).toUpperCase();
180+
}
181+
}
182+
}
183+
return null;
184+
}
185+
148186
public static Map<String, List<String>> generateHeadersMapFromHeadersObject(Headers headers) {
187+
if (headers == null || headers.size() == 0) {
188+
return Collections.emptyMap();
189+
}
190+
149191
Iterator<Pair<String, String>> headersIterator = headers.iterator();
150192
Map<String, List<String>> responseHeaders = new HashMap<>();
151193
while (headersIterator.hasNext()) {
@@ -556,6 +598,39 @@ private static boolean isJsonRpcRequest(OriginalHttpRequest request) {
556598
return false;
557599
}
558600
}
601+
602+
private static String getEventStreamResponseBodyWithTimeout(Response response, long timeoutMs) throws IOException {
603+
StringBuilder sb = new StringBuilder();
604+
InputStream is = response.body().byteStream();
605+
Scanner scanner = new Scanner(is);
606+
607+
long startTime = System.currentTimeMillis();
608+
int waitLoopCount = 0;
609+
610+
try {
611+
while (System.currentTimeMillis() - startTime < timeoutMs) {
612+
if (scanner.hasNextLine()) {
613+
String line = scanner.nextLine();
614+
sb.append(line).append("\n");
615+
waitLoopCount = 0; // Reset wait loop count on new data
616+
} else {
617+
// Avoid tight CPU loop; sleep briefly
618+
waitLoopCount++;
619+
if (waitLoopCount > 20) { // If no data for a while, break
620+
break;
621+
}
622+
Thread.sleep(100);
623+
}
624+
}
625+
} catch (Exception e) {
626+
// Read timeout or interruption: end gracefully
627+
sb.append("\n[Stream ended early: ").append(e.getMessage()).append("]");
628+
} finally {
629+
scanner.close();
630+
}
631+
return sb.toString();
632+
}
633+
559634

560635
private static class SseSession {
561636
String sessionId;
@@ -597,6 +672,7 @@ private static SseSession openSseSession(String host, boolean debug) throws Exce
597672
}
598673
}
599674
}
675+
scanner.close();
600676
// Keep the stream open for later reading
601677
session.messages = Collections.synchronizedList(new ArrayList<>());
602678
new Thread(() -> {

0 commit comments

Comments
 (0)