Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import io.rsocket.Payload;
import io.rsocket.frame.FrameType;
import io.rsocket.plugins.RequestInterceptor;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import org.reactivestreams.Subscription;
import reactor.core.CoreSubscriber;
Expand Down Expand Up @@ -179,104 +178,6 @@ public void cancel() {
markTerminated(STATE, this);
}

@Override
@Nullable
public Void block(Duration m) {
return block();
}

@Override
@Nullable
public Void block() {
long previousState = markSubscribed(STATE, this);
if (isSubscribedOrTerminated(previousState)) {
final IllegalStateException e =
new IllegalStateException("FireAndForgetMono allows only a single Subscriber");
final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
}
throw e;
}

final Payload p = this.payload;
try {
if (!isValid(this.mtu, this.maxFrameLength, p, false)) {
lazyTerminate(STATE, this);

final IllegalArgumentException e =
new IllegalArgumentException(
String.format(INVALID_PAYLOAD_ERROR_MESSAGE, this.maxFrameLength));

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, p.metadata());
}

p.release();

throw e;
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(e, FrameType.REQUEST_FNF, null);
}

throw Exceptions.propagate(e);
}

final int streamId;
try {
streamId = this.requesterResponderSupport.getNextStreamId();
} catch (Throwable t) {
lazyTerminate(STATE, this);

final RequestInterceptor requestInterceptor = this.requestInterceptor;
if (requestInterceptor != null) {
requestInterceptor.onReject(Exceptions.unwrap(t), FrameType.REQUEST_FNF, p.metadata());
}

p.release();

throw Exceptions.propagate(t);
}

final RequestInterceptor interceptor = this.requestInterceptor;
if (interceptor != null) {
interceptor.onStart(streamId, FrameType.REQUEST_FNF, p.metadata());
}

try {
sendReleasingPayload(
streamId,
FrameType.REQUEST_FNF,
this.mtu,
this.payload,
this.connection,
this.allocator,
true);
} catch (Throwable e) {
lazyTerminate(STATE, this);

if (interceptor != null) {
interceptor.onTerminate(streamId, FrameType.REQUEST_FNF, e);
}

throw Exceptions.propagate(e);
}

lazyTerminate(STATE, this);

if (interceptor != null) {
interceptor.onTerminate(streamId, FrameType.REQUEST_FNF, null);
}

return null;
}

@Override
public Object scanUnsafe(Scannable.Attr key) {
return null; // no particular key to be represented, still useful in hooks
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@
import io.rsocket.DuplexConnection;
import io.rsocket.Payload;
import io.rsocket.frame.MetadataPushFrameCodec;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import reactor.core.CoreSubscriber;
import reactor.core.Scannable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Operators;
import reactor.util.annotation.NonNull;
import reactor.util.annotation.Nullable;

final class MetadataPushRequesterMono extends Mono<Void> implements Scannable {

Expand Down Expand Up @@ -114,63 +112,6 @@ public void subscribe(CoreSubscriber<? super Void> actual) {
Operators.complete(actual);
}

@Override
@Nullable
public Void block(Duration m) {
return block();
}

@Override
@Nullable
public Void block() {
long previousState = markSubscribed(STATE, this);
if (isSubscribedOrTerminated(previousState)) {
throw new IllegalStateException("MetadataPushMono allows only a single Subscriber");
}

final Payload p = this.payload;
final ByteBuf metadata;
try {
final boolean hasMetadata = p.hasMetadata();
metadata = p.metadata();
if (hasMetadata) {
lazyTerminate(STATE, this);
p.release();
throw new IllegalArgumentException("Metadata push does not support metadata field");
}
if (!isValidMetadata(this.maxFrameLength, metadata)) {
lazyTerminate(STATE, this);
p.release();
throw new IllegalArgumentException("Too Big Payload size");
}
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);
throw e;
}

final ByteBuf metadataRetainedSlice;
try {
metadataRetainedSlice = metadata.retainedSlice();
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);
throw e;
}

try {
p.release();
} catch (IllegalReferenceCountException e) {
lazyTerminate(STATE, this);
metadataRetainedSlice.release();
throw e;
}

final ByteBuf requestFrame =
MetadataPushFrameCodec.encode(this.allocator, metadataRetainedSlice);
this.connection.sendFrame(0, requestFrame);

return null;
}

@Override
public Object scanUnsafe(Attr key) {
return null; // no particular key to be represented, still useful in hooks
Expand Down