[FEATURE] WebSocket-based Concurrency Architecture#239
Conversation
…erver capabilities - Introduced WebSocketEnvClient for persistent sessions with multi-step interactions. - Updated HTTPEnvServer to support WebSocket connections and manage multiple concurrent environments. - Added WebSocket message types and responses for better communication. - Enhanced Environment interface with concurrency safety attributes.
|
@burtenshaw draft PR for the ws and concurrency. I have merged the #238 into this as well. Few notes, before #232 gets merged:
|
|
Amazing work @rycerzes . Thanks
I'll integrate this in a new PR for you to merge here.
I think we can leave this for a subsequent PR. Also, this env might be useful to you. It's basically just a benchmarking env that let's you test concurrency asynchronously like this. |
|
@rycerzes could you help me to understand this please:
What do you mean by old structure? afaik #232 |
|
Thanks for the clarification! You're absolutely right - I need to correct my earlier comment.
I must have run I just verified this by running
Thanks! That benchmark env would be perfect for testing the concurrency implementation. I'll take a look at it. |
Wauplin
left a comment
There was a problem hiding this comment.
Thanks for working on this very important piece @rycerzes! I've left quite some comments on how I would do things but some parts are left to the maintainers' decisions 🤗 Especially:
- should we allow "instantiate a server by passing an env instead of an env factory" to keep backward compatibility? => I would say "no" since project is still in early phase
- should we maintain both a "HTTP-based interface" and a "websocket-based interface"? => same, I would say "no" at it means doubling the amount of work (2 paths in the http server and 2 very similar clients to maintain with same interface with different internal logic). Better to keep only 1 interface that is more robust for the future. End users should not be impacted by this decision (except for the breaking change to adapt).
Apart from that, I usually tend to advice to simplify logic by not adding too many optional features at first. More options usually means more internal logic and more maintenance burden on the long run. So if something is not explicitly required, let's keep it for later.
Note that I haven't run the code myself. Will give it a try soon!
|
@pankit-eng @zkwentz Can you validate these two backward compatibility points from @Wauplin on this PR . In short, should we go all in on websockets or maintain a http implementation?
Server side app will look like this: # Factory mode for concurrent sessions
app = create_app(
env=MyEnvironment, # Pass class, not instance
max_concurrent_envs=4
)
iiuc, it client code will only look like this: from envs.echo_env import EchoEnv, EchoAction
client = EchoEnv(base_url="ws://localhost:8000/ws")
result = await client.reset()
result = await client.step(EchoAction(...)) |
|
@rycerzes I tested out this branch and it worked well. I updated the PR description myself with a high-level before and after snippet and some benchmarking info. |
|
Thanks @Wauplin for the detailed review! Really appreciate all the feedback - the suggestions on simplifying the message types with discriminators, refactoring the capacity status, and cleaning up the validation logic make a lot of sense. I'll work through these and have them resolved by end of Friday. @burtenshaw Thanks for testing the branch and updating the PR description with the benchmarking info! |
|
Implemented the changes according to @Wauplin's extensive code review comments. Summary of recent changes:
Still waiting on feedback from @pankit-eng and @zkwentz regarding the backward compatibility points (HTTP vs WS exclusivity), but otherwise, the implementation is complete. |
|
Great work @rycerzes . I'll review tomorrow. could we update the status 'Ready for Review ' please. |
burtenshaw
left a comment
There was a problem hiding this comment.
Great work @rycerzes. From me this is functionally complete and ready to merge. I tested the implementation on a series of benchmark envs and pushed to the hub.
@pankit-eng will have the final say on deprecating http, and you can merge this PR to do that.
|
I asked Opus to do a review of this PR and this is what it came up with: Pull Request #239 ReviewSummaryThis PR adds WebSocket-based concurrency support to OpenEnv, enabling persistent sessions with configurable concurrency limits. The core architecture is well-designed with proper session isolation, factory pattern enforcement, and clean error handling. However, there are critical issues that need to be addressed before merge. Overall Assessment: Functionally strong architecture with critical bugs that will break existing environments. Critical Issues1. All Existing Environments Pass Instances Instead of ClassesSeverity: CRITICAL - Will cause TypeError at startup for all existing environments The new
Fix Required: Update all environment For environments with constructor args (chat_env, browsergym_env), create factory functions: # chat_env example
def create_chat_environment():
tokenizer = get_tokenizer()
return ChatEnvironment(tokenizer=tokenizer, system_prompt=system_prompt)
app = create_app(create_chat_environment, ChatAction, ChatObservation, env_name="chat_env")2. Duplicate
|
| File | Status | Notes |
|---|---|---|
src/openenv/core/env_server/http_server.py |
Good | Clean architecture, but HTTP handlers create new env per request |
src/openenv/core/env_server/types.py |
Good | Well-typed WebSocket message protocol |
src/openenv/core/env_server/interfaces.py |
Good | Clean SUPPORTS_CONCURRENT_SESSIONS flag |
src/openenv/core/env_server/exceptions.py |
Good | Rich exception context |
src/openenv/core/env_server/web_interface.py |
Broken | /ws endpoint conflict |
src/openenv/core/ws_env_client.py |
Good | Clean WebSocket client with context manager |
CLI Templates
| File | Status | Notes |
|---|---|---|
src/openenv/cli/templates/openenv_env/server/app.py |
Good | Correctly uses class, has max_concurrent_envs=1 |
src/openenv/cli/templates/openenv_env/client.py |
Good | Includes both HTTP and WS clients |
Existing Environments
| Environment | Status | Issue |
|---|---|---|
| echo_env | Broken | Passes instance, missing WS client |
| coding_env | Broken | Passes instance |
| chat_env | Broken | Passes instance |
| browsergym_env | Broken | Passes instance |
| atari_env | Broken | Passes instance |
| dipg_safety_env | Broken | Passes instance |
| git_env | Broken | Passes instance |
| openspiel_env | Broken | Passes instance |
| textarena_env | Broken | Passes instance |
Action Items
- CRITICAL: Fix all environment
app.pyfiles to pass classes/factories instead of instances - CRITICAL: Fix
/wsendpoint conflict inweb_interface.py(use/ws/uior similar) - Add
EchoEnvWSWebSocket client to echo_env as reference implementation - Clarify HTTP behavior: intentionally stateless or should maintain session?
- Consider adding integration tests for WebSocket concurrency
- Update documentation to explain factory pattern requirement
Review Decision
REQUEST_CHANGES - The critical issues will break all existing environments and the WebSocket concurrency feature when the web interface is enabled. These must be fixed before merge.
Darktex
left a comment
There was a problem hiding this comment.
Left comments inline and verified the issues that Claude found. Let's please fix them!
I have also reviewed the PR myself (I swear I'm not only a Claude reseller! Lmao), and things look otherwise good. I'm generally allergic to factory patterns but I agree that we need them here.
@rycerzes what is the plan to migrate the existing envs and examples? (we can also try to give this task to copilot, should be very straightforward...).
| def __init__( | ||
| self, | ||
| env: Environment, | ||
| env: Callable[[], Environment], |
There was a problem hiding this comment.
This should be env_factory. I think the more pythonic way of doing factories is arguably passing the class and the args and kwargs, no?
so like this?
app = create_app(
env_class=ChatEnvironment,
env_kwargs={"tokenizer": tokenizer, "system_prompt": prompt},
...
)|
|
||
| Args: | ||
| env: The Environment instance to wrap | ||
| env: Environment factory (callable) that creates new instances. |
There was a problem hiding this comment.
I think that the pattern of passing env_cls also works better given that we are already passing action_cls and observation_cls so it resonates more imho
There was a problem hiding this comment.
Claude is right, this now becomes a problem
There was a problem hiding this comment.
I think this should be /web/ws
Thank you for the review @Darktex, for now, I had planned to migrate the existing envs and examples manually but I think we could try using copilot for this as well. |
Darktex
left a comment
There was a problem hiding this comment.
Let's land it, but let's at least make the change to the env arg in HTTPServer to env_factory to reduce confusion
Add WebSocket support with concurrent session management
Adds WebSocket endpoints for persistent environment sessions with configurable concurrency limits #194
High-level Diff
These are the results on the server side:
On the client side, it requires a change of URL scheme:
This leads to high concurrency with limited resources:
Changes
/wswith message protocol for reset/step/state/closeConcurrencyConfigfor setting max concurrent sessions and session timeout.SUPPORTS_CONCURRENT_SESSIONSflag on environments (defaults toFalse) with startup validation.WebSocketEnvClientfor persistent connections.API
New types:
ConcurrencyConfig(max_concurrent_envs, session_timeout)SessionInfoandServerCapacityStatusfor session metadata.WSIncomingMessage(discriminated union ofWSResetMessage,WSStepMessage,WSStateMessage,WSCloseMessage).WSObservationResponse,WSStateResponse,WSErrorResponse.Usage:
Defaults to
max_concurrent_envs=1for backward compatibility. Environments must setSUPPORTS_CONCURRENT_SESSIONS=Trueto allow higher concurrency.TODO
openenv initneeds the WebSocket code integrated into the templateFuture Work (Separate PRs)
SUPPORTS_CONCURRENT_SESSIONS=True