diff options
author | Matej <matej.perina@pantheon.tech> | 2017-10-23 12:55:11 +0200 |
---|---|---|
committer | Damjan Marion <dmarion.lists@gmail.com> | 2017-10-30 11:06:57 +0000 |
commit | 7008da2667d2876bc58aa5ae57c4a9de48cc756b (patch) | |
tree | b68ab5e0363733d848161279efb9f353a2e40f62 /src/vpp-api/java/jvpp-registry | |
parent | 04b68bdc481f21f864da63dfe36b2f05b64714a8 (diff) |
jvpp: bugfix for deadlock in java future API (VPP-1037)
- message sending method inside synchronization blocks causes
deadlock between sending and receiving part
- breaking atomicity of sending message and putting future with
corresponding id to map needs additional handling by writer and receiver,
regardless which part get access to sync block first will create
new future and second one will complete it and remove from map,
in case of dump calls where control ping reply is required
as confirmation that all information were send, if ping reply is
received before writer put future in map, reader will create
regular control ping future instead and writer needs to made association
between these two futures
Change-Id: Id29a19be7a5319291a5e07cf931080610178f00c
Signed-off-by: Matej <matej.perina@pantheon.tech>
Diffstat (limited to 'src/vpp-api/java/jvpp-registry')
-rw-r--r-- | src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java | 49 | ||||
-rw-r--r-- | src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java | 123 |
2 files changed, 122 insertions, 50 deletions
diff --git a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java index 6e938ae313f..baef14c3865 100644 --- a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java +++ b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java @@ -99,15 +99,24 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback final ControlPingCallback callback = (ControlPingCallback) pluginRegistry.get(clazz.getName()); assertPluginWasRegistered(name, callback); + // controlPing0 is sending function and can go to waiting in case of e. g. full queue + // because of that it cant be in same synchronization block as used by reply handler function + int context = controlPing0(); + if (context < 0) { + throw new VppInvocationException("controlPing", context); + } + synchronized (pingCalls) { - int context = controlPing0(); - if (context < 0) { - throw new VppInvocationException("controlPing", context); + // if callback is in map it's because reply was already received + EarlyControlPingReply earlyReplyCallback = (EarlyControlPingReply) pingCalls.remove(context); + if(earlyReplyCallback == null) { + pingCalls.put(context, callback); + } else { + callback.onControlPingReply(earlyReplyCallback.getReply()); } - - pingCalls.put(context, callback); - return context; } + + return context; } @Override @@ -116,8 +125,9 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback synchronized (pingCalls) { callback = pingCalls.remove(reply.context); if (callback == null) { - LOG.log(Level.WARNING, "No callback was registered for reply context=" + reply.context + " Contexts waiting=" - + pingCalls.keySet()); + // reply received early, because we don't know callback to call + // we wrap the reply and let the sender to call it + pingCalls.put(reply.context, new EarlyControlPingReply(reply)); return; } } @@ -151,4 +161,27 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback public void close() throws Exception { connection.close(); } + + private static class EarlyControlPingReply implements ControlPingCallback { + + private final ControlPingReply reply; + + public EarlyControlPingReply(final ControlPingReply reply) { + this.reply = reply; + } + + public ControlPingReply getReply() { + return reply; + } + + @Override + public void onError(VppCallbackException ex) { + throw new IllegalStateException("Calling onError in EarlyControlPingReply"); + } + + @Override + public void onControlPingReply(ControlPingReply reply) { + throw new IllegalStateException("Calling onControlPingReply in EarlyControlPingReply"); + } + } } diff --git a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java index e7df528ae30..ac85f5309ec 100644 --- a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java +++ b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; + import io.fd.vpp.jvpp.JVpp; import io.fd.vpp.jvpp.JVppRegistry; import io.fd.vpp.jvpp.VppInvocationException; @@ -62,27 +63,41 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker { @Override @SuppressWarnings("unchecked") public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>> CompletionStage<REPLY> send(REQ req) { - synchronized(requests) { - try { - final CompletableFuture<REPLY> replyCompletableFuture; - final int contextId = jvpp.send(req); + try { + // jvpp.send() can go to waiting state if sending queue is full, putting it into same + // synchronization block as used by receiving part (synchronized(requests)) can lead + // to deadlock between these two sides or at least slowing sending process by slow + // reader + final CompletableFuture<REPLY> replyCompletableFuture; + final int contextId = jvpp.send(req); + + if(req instanceof JVppDump) { + throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls"); + } - if(req instanceof JVppDump) { - throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls"); + synchronized(requests) { + CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId); + if (replyFuture == null) { + // reply not yet received, put new future into map + replyCompletableFuture = new CompletableFuture<>(); + requests.put(contextId, replyCompletableFuture); + } else { + // reply already received (should be completed by reader), + // remove future from map and return it to caller + replyCompletableFuture = (CompletableFuture<REPLY>) replyFuture; + requests.remove(contextId); } - replyCompletableFuture = new CompletableFuture<>(); - requests.put(contextId, replyCompletableFuture); - - // TODO in case of timeouts/missing replies, requests from the map are not removed - // consider adding cancel method, that would remove requests from the map and cancel - // associated replyCompletableFuture - - return replyCompletableFuture; - } catch (VppInvocationException ex) { - final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>(); - replyCompletableFuture.completeExceptionally(ex); - return replyCompletableFuture; } + + // TODO in case of timeouts/missing replies, requests from the map are not removed + // consider adding cancel method, that would remove requests from the map and cancel + // associated replyCompletableFuture + + return replyCompletableFuture; + } catch (VppInvocationException ex) { + final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>(); + replyCompletableFuture.completeExceptionally(ex); + return replyCompletableFuture; } } @@ -90,30 +105,54 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker { @SuppressWarnings("unchecked") public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>, DUMP extends JVppReplyDump<REQ, REPLY>> CompletionStage<DUMP> send( REQ req, DUMP emptyReplyDump) { - synchronized(requests) { - try { - final CompletableDumpFuture<DUMP> replyCompletableFuture; - final int contextId = jvpp.send(req); - - if(!(req instanceof JVppDump)) { - throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls"); - } - replyCompletableFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump); - - requests.put(contextId, replyCompletableFuture); - requests.put(registry.controlPing(jvpp.getClass()), replyCompletableFuture); - - // TODO in case of timeouts/missing replies, requests from the map are not removed - // consider adding cancel method, that would remove requests from the map and cancel - // associated replyCompletableFuture - - return replyCompletableFuture; - } catch (VppInvocationException ex) { - final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>(); - replyCompletableFuture.completeExceptionally(ex); - return replyCompletableFuture; - } - } + try { + // jvpp.send() and registry.controlPing() can go to waiting state if sending queue is full, + // putting it into same synchronization block as used by receiving part (synchronized(requests)) + // can lead to deadlock between these two sides or at least slowing sending process by slow reader + final CompletableDumpFuture<DUMP> replyDumpFuture; + final int contextId = jvpp.send(req); + + if(!(req instanceof JVppDump)) { + throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls"); + } + + synchronized(requests) { + CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId); + if (replyFuture == null) { + // reply not received yet, put new future to map + replyDumpFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump); + requests.put(contextId, replyDumpFuture); + } else { + // reply already received, save existing future + replyDumpFuture = (CompletableDumpFuture<DUMP>) replyFuture; + } + } + + final int pingId = registry.controlPing(jvpp.getClass()); + + synchronized(requests) { + if (requests.remove(pingId) == null) { + // reply not received yet, put future into map under pingId + requests.put(pingId, replyDumpFuture); + } else { + // reply already received, complete future + // ping reply couldn't complete the future because it is not in map under + // ping id + replyDumpFuture.complete(replyDumpFuture.getReplyDump()); + requests.remove(contextId); + } + } + + // TODO in case of timeouts/missing replies, requests from the map are not removed + // consider adding cancel method, that would remove requests from the map and cancel + // associated replyCompletableFuture + + return replyDumpFuture; + } catch (VppInvocationException ex) { + final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>(); + replyCompletableFuture.completeExceptionally(ex); + return replyCompletableFuture; + } } public static final class CompletableDumpFuture<T extends JVppReplyDump<?, ?>> extends CompletableFuture<T> { |