Skip to content

Commit 87f7ebf

Browse files
authored
Merge pull request #5 from eigr-labs/dev/venkatesh
Actor Registration protobuf updated
2 parents 61a1b12 + 753dcd3 commit 87f7ebf

5 files changed

Lines changed: 2159 additions & 664 deletions

File tree

protobuf/eigr/actor.proto

Lines changed: 74 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,28 +25,95 @@ message ActorSnapshotStrategy {
2525
}
2626

2727
// A strategy which a user function's entity is passivated.
28-
message ActorDeactivateStrategy {
28+
message ActorDeactivationStrategy {
2929
oneof strategy {
3030
// the timeout strategy.
3131
TimeoutStrategy timeout = 1;
3232
}
3333
}
3434

35-
// A strategy based on a timeout.
35+
// A strategy based on a timeout.
3636
message TimeoutStrategy {
3737
// The timeout in millis
3838
int64 timeout = 1;
3939
}
4040

41+
// A command represents an action that the user can perform on an Actor.
42+
// Commands in supporting languages are represented by functions or methods.
43+
// An Actor command has nothing to do with the semantics of Commands in a CQRS/EventSourced system.
44+
// It just represents an action that supporting languages can invoke.
45+
message Command {
46+
47+
// The name of the function or method in the supporting language that has been registered in Ator.
48+
string name = 1;
49+
}
50+
51+
// A FixedTimerCommand is similar to a regular Command, its main differences are that it is scheduled to run at regular intervals
52+
// and only takes the actor's state as an argument.
53+
// Timer Commands are good for executing loops that manipulate the actor's own state.
54+
// In Elixir or other languages in BEAM it would be similar to invoking Process.send_after(self(), atom, msg, timeout)
55+
message FixedTimerCommand {
56+
57+
// The time to wait until the command is triggered
58+
int32 seconds = 1;
59+
60+
// See Command description Above
61+
Command command = 2;
62+
}
63+
4164
message ActorState {
4265
map<string, string> tags = 1;
4366
google.protobuf.Any state = 2;
4467
}
4568

46-
message Actor {
47-
string name = 1;
69+
// TODO doc here
70+
message Metadata {
71+
// A channel group represents a way to send commands to various actors
72+
// that belong to a certain semantic group.
73+
string channel_group = 1;
74+
75+
map<string, string> tags = 2;
76+
}
77+
78+
message ActorSettings {
79+
80+
// Indicates if actor´s is abstract or non abstract.
81+
bool abstract = 1;
82+
83+
// Indicates whether an actor's state should be persisted in a definitive store.
4884
bool persistent = 2;
49-
ActorState state = 3;
50-
ActorSnapshotStrategy snapshot_strategy = 4;
51-
ActorDeactivateStrategy deactivate_strategy = 5;
85+
86+
// Snapshot strategy
87+
ActorSnapshotStrategy snapshot_strategy = 3;
88+
89+
// Deactivate strategy
90+
ActorDeactivationStrategy deactivation_strategy = 4;
91+
}
92+
93+
message ActorId {
94+
// The name of a Actor Entity.
95+
string name = 1;
96+
97+
// Name of a ActorSystem
98+
string system = 2;
99+
}
100+
101+
message Actor {
102+
// Actor Identification
103+
ActorId id = 1;
104+
105+
// A Actor state.
106+
ActorState state = 2;
107+
108+
// Actor metadata
109+
Metadata metadata = 6;
110+
111+
// Actor settings.
112+
ActorSettings settings = 3;
113+
114+
// The commands registered for an actor
115+
repeated Command commands = 4;
116+
117+
// The registered timer commands for an actor.
118+
repeated FixedTimerCommand timer_commands = 5;
52119
}

protobuf/eigr/protocol.proto

Lines changed: 87 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,14 @@ import "google/protobuf/any.proto";
104104
option java_package = "io.eigr.functions.protocol";
105105
option go_package = "github.com/eigr/go-support/eigr/protocol;protocol";
106106

107+
message SpawnRequest {
108+
eigr.actors.ActorSystem actor_system = 2;
109+
}
110+
111+
message SpawnResponse {
112+
RequestStatus status = 1;
113+
}
114+
107115
message RegistrationRequest {
108116

109117
ServiceInfo service_info = 1;
@@ -144,7 +152,7 @@ message ProxyInfo {
144152
int32 protocol_minor_version = 2;
145153

146154
string proxy_name = 3;
147-
155+
148156
string proxy_version = 4;
149157
}
150158

@@ -155,29 +163,91 @@ message RegistrationResponse {
155163
ProxyInfo proxy_info = 2;
156164
}
157165

158-
// Context is where current and/or updated state is stored
166+
// Context is where current and/or updated state is stored
159167
// to be transmitted to/from proxy and user function
160168
//
161169
// Params:
162-
// * state: Actor state passed back and forth between proxy and user function.
170+
// * state: Actor state passed back and forth between proxy and user function.
163171
message Context {
164172

165173
google.protobuf.Any state = 1;
166174
}
167175

176+
// When a Host Function is invoked it returns the updated state and return value to the call.
177+
// It can also return a number of side effects to other Actors as a result of its computation.
178+
// These side effects will be forwarded to the respective Actors asynchronously and should not affect the Host Function's response to its caller.
179+
// Internally side effects is just a special kind of InvocationRequest.
180+
// Useful for handle handle `recipient list` and `Composed Message Processor` patterns:
181+
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/RecipientList.html
182+
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/DistributionAggregate.html
183+
message SideEffect {
184+
InvocationRequest request = 1;
185+
}
186+
187+
// Broadcast a message to many Actors
188+
// Useful for handle `recipient list`, `publish-subscribe channel`, and `scatter-gatther` patterns:
189+
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/RecipientList.html
190+
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/PublishSubscribeChannel.html
191+
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/BroadcastAggregate.html
192+
message Broadcast {
193+
// Channel of target Actors
194+
string channel_group = 1;
195+
196+
// Command. Only Actors that have this command will run successfully
197+
string command_name = 2;
198+
199+
// Payload
200+
google.protobuf.Any value = 3;
201+
}
202+
203+
// Sends the output of a command of an Actor to the input of another command of an Actor
204+
// Useful for handle `pipes` pattern:
205+
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/PipesAndFilters.html
206+
message Pipe {
207+
// Target Actor
208+
string actor = 1;
209+
210+
// Command.
211+
string command_name = 2;
212+
}
213+
214+
// Sends the input of a command of an Actor to the input of another command of an Actor
215+
// Useful for handle `content-basead router` pattern
216+
// https://www.enterpriseintegrationpatterns.com/patterns/messaging/ContentBasedRouter.html
217+
message Forward {
218+
// Target Actor
219+
string actor = 1;
220+
221+
// Command.
222+
string command_name = 2;
223+
}
224+
225+
// Container for archicetural message patterns
226+
message Workflow {
227+
228+
Broadcast broadcast = 2;
229+
230+
repeated SideEffect effects = 1;
231+
232+
oneof routing {
233+
Pipe pipe = 3;
234+
Forward forward = 4;
235+
}
236+
}
237+
168238
// The user function when it wants to send a message to an Actor uses the InvocationRequest message type.
169239
//
170240
// Params:
171241
// * system: See ActorStstem message.
172242
// * actor: The target Actor, i.e. the one that the user function is calling to perform some computation.
173-
// * command_name: The function or method on the target Actor that will receive this request
243+
// * command_name: The function or method on the target Actor that will receive this request
174244
// and perform some useful computation with the sent data.
175245
// * value: This is the value sent by the user function to be computed by the request's target Actor command.
176-
// * async: Indicates whether the command should be processed synchronously, where a response should be sent back to the user function,
246+
// * async: Indicates whether the command should be processed synchronously, where a response should be sent back to the user function,
177247
// or whether the command should be processed asynchronously, i.e. no response sent to the caller and no waiting.
178248
message InvocationRequest {
179249

180-
eigr.actors.ActorSystem system =1;
250+
eigr.actors.ActorSystem system = 1;
181251

182252
eigr.actors.Actor actor = 2;
183253

@@ -188,16 +258,16 @@ message InvocationRequest {
188258
bool async = 5;
189259
}
190260

191-
// ActorInvocation is a translation message between a local invocation made via InvocationRequest
261+
// ActorInvocation is a translation message between a local invocation made via InvocationRequest
192262
// and the real Actor that intends to respond to this invocation and that can be located anywhere in the cluster.
193263
//
194264
// Params:
195265
// actor_name: The name of the Actor handling the InvocationRequest request, also called the target Actor.
196266
// actor_system: The name of ActorSystem registered in Registration step.
197-
// command_name: The function or method on the target Actor that will receive this request
267+
// command_name: The function or method on the target Actor that will receive this request
198268
// and perform some useful computation with the sent data.
199-
// current_context: The current Context with current state value of the target Actor.
200-
// That is, the same as found via matching in %Actor{name: target_actor, state: %ActorState{state: value} = actor_state}.
269+
// current_context: The current Context with current state value of the target Actor.
270+
// That is, the same as found via matching in %Actor{name: target_actor, state: %ActorState{state: value} = actor_state}.
201271
// In this case, the Context type will contain in the value attribute the same `value` as the matching above.
202272
// value: The value to be passed to the function or method corresponding to command_name.
203273
message ActorInvocation {
@@ -219,7 +289,7 @@ message ActorInvocation {
219289
// actor_name: The name of the Actor handling the InvocationRequest request, also called the target Actor.
220290
// actor_system: The name of ActorSystem registered in Registration step.
221291
// updated_context: The Context with updated state value of the target Actor after user function has processed a request.
222-
// value: The value that the original request proxy will forward in response to the InvocationRequest type request.
292+
// value: The value that the original request proxy will forward in response to the InvocationRequest type request.
223293
// This is the final response from the point of view of the user who invoked the Actor call and its subsequent processing.
224294
message ActorInvocationResponse {
225295

@@ -230,13 +300,15 @@ message ActorInvocationResponse {
230300
Context updated_context = 3;
231301

232302
google.protobuf.Any value = 4;
303+
304+
Workflow workflow = 5;
233305
}
234306

235307
// InvocationResponse is the response that the proxy that received the InvocationRequest request will forward to the request's original user function.
236308
//
237309
// Params:
238310
// status: Status of request. Could be one of [UNKNOWN, OK, ACTOR_NOT_FOUND, ERROR].
239-
// sytem: The original ActorSystem of the InvocationRequest request.
311+
// system: The original ActorSystem of the InvocationRequest request.
240312
// actor: The target Actor originally sent in the InvocationRequest message.
241313
// value: The value resulting from the request processing that the target Actor made.
242314
// This value must be passed by the user function to the one who requested the initial request in InvocationRequest.
@@ -267,4 +339,6 @@ message RequestStatus {
267339
Status status = 1;
268340

269341
string message = 2;
270-
}
342+
}
343+
344+

spawn/controller.py

Lines changed: 38 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,14 @@
44
"""
55
from spawn.eigr.actor_pb2 import (
66
Actor,
7-
ActorDeactivateStrategy,
8-
ActorSnapshotStrategy,
7+
ActorId,
98
ActorState,
9+
Metadata,
10+
ActorSettings,
11+
Command,
12+
FixedTimerCommand,
13+
ActorSnapshotStrategy,
14+
ActorDeactivationStrategy,
1015
ActorSystem,
1116
Registry,
1217
TimeoutStrategy,
@@ -59,18 +64,43 @@ def register(self, actors: List[ActorEntity]):
5964

6065
actor_state = ActorState()
6166

62-
deactivate_strategy = ActorDeactivateStrategy()
67+
deactivate_strategy = ActorDeactivationStrategy()
6368
deactivate_strategy.timeout.CopyFrom(deactivate_timeout_strategy)
6469

6570
snaphot_strategy = ActorSnapshotStrategy()
6671
snaphot_strategy.timeout.CopyFrom(snaphot_timeout_strategy)
6772

6873
actor_01 = Actor()
69-
actor_01.name = "user_actor_01"
70-
actor_01.persistent = True
74+
75+
actor_id = ActorId()
76+
actor_id.name = "user_actor_01"
77+
actor_id.system = "spawn-system"
78+
79+
actor_01.id.CopyFrom(actor_id)
80+
7181
actor_01.state.CopyFrom(actor_state)
72-
actor_01.deactivate_strategy.CopyFrom(deactivate_strategy)
73-
actor_01.snapshot_strategy.CopyFrom(snaphot_strategy)
82+
83+
actor_metatdata = Metadata()
84+
actor_metatdata.channel_group = "spawn-python"
85+
actor_metatdata.tags["actor"] = "user_actor_01"
86+
87+
actor_01.metadata.CopyFrom(actor_metatdata)
88+
89+
actor_settings = ActorSettings()
90+
actor_settings.abstract = True
91+
actor_settings.persistent = True
92+
actor_settings.snapshot_strategy.CopyFrom(snaphot_strategy)
93+
actor_settings.deactivation_strategy.CopyFrom(deactivate_strategy)
94+
95+
actor_01.settings.CopyFrom(actor_settings)
96+
97+
actor_command = actor_01.commands.add()
98+
actor_command.name = ""
99+
100+
actor_fixed_timer_command = actor_01.timer_commands.add()
101+
102+
actor_fixed_timer_command.seconds = 1
103+
actor_fixed_timer_command.command.CopyFrom(actor_command)
74104

75105
registry = Registry()
76106
registry.actors.get_or_create("user_actor_01").CopyFrom(actor_01)
@@ -110,4 +140,4 @@ def register(self, actors: List[ActorEntity]):
110140
logging.info("Actors register response %s", resp)
111141
except Exception as e:
112142
logging.error("ERROR: %s", e)
113-
logging.error("Shit %s", e.__cause__)
143+
logging.error("Shit %s", e.__cause__)

0 commit comments

Comments
 (0)