Skip to content

Commit 2867745

Browse files
committed
Merge pull request #213 from localshred/abrandoned/zmq_queue_work
allow queue size to be configurable and use an array as local queue as t...
2 parents 753b677 + 492f950 commit 2867745

2 files changed

Lines changed: 11 additions & 5 deletions

File tree

lib/protobuf/rpc/connectors/zmq.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,6 +123,8 @@ def lookup_server_uri
123123
def host_alive?(host)
124124
return true unless ping_port_enabled?
125125
socket = TCPSocket.new(host, ping_port.to_i)
126+
socket.setsockopt(::Socket::IPPROTO_TCP, ::Socket::TCP_NODELAY, 1)
127+
socket.setsockopt(::Socket::SOL_SOCKET, ::Socket::SO_LINGER, [1,0].pack('ii'))
126128

127129
true
128130
rescue

lib/protobuf/rpc/servers/zmq/broker.rb

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ def init_frontend_socket
6767
end
6868

6969
def init_local_queue
70-
@local_queue = ::Queue.new
70+
@local_queue = []
7171
end
7272

7373
def init_poller
@@ -88,6 +88,10 @@ def inproc?
8888
!!@server.try(:inproc?)
8989
end
9090

91+
def local_queue_max_size
92+
@local_queue_max_size ||= [ENV["PB_ZMQ_SERVER_QUEUE_MAX_SIZE"].to_i, 5].max
93+
end
94+
9195
def process_backend
9296
worker, ignore, *frames = read_from_backend
9397

@@ -102,16 +106,16 @@ def process_frontend
102106
address, _, message, *frames = read_from_frontend
103107

104108
if message == ::Protobuf::Rpc::Zmq::CHECK_AVAILABLE_MESSAGE
105-
if @idle_workers.any? || local_queue.size < 5 # Should make queue a SizedQueue and allow users to configure queue size
109+
if local_queue.size < local_queue_max_size
106110
write_to_frontend([address, "", ::Protobuf::Rpc::Zmq::WORKERS_AVAILABLE])
107111
else
108112
write_to_frontend([address, "", ::Protobuf::Rpc::Zmq::NO_WORKERS_AVAILABLE])
109113
end
110114
else
111-
if @idle_workers.any?
112-
write_to_backend([@idle_workers.shift, ""] + [address, "", message ] + frames)
113-
else
115+
if @idle_workers.empty?
114116
local_queue.push([address, "", message ] + frames)
117+
else
118+
write_to_backend([@idle_workers.shift, ""] + [address, "", message ] + frames)
115119
end
116120
end
117121
end

0 commit comments

Comments
 (0)