Skip to content

Commit 85af5c4

Browse files
authored
Merge pull request #1183 from ably/AIT-98/message-edits-deletes
[AIT-98] feat: realtime edits and deletes
2 parents 882cdbb + 4f2e3e5 commit 85af5c4

11 files changed

Lines changed: 816 additions & 94 deletions

File tree

lib/src/main/java/io/ably/lib/realtime/ChannelBase.java

Lines changed: 120 additions & 69 deletions
Large diffs are not rendered by default.

lib/src/main/java/io/ably/lib/realtime/Presence.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
import io.ably.lib.types.PresenceMessage;
1616
import io.ably.lib.types.PresenceSerializer;
1717
import io.ably.lib.types.ProtocolMessage;
18+
import io.ably.lib.types.PublishResult;
19+
import io.ably.lib.util.Listeners;
1820
import io.ably.lib.util.Log;
1921
import io.ably.lib.util.StringUtils;
2022

@@ -120,9 +122,9 @@ public synchronized PresenceMessage[] get(String clientId, boolean wait) throws
120122
return get(new Param(GET_WAITFORSYNC, String.valueOf(wait)), new Param(GET_CLIENTID, clientId));
121123
}
122124

123-
void addPendingPresence(PresenceMessage presenceMessage, CompletionListener listener) {
125+
void addPendingPresence(PresenceMessage presenceMessage, Callback<PublishResult> listener) {
124126
synchronized(channel) {
125-
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage,listener);
127+
final QueuedPresence queuedPresence = new QueuedPresence(presenceMessage, Listeners.unwrap(listener));
126128
pendingPresence.add(queuedPresence);
127129
}
128130
}
@@ -763,7 +765,7 @@ public void updatePresence(PresenceMessage msg, CompletionListener listener) thr
763765
ProtocolMessage message = new ProtocolMessage(ProtocolMessage.Action.presence, channel.name);
764766
message.presence = new PresenceMessage[] { msg };
765767
ConnectionManager connectionManager = ably.connection.connectionManager;
766-
connectionManager.send(message, ably.options.queueMessages, listener);
768+
connectionManager.send(message, ably.options.queueMessages, Listeners.fromCompletionListener(listener));
767769
break;
768770
default:
769771
throw AblyException.fromErrorInfo(new ErrorInfo("Unable to enter presence channel in detached or failed state", 400, 91001));
@@ -892,7 +894,7 @@ private void sendQueuedMessages() {
892894
pendingPresence.clear();
893895

894896
try {
895-
connectionManager.send(message, queueMessages, listener);
897+
connectionManager.send(message, queueMessages, Listeners.fromCompletionListener(listener));
896898
} catch(AblyException e) {
897899
Log.e(TAG, "sendQueuedMessages(): Unexpected exception sending message", e);
898900
if(listener != null)

lib/src/main/java/io/ably/lib/transport/ConnectionManager.java

Lines changed: 16 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -28,15 +28,18 @@
2828
import io.ably.lib.transport.ITransport.TransportParams;
2929
import io.ably.lib.transport.NetworkConnectivity.NetworkConnectivityListener;
3030
import io.ably.lib.types.AblyException;
31+
import io.ably.lib.types.Callback;
3132
import io.ably.lib.types.ClientOptions;
3233
import io.ably.lib.types.ConnectionDetails;
3334
import io.ably.lib.types.ErrorInfo;
3435
import io.ably.lib.types.Param;
3536
import io.ably.lib.types.ProtocolMessage;
3637
import io.ably.lib.types.ProtocolSerializer;
38+
import io.ably.lib.types.PublishResult;
3739
import io.ably.lib.util.Log;
3840
import io.ably.lib.util.PlatformAgentProvider;
3941
import io.ably.lib.util.ReconnectionStrategy;
42+
import org.jetbrains.annotations.Nullable;
4043

4144
public class ConnectionManager implements ConnectListener {
4245
final ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
@@ -1403,7 +1406,7 @@ private synchronized void onError(ProtocolMessage message) {
14031406
}
14041407

14051408
private void onAck(ProtocolMessage message) {
1406-
pendingMessages.ack(message.msgSerial, message.count, message.error);
1409+
pendingMessages.ack(message.msgSerial, message.count, message.res, message.error);
14071410
}
14081411

14091412
private void onNack(ProtocolMessage message) {
@@ -1724,14 +1727,14 @@ protected void setLastActivity(long lastActivityTime) {
17241727

17251728
public static class QueuedMessage {
17261729
public final ProtocolMessage msg;
1727-
public final CompletionListener listener;
1728-
public QueuedMessage(ProtocolMessage msg, CompletionListener listener) {
1730+
public final Callback<PublishResult> listener;
1731+
public QueuedMessage(ProtocolMessage msg, Callback<PublishResult> listener) {
17291732
this.msg = msg;
17301733
this.listener = listener;
17311734
}
17321735
}
17331736

1734-
public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener listener) throws AblyException {
1737+
public void send(ProtocolMessage msg, boolean queueEvents, Callback<PublishResult> listener) throws AblyException {
17351738
State state;
17361739
synchronized(this) {
17371740
state = this.currentState;
@@ -1747,7 +1750,7 @@ public void send(ProtocolMessage msg, boolean queueEvents, CompletionListener li
17471750
throw AblyException.fromErrorInfo(state.defaultErrorInfo);
17481751
}
17491752

1750-
private void sendImpl(ProtocolMessage message, CompletionListener listener) throws AblyException {
1753+
private void sendImpl(ProtocolMessage message, Callback<PublishResult> listener) throws AblyException {
17511754
if(transport == null) {
17521755
Log.v(TAG, "sendImpl(): Discarding message; transport unavailable");
17531756
return;
@@ -1825,7 +1828,7 @@ public synchronized void push(QueuedMessage msg) {
18251828
queue.add(msg);
18261829
}
18271830

1828-
public void ack(long msgSerial, int count, ErrorInfo reason) {
1831+
public void ack(long msgSerial, int count, @Nullable PublishResult[] results, ErrorInfo reason) {
18291832
QueuedMessage[] ackMessages = null, nackMessages = null;
18301833
synchronized(this) {
18311834
if (queue.isEmpty()) return;
@@ -1867,11 +1870,14 @@ public void ack(long msgSerial, int count, ErrorInfo reason) {
18671870
}
18681871
}
18691872
if(ackMessages != null) {
1870-
for(QueuedMessage msg : ackMessages) {
1873+
for (int i = 0; i < ackMessages.length; i++) {
1874+
QueuedMessage msg = ackMessages[i];
18711875
try {
1872-
if(msg.listener != null)
1873-
msg.listener.onSuccess();
1874-
} catch(Throwable t) {
1876+
if (msg.listener != null) {
1877+
PublishResult messageResult = results != null && results.length > i ? results[i] : null;
1878+
msg.listener.onSuccess(messageResult);
1879+
}
1880+
} catch (Throwable t) {
18751881
Log.e(TAG, "ack(): listener exception", t);
18761882
}
18771883
}

lib/src/main/java/io/ably/lib/types/ProtocolMessage.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,8 @@ public ProtocolMessage(Action action, String channel) {
137137
@JsonAdapter(ObjectsJsonSerializer.class)
138138
public Object[] state;
139139

140+
public @Nullable PublishResult[] res;
141+
140142
public boolean hasFlag(final Flag flag) {
141143
return (flags & flag.getMask()) == flag.getMask();
142144
}
@@ -161,6 +163,7 @@ void writeMsgpack(MessagePacker packer) throws IOException {
161163
if(channelSerial != null) ++fieldCount;
162164
if(annotations != null) ++fieldCount;
163165
if(state != null && ObjectsHelper.getSerializer() != null) ++fieldCount;
166+
if(res != null) ++fieldCount;
164167
packer.packMapHeader(fieldCount);
165168
packer.packString("action");
166169
packer.packInt(action.getValue());
@@ -209,6 +212,10 @@ void writeMsgpack(MessagePacker packer) throws IOException {
209212
Log.w(TAG, "Skipping 'state' field msgpack serialization because ObjectsSerializer not found");
210213
}
211214
}
215+
if (res != null) {
216+
packer.packString("res");
217+
PublishResult.writeMsgpackArray(res, packer);
218+
}
212219
}
213220

214221
ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
@@ -280,6 +287,9 @@ ProtocolMessage readMsgpack(MessageUnpacker unpacker) throws IOException {
280287
unpacker.skipValue();
281288
}
282289
break;
290+
case "res":
291+
res = PublishResult.readMsgpackArray(unpacker);
292+
break;
283293
default:
284294
Log.v(TAG, "Unexpected field: " + fieldName);
285295
unpacker.skipValue();
Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
package io.ably.lib.types;
2+
3+
import io.ably.lib.http.HttpCore;
4+
import io.ably.lib.util.Serialisation;
5+
import org.jetbrains.annotations.Nullable;
6+
import org.msgpack.core.MessageFormat;
7+
import org.msgpack.core.MessagePacker;
8+
import org.msgpack.core.MessageUnpacker;
9+
10+
import java.io.IOException;
11+
12+
/**
13+
* Contains the result of a publish operation.
14+
*/
15+
public class PublishResult {
16+
17+
private static final String SERIALS = "serials";
18+
19+
/**
20+
* An array of message serials corresponding 1:1 to the messages that were published.
21+
* A serial may be null if the message was discarded due to a configured conflation rule.
22+
*/
23+
public final @Nullable String[] serials;
24+
25+
public PublishResult(@Nullable String[] serials) {
26+
this.serials = serials;
27+
}
28+
29+
private static PublishResult readFromJson(byte[] packed) throws MessageDecodeException {
30+
return Serialisation.gson.fromJson(new String(packed), PublishResult.class);
31+
}
32+
33+
private static PublishResult readMsgpack(byte[] packed) throws AblyException {
34+
try {
35+
MessageUnpacker unpacker = Serialisation.msgpackUnpackerConfig.newUnpacker(packed);
36+
return readMsgpack(unpacker);
37+
} catch (IOException ioe) {
38+
throw AblyException.fromThrowable(ioe);
39+
}
40+
}
41+
42+
private static PublishResult readMsgpack(MessageUnpacker unpacker) throws IOException {
43+
int fieldCount = unpacker.unpackMapHeader();
44+
for (int i = 0; i < fieldCount; i++) {
45+
String fieldName = unpacker.unpackString();
46+
MessageFormat fieldFormat = unpacker.getNextFormat();
47+
if (fieldFormat.equals(MessageFormat.NIL)) {
48+
unpacker.unpackNil();
49+
continue;
50+
}
51+
52+
if (fieldName.equals(SERIALS)) {
53+
int count = unpacker.unpackArrayHeader();
54+
String[] serials = new String[count];
55+
for (int j = 0; j < count; j++) {
56+
if (unpacker.getNextFormat().equals(MessageFormat.NIL)) {
57+
unpacker.unpackNil();
58+
serials[j] = null;
59+
} else {
60+
serials[j] = unpacker.unpackString();
61+
}
62+
}
63+
return new PublishResult(serials);
64+
} else {
65+
unpacker.skipValue();
66+
}
67+
}
68+
return new PublishResult(new String[]{});
69+
}
70+
71+
static void writeMsgpackArray(PublishResult[] results, MessagePacker packer) {
72+
try {
73+
int count = results.length;
74+
packer.packArrayHeader(count);
75+
for (PublishResult result : results) {
76+
if (result != null) {
77+
result.writeMsgpack(packer);
78+
} else {
79+
packer.packNil();
80+
}
81+
}
82+
} catch (IOException e) {
83+
throw new RuntimeException(e.getMessage(), e);
84+
}
85+
}
86+
87+
static PublishResult[] readMsgpackArray(MessageUnpacker unpacker) throws IOException {
88+
int count = unpacker.unpackArrayHeader();
89+
PublishResult[] results = new PublishResult[count];
90+
for (int i = 0; i < count; i++) {
91+
results[i] = readMsgpack(unpacker);
92+
}
93+
return results;
94+
}
95+
96+
public static HttpCore.BodyHandler<String> getBodyHandler() {
97+
return new PublishResultBodyHandler();
98+
}
99+
100+
private void writeMsgpack(MessagePacker packer) throws IOException {
101+
int fieldCount = 0;
102+
if (serials != null) ++fieldCount;
103+
packer.packMapHeader(fieldCount);
104+
if (serials != null) {
105+
packer.packString(SERIALS);
106+
packer.packArrayHeader(serials.length);
107+
for (String serial : serials) {
108+
if (serial == null) {
109+
packer.packNil();
110+
} else {
111+
packer.packString(serial);
112+
}
113+
}
114+
}
115+
}
116+
117+
private static class PublishResultBodyHandler implements HttpCore.BodyHandler<String> {
118+
119+
@Override
120+
public String[] handleResponseBody(String contentType, byte[] body) throws AblyException {
121+
try {
122+
PublishResult publishResult = null;
123+
if ("application/json".equals(contentType))
124+
publishResult = readFromJson(body);
125+
else if ("application/x-msgpack".equals(contentType))
126+
publishResult = readMsgpack(body);
127+
return publishResult != null ? publishResult.serials : new String[]{};
128+
} catch (MessageDecodeException e) {
129+
throw AblyException.fromThrowable(e);
130+
}
131+
}
132+
}
133+
}
134+
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.ably.lib.util;
2+
3+
import io.ably.lib.realtime.CompletionListener;
4+
import io.ably.lib.types.Callback;
5+
import io.ably.lib.types.ErrorInfo;
6+
import io.ably.lib.types.PublishResult;
7+
import io.ably.lib.types.UpdateDeleteResult;
8+
9+
public class Listeners {
10+
11+
public static <T> Callback<T> fromCompletionListener(CompletionListener listener) {
12+
return new CompletionListenerWrapper<T>(listener);
13+
}
14+
15+
public static Callback<PublishResult> toPublishResultListener(Callback<UpdateDeleteResult> listener) {
16+
return new UpdateResultToPublishAdapter(listener);
17+
}
18+
19+
public static <T> CompletionListener unwrap(Callback<T> listener) {
20+
if (listener instanceof CompletionListenerWrapper) {
21+
return ((CompletionListenerWrapper<T>)listener).listener;
22+
} else {
23+
return null;
24+
}
25+
}
26+
27+
private static class CompletionListenerWrapper<T> implements Callback<T> {
28+
private final CompletionListener listener;
29+
30+
private CompletionListenerWrapper(CompletionListener listener) {
31+
this.listener = listener;
32+
}
33+
34+
@Override
35+
public void onSuccess(T result) {
36+
if (listener != null) {
37+
listener.onSuccess();
38+
}
39+
}
40+
41+
@Override
42+
public void onError(ErrorInfo reason) {
43+
if (listener != null) {
44+
listener.onError(reason);
45+
}
46+
}
47+
}
48+
49+
private static class UpdateResultToPublishAdapter implements Callback<PublishResult> {
50+
private final Callback<UpdateDeleteResult> listener;
51+
52+
private UpdateResultToPublishAdapter(Callback<UpdateDeleteResult> listener) {
53+
this.listener = listener;
54+
}
55+
56+
@Override
57+
public void onSuccess(PublishResult result) {
58+
if (listener != null) {
59+
String serial = result != null && result.serials != null && result.serials.length > 0
60+
? result.serials[0] : null;
61+
listener.onSuccess(new UpdateDeleteResult(serial));
62+
}
63+
}
64+
65+
@Override
66+
public void onError(ErrorInfo reason) {
67+
if (listener != null) {
68+
listener.onError(reason);
69+
}
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)