Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,17 @@ private void handleIncomingErrors() {

@Override
public Mono<Void> sendMessage(JSONRPCMessage message) {
if (this.outboundSink.tryEmitNext(message).isSuccess()) {
// TODO: essentially we could reschedule ourselves in some time and make
// another attempt with the already read data but pause reading until
// success
// In this approach we delegate the retry and the backpressure onto the
// caller. This might be enough for most cases.
try {
// busyLooping retries on FAIL_NON_SERIALIZED (concurrent tryEmitNext from
// another thread) instead of failing immediately. The contention window is
// microseconds (single CAS), so the spin resolves almost instantly; the
// duration is just a generous upper bound for pathological cases like GC
// pauses.
this.outboundSink.emitNext(message, Sinks.EmitFailureHandler.busyLooping(Duration.ofMillis(100)));
return Mono.empty();
}
else {
return Mono.error(new RuntimeException("Failed to enqueue message"));
catch (Sinks.EmissionException e) {
return Mono.error(new RuntimeException("Failed to enqueue message", e));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2024-2024 the original author or authors.
*/

package io.modelcontextprotocol.client.transport;

import io.modelcontextprotocol.spec.McpSchema;
import io.modelcontextprotocol.spec.json.gson.GsonMcpJsonMapper;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.RepeatedTest;
import reactor.core.publisher.Mono;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Reproduces a race condition in StdioClientTransport.sendMessage() when two threads call
* it concurrently on the same transport instance.
*
* <p>
* The outbound sink (Sinks.many().unicast()) is wrapped by Reactor's SinkManySerialized,
* which uses a CAS-based guard. When two threads call tryEmitNext concurrently, the CAS
* loser immediately gets FAIL_NON_SERIALIZED, causing "Failed to enqueue message".
*
* <p>
* This occurs when an MCP server proxies concurrent tool calls to a downstream MCP server
* via stdio transport — each call is dispatched on a separate thread and both call
* sendMessage() on the same transport.
*
* @see <a href="https://github.com/modelcontextprotocol/java-sdk/issues/875">Issue
* #875</a>
*/
class StdioClientTransportConcurrencyTest {

private StdioClientTransport transport;

@AfterEach
void tearDown() {
if (transport != null) {
transport.closeGracefully().block(Duration.ofSeconds(5));
}
}

@RepeatedTest(20)
void concurrent_sendMessage_should_not_fail() throws Exception {
var serverParams = ServerParameters.builder("cat").env(Map.of()).build();
transport = new StdioClientTransport(serverParams, new GsonMcpJsonMapper());

transport.connect(mono -> mono.flatMap(msg -> Mono.empty())).block(Duration.ofSeconds(5));

var msg1 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "1",
Map.of("name", "tool_a", "arguments", Map.of()));
var msg2 = new McpSchema.JSONRPCRequest("2.0", "tools/call", "2",
Map.of("name", "tool_b", "arguments", Map.of()));

var barrier = new CyclicBarrier(2);
var errors = new CopyOnWriteArrayList<Throwable>();
var latch = new CountDownLatch(2);

for (var msg : new McpSchema.JSONRPCMessage[] { msg1, msg2 }) {
new Thread(() -> {
try {
barrier.await(2, TimeUnit.SECONDS);
transport.sendMessage(msg).block(Duration.ofSeconds(2));
}
catch (Exception e) {
errors.add(e);
}
finally {
latch.countDown();
}
}).start();
}

latch.await(5, TimeUnit.SECONDS);

assertThat(errors)
.as("Concurrent sendMessage calls should both succeed, but the unicast sink "
+ "rejects one with FAIL_NON_SERIALIZED when two threads race on tryEmitNext")
.isEmpty();
}

}