diff options
Diffstat (limited to 'src/vpp-api/java/jvpp-registry')
-rw-r--r-- | src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java | 49 | ||||
-rw-r--r-- | src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java | 123 |
2 files changed, 122 insertions, 50 deletions
diff --git a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java index 6e938ae313f..baef14c3865 100644 --- a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java +++ b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java @@ -99,15 +99,24 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback final ControlPingCallback callback = (ControlPingCallback) pluginRegistry.get(clazz.getName()); assertPluginWasRegistered(name, callback); + // controlPing0 is sending function and can go to waiting in case of e. g. full queue + // because of that it cant be in same synchronization block as used by reply handler function + int context = controlPing0(); + if (context < 0) { + throw new VppInvocationException("controlPing", context); + } + synchronized (pingCalls) { - int context = controlPing0(); - if (context < 0) { - throw new VppInvocationException("controlPing", context); + // if callback is in map it's because reply was already received + EarlyControlPingReply earlyReplyCallback = (EarlyControlPingReply) pingCalls.remove(context); + if(earlyReplyCallback == null) { + pingCalls.put(context, callback); + } else { + callback.onControlPingReply(earlyReplyCallback.getReply()); } - - pingCalls.put(context, callback); - return context; } + + return context; } @Override @@ -116,8 +125,9 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback synchronized (pingCalls) { callback = pingCalls.remove(reply.context); if (callback == null) { - LOG.log(Level.WARNING, "No callback was registered for reply context=" + reply.context + " Contexts waiting=" - + pingCalls.keySet()); + // reply received early, because we don't know callback to call + // we wrap the reply and let the sender to call it + pingCalls.put(reply.context, new EarlyControlPingReply(reply)); return; } } @@ -151,4 +161,27 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback public void close() throws Exception { connection.close(); } + + private static class EarlyControlPingReply implements ControlPingCallback { + + private final ControlPingReply reply; + + public EarlyControlPingReply(final ControlPingReply reply) { + this.reply = reply; + } + + public ControlPingReply getReply() { + return reply; + } + + @Override + public void onError(VppCallbackException ex) { + throw new IllegalStateException("Calling onError in EarlyControlPingReply"); + } + + @Override + public void onControlPingReply(ControlPingReply reply) { + throw new IllegalStateException("Calling onControlPingReply in EarlyControlPingReply"); + } + } } diff --git a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java index e7df528ae30..ac85f5309ec 100644 --- a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java +++ b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; + import io.fd.vpp.jvpp.JVpp; import io.fd.vpp.jvpp.JVppRegistry; import io.fd.vpp.jvpp.VppInvocationException; @@ -62,27 +63,41 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker { @Override @SuppressWarnings("unchecked") public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>> CompletionStage<REPLY> send(REQ req) { - synchronized(requests) { - try { - final CompletableFuture<REPLY> replyCompletableFuture; - final int contextId = jvpp.send(req); + try { + // jvpp.send() can go to waiting state if sending queue is full, putting it into same + // synchronization block as used by receiving part (synchronized(requests)) can lead + // to deadlock between these two sides or at least slowing sending process by slow + // reader + final CompletableFuture<REPLY> replyCompletableFuture; + final int contextId = jvpp.send(req); + + if(req instanceof JVppDump) { + throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls"); + } - if(req instanceof JVppDump) { - throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls"); + synchronized(requests) { + CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId); + if (replyFuture == null) { + // reply not yet received, put new future into map + replyCompletableFuture = new CompletableFuture<>(); + requests.put(contextId, replyCompletableFuture); + } else { + // reply already received (should be completed by reader), + // remove future from map and return it to caller + replyCompletableFuture = (CompletableFuture<REPLY>) replyFuture; + requests.remove(contextId); } - replyCompletableFuture = new CompletableFuture<>(); - requests.put(contextId, replyCompletableFuture); - - // TODO in case of timeouts/missing replies, requests from the map are not removed - // consider adding cancel method, that would remove requests from the map and cancel - // associated replyCompletableFuture - - return replyCompletableFuture; - } catch (VppInvocationException ex) { - final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>(); - replyCompletableFuture.completeExceptionally(ex); - return replyCompletableFuture; } + + // TODO in case of timeouts/missing replies, requests from the map are not removed + // consider adding cancel method, that would remove requests from the map and cancel + // associated replyCompletableFuture + + return replyCompletableFuture; + } catch (VppInvocationException ex) { + final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>(); + replyCompletableFuture.completeExceptionally(ex); + return replyCompletableFuture; } } @@ -90,30 +105,54 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker { @SuppressWarnings("unchecked") public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>, DUMP extends JVppReplyDump<REQ, REPLY>> CompletionStage<DUMP> send( REQ req, DUMP emptyReplyDump) { - synchronized(requests) { - try { - final CompletableDumpFuture<DUMP> replyCompletableFuture; - final int contextId = jvpp.send(req); - - if(!(req instanceof JVppDump)) { - throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls"); - } - replyCompletableFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump); - - requests.put(contextId, replyCompletableFuture); - requests.put(registry.controlPing(jvpp.getClass()), replyCompletableFuture); - - // TODO in case of timeouts/missing replies, requests from the map are not removed - // consider adding cancel method, that would remove requests from the map and cancel - // associated replyCompletableFuture - - return replyCompletableFuture; - } catch (VppInvocationException ex) { - final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>(); - replyCompletableFuture.completeExceptionally(ex); - return replyCompletableFuture; - } - } + try { + // jvpp.send() and registry.controlPing() can go to waiting state if sending queue is full, + // putting it into same synchronization block as used by receiving part (synchronized(requests)) + // can lead to deadlock between these two sides or at least slowing sending process by slow reader + final CompletableDumpFuture<DUMP> replyDumpFuture; + final int contextId = jvpp.send(req); + + if(!(req instanceof JVppDump)) { + throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls"); + } + + synchronized(requests) { + CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId); + if (replyFuture == null) { + // reply not received yet, put new future to map + replyDumpFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump); + requests.put(contextId, replyDumpFuture); + } else { + // reply already received, save existing future + replyDumpFuture = (CompletableDumpFuture<DUMP>) replyFuture; + } + } + + final int pingId = registry.controlPing(jvpp.getClass()); + + synchronized(requests) { + if (requests.remove(pingId) == null) { + // reply not received yet, put future into map under pingId + requests.put(pingId, replyDumpFuture); + } else { + // reply already received, complete future + // ping reply couldn't complete the future because it is not in map under + // ping id + replyDumpFuture.complete(replyDumpFuture.getReplyDump()); + requests.remove(contextId); + } + } + + // TODO in case of timeouts/missing replies, requests from the map are not removed + // consider adding cancel method, that would remove requests from the map and cancel + // associated replyCompletableFuture + + return replyDumpFuture; + } catch (VppInvocationException ex) { + final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>(); + replyCompletableFuture.completeExceptionally(ex); + return replyCompletableFuture; + } } public static final class CompletableDumpFuture<T extends JVppReplyDump<?, ?>> extends CompletableFuture<T> { |