Skip to content

Commit d92d9a4

Browse files
authored
[#4873] unified Flowable conversion to prevent internal typecasting exceptions. (#4909)
1 parent cacfb79 commit d92d9a4

2 files changed

Lines changed: 25 additions & 29 deletions

File tree

common/common-rest/src/main/java/org/apache/servicecomb/common/rest/filter/inner/ServerRestArgsFilter.java

Lines changed: 18 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import org.apache.servicecomb.common.rest.filter.HttpServerFilter;
3232
import org.apache.servicecomb.core.Invocation;
3333
import org.apache.servicecomb.core.definition.OperationMeta;
34+
import org.apache.servicecomb.foundation.common.concurrency.SuppressedRunnableWrapper;
3435
import org.apache.servicecomb.foundation.common.utils.PartUtils;
3536
import org.apache.servicecomb.foundation.vertx.http.HttpServletRequestEx;
3637
import org.apache.servicecomb.foundation.vertx.http.HttpServletResponseEx;
@@ -126,19 +127,15 @@ public void onSubscribe(Subscription s) {
126127

127128
@Override
128129
public void onNext(Object o) {
129-
try {
130-
writeResponse(responseEx, produceProcessor, o, response).whenComplete((r, e) -> {
131-
if (e != null) {
132-
subscription.cancel();
133-
result.completeExceptionally(e);
134-
return;
135-
}
130+
writeResponse(responseEx, produceProcessor, o, response).thenApply(r -> {
136131
subscription.request(1);
132+
return r;
133+
})
134+
.exceptionally(e -> {
135+
new SuppressedRunnableWrapper(() -> subscription.cancel()).run();
136+
new SuppressedRunnableWrapper(() -> result.completeExceptionally(e)).run();
137+
return response;
137138
});
138-
} catch (Throwable e) {
139-
LOGGER.warn("Failed to subscribe event: {}", o, e);
140-
result.completeExceptionally(e);
141-
}
142139
}
143140

144141
@Override
@@ -158,22 +155,18 @@ private static CompletableFuture<Response> writeResponse(
158155
HttpServletResponseEx responseEx, ProduceProcessor produceProcessor, Object data, Response response) {
159156
try (BufferOutputStream output = new BufferOutputStream(Buffer.buffer())) {
160157
produceProcessor.encodeResponse(output, data);
161-
CompletableFuture<Response> result = new CompletableFuture<>();
162-
responseEx.sendBuffer(output.getBuffer()).whenComplete((v, e) -> {
163-
if (e != null) {
164-
result.completeExceptionally(e);
165-
}
166-
try {
167-
responseEx.flushBuffer();
168-
} catch (IOException ex) {
169-
LOGGER.warn("Failed to flush buffer for Server Send Events", ex);
170-
}
171-
});
172-
result.complete(response);
173-
return result;
158+
return responseEx.sendBuffer(output.getBuffer())
159+
.thenApply(v -> {
160+
try {
161+
responseEx.flushBuffer();
162+
return response;
163+
} catch (IOException e) {
164+
LOGGER.warn("Failed to flush buffer for Server Send Events", e);
165+
throw new IllegalStateException("Failed to flush buffer for Server Send Events", e);
166+
}
167+
});
174168
} catch (Throwable e) {
175169
LOGGER.error("internal service error must be fixed.", e);
176-
responseEx.setStatus(500);
177170
return CompletableFuture.failedFuture(e);
178171
}
179172
}

swagger/swagger-invocation/invocation-core/src/main/java/org/apache/servicecomb/swagger/invocation/response/producer/PublisherProducerResponseMapper.java

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import org.apache.servicecomb.swagger.invocation.Response;
2020
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
21+
import org.reactivestreams.Publisher;
2122

2223
import io.reactivex.rxjava3.core.Flowable;
2324
import jakarta.ws.rs.core.Response.StatusType;
@@ -31,12 +32,14 @@ public PublisherProducerResponseMapper(boolean shouldConstructEntity) {
3132

3233
@Override
3334
public Response mapResponse(StatusType status, Object result) {
35+
// Unified Flowable conversion to prevent internal typecasting exceptions.
36+
final Flowable<?> flowableResult = result instanceof Flowable ?
37+
(Flowable<?>) result : Flowable.fromPublisher(((Publisher<?>) result));
3438
if (shouldConstructEntity) {
35-
Flowable<SseEventResponseEntity<?>> responseEntity = ((Flowable<?>) result).map(obj ->
36-
new SseEventResponseEntity<>()
37-
.data(obj));
39+
Flowable<SseEventResponseEntity<?>> responseEntity = flowableResult
40+
.map(obj -> new SseEventResponseEntity<>().data(obj));
3841
return Response.create(status, responseEntity);
3942
}
40-
return Response.create(status, result);
43+
return Response.create(status, flowableResult);
4144
}
4245
}

0 commit comments

Comments
 (0)