diff options
3 files changed, 160 insertions, 72 deletions
diff --git a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java index 6e938ae313f..baef14c3865 100644 --- a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java +++ b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/JVppRegistryImpl.java @@ -99,15 +99,24 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback final ControlPingCallback callback = (ControlPingCallback) pluginRegistry.get(clazz.getName()); assertPluginWasRegistered(name, callback); + // controlPing0 is sending function and can go to waiting in case of e. g. full queue + // because of that it cant be in same synchronization block as used by reply handler function + int context = controlPing0(); + if (context < 0) { + throw new VppInvocationException("controlPing", context); + } + synchronized (pingCalls) { - int context = controlPing0(); - if (context < 0) { - throw new VppInvocationException("controlPing", context); + // if callback is in map it's because reply was already received + EarlyControlPingReply earlyReplyCallback = (EarlyControlPingReply) pingCalls.remove(context); + if(earlyReplyCallback == null) { + pingCalls.put(context, callback); + } else { + callback.onControlPingReply(earlyReplyCallback.getReply()); } - - pingCalls.put(context, callback); - return context; } + + return context; } @Override @@ -116,8 +125,9 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback synchronized (pingCalls) { callback = pingCalls.remove(reply.context); if (callback == null) { - LOG.log(Level.WARNING, "No callback was registered for reply context=" + reply.context + " Contexts waiting=" - + pingCalls.keySet()); + // reply received early, because we don't know callback to call + // we wrap the reply and let the sender to call it + pingCalls.put(reply.context, new EarlyControlPingReply(reply)); return; } } @@ -151,4 +161,27 @@ public final class JVppRegistryImpl implements JVppRegistry, ControlPingCallback public void close() throws Exception { connection.close(); } + + private static class EarlyControlPingReply implements ControlPingCallback { + + private final ControlPingReply reply; + + public EarlyControlPingReply(final ControlPingReply reply) { + this.reply = reply; + } + + public ControlPingReply getReply() { + return reply; + } + + @Override + public void onError(VppCallbackException ex) { + throw new IllegalStateException("Calling onError in EarlyControlPingReply"); + } + + @Override + public void onControlPingReply(ControlPingReply reply) { + throw new IllegalStateException("Calling onControlPingReply in EarlyControlPingReply"); + } + } } diff --git a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java index e7df528ae30..ac85f5309ec 100644 --- a/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java +++ b/src/vpp-api/java/jvpp-registry/io/fd/vpp/jvpp/future/AbstractFutureJVppInvoker.java @@ -21,6 +21,7 @@ import java.util.Map; import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; + import io.fd.vpp.jvpp.JVpp; import io.fd.vpp.jvpp.JVppRegistry; import io.fd.vpp.jvpp.VppInvocationException; @@ -62,27 +63,41 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker { @Override @SuppressWarnings("unchecked") public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>> CompletionStage<REPLY> send(REQ req) { - synchronized(requests) { - try { - final CompletableFuture<REPLY> replyCompletableFuture; - final int contextId = jvpp.send(req); + try { + // jvpp.send() can go to waiting state if sending queue is full, putting it into same + // synchronization block as used by receiving part (synchronized(requests)) can lead + // to deadlock between these two sides or at least slowing sending process by slow + // reader + final CompletableFuture<REPLY> replyCompletableFuture; + final int contextId = jvpp.send(req); + + if(req instanceof JVppDump) { + throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls"); + } - if(req instanceof JVppDump) { - throw new IllegalArgumentException("Send with empty reply dump has to be used in case of dump calls"); + synchronized(requests) { + CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId); + if (replyFuture == null) { + // reply not yet received, put new future into map + replyCompletableFuture = new CompletableFuture<>(); + requests.put(contextId, replyCompletableFuture); + } else { + // reply already received (should be completed by reader), + // remove future from map and return it to caller + replyCompletableFuture = (CompletableFuture<REPLY>) replyFuture; + requests.remove(contextId); } - replyCompletableFuture = new CompletableFuture<>(); - requests.put(contextId, replyCompletableFuture); - - // TODO in case of timeouts/missing replies, requests from the map are not removed - // consider adding cancel method, that would remove requests from the map and cancel - // associated replyCompletableFuture - - return replyCompletableFuture; - } catch (VppInvocationException ex) { - final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>(); - replyCompletableFuture.completeExceptionally(ex); - return replyCompletableFuture; } + + // TODO in case of timeouts/missing replies, requests from the map are not removed + // consider adding cancel method, that would remove requests from the map and cancel + // associated replyCompletableFuture + + return replyCompletableFuture; + } catch (VppInvocationException ex) { + final CompletableFuture<REPLY> replyCompletableFuture = new CompletableFuture<>(); + replyCompletableFuture.completeExceptionally(ex); + return replyCompletableFuture; } } @@ -90,30 +105,54 @@ public abstract class AbstractFutureJVppInvoker implements FutureJVppInvoker { @SuppressWarnings("unchecked") public <REQ extends JVppRequest, REPLY extends JVppReply<REQ>, DUMP extends JVppReplyDump<REQ, REPLY>> CompletionStage<DUMP> send( REQ req, DUMP emptyReplyDump) { - synchronized(requests) { - try { - final CompletableDumpFuture<DUMP> replyCompletableFuture; - final int contextId = jvpp.send(req); - - if(!(req instanceof JVppDump)) { - throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls"); - } - replyCompletableFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump); - - requests.put(contextId, replyCompletableFuture); - requests.put(registry.controlPing(jvpp.getClass()), replyCompletableFuture); - - // TODO in case of timeouts/missing replies, requests from the map are not removed - // consider adding cancel method, that would remove requests from the map and cancel - // associated replyCompletableFuture - - return replyCompletableFuture; - } catch (VppInvocationException ex) { - final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>(); - replyCompletableFuture.completeExceptionally(ex); - return replyCompletableFuture; - } - } + try { + // jvpp.send() and registry.controlPing() can go to waiting state if sending queue is full, + // putting it into same synchronization block as used by receiving part (synchronized(requests)) + // can lead to deadlock between these two sides or at least slowing sending process by slow reader + final CompletableDumpFuture<DUMP> replyDumpFuture; + final int contextId = jvpp.send(req); + + if(!(req instanceof JVppDump)) { + throw new IllegalArgumentException("Send without empty reply dump has to be used in case of regular calls"); + } + + synchronized(requests) { + CompletableFuture<? extends JVppReply<?>> replyFuture = requests.get(contextId); + if (replyFuture == null) { + // reply not received yet, put new future to map + replyDumpFuture = new CompletableDumpFuture<>(contextId, emptyReplyDump); + requests.put(contextId, replyDumpFuture); + } else { + // reply already received, save existing future + replyDumpFuture = (CompletableDumpFuture<DUMP>) replyFuture; + } + } + + final int pingId = registry.controlPing(jvpp.getClass()); + + synchronized(requests) { + if (requests.remove(pingId) == null) { + // reply not received yet, put future into map under pingId + requests.put(pingId, replyDumpFuture); + } else { + // reply already received, complete future + // ping reply couldn't complete the future because it is not in map under + // ping id + replyDumpFuture.complete(replyDumpFuture.getReplyDump()); + requests.remove(contextId); + } + } + + // TODO in case of timeouts/missing replies, requests from the map are not removed + // consider adding cancel method, that would remove requests from the map and cancel + // associated replyCompletableFuture + + return replyDumpFuture; + } catch (VppInvocationException ex) { + final CompletableFuture<DUMP> replyCompletableFuture = new CompletableFuture<>(); + replyCompletableFuture.completeExceptionally(ex); + return replyCompletableFuture; + } } public static final class CompletableDumpFuture<T extends JVppReplyDump<?, ?>> extends CompletableFuture<T> { diff --git a/src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py b/src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py index a31287b3333..a45a532d7f2 100644 --- a/src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py +++ b/src/vpp-api/java/jvpp/gen/jvppgen/jvpp_future_facade_gen.py @@ -61,26 +61,28 @@ public final class FutureJVpp${plugin_name}FacadeCallback implements $plugin_pac @Override @SuppressWarnings("unchecked") public void onControlPingReply(final $base_package.$dto_package.ControlPingReply reply) { - final java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>> completableFuture; + java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>> completableFuture; final int replyId = reply.context; synchronized(requests) { completableFuture = (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>>) requests.get(replyId); - } - if(completableFuture != null) { - // Finish dump call - if (completableFuture instanceof $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) { - completableFuture.complete((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getReplyDump()); - // Remove future mapped to dump call context id - synchronized(requests) { + if(completableFuture != null) { + // Finish dump call + if (completableFuture instanceof $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) { + completableFuture.complete((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getReplyDump()); + // Remove future mapped to dump call context id requests.remove((($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture) completableFuture).getContextId()); + } else { + // reply to regular control ping, complete the future + completableFuture.complete(reply); } + requests.remove(replyId); } else { + // future not yet created by writer, create new future, complete it and put to map under ping id + completableFuture = new java.util.concurrent.CompletableFuture<>(); completableFuture.complete(reply); - } - synchronized(requests) { - requests.remove(replyId); + requests.put(replyId, completableFuture); } } } @@ -93,20 +95,26 @@ jvpp_facade_callback_method_template = Template(""" @Override @SuppressWarnings("unchecked") public void on$callback_dto(final $plugin_package.$dto_package.$callback_dto reply) { - final java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>> completableFuture; + java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<$plugin_package.$dto_package.$request_dto>> completableFuture; final int replyId = reply.context; if (LOG.isLoggable(java.util.logging.Level.FINE)) { LOG.fine(String.format("Received $callback_dto event message: %s", reply)); } synchronized(requests) { - completableFuture = (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<?>>) requests.get(replyId); - } + completableFuture = + (java.util.concurrent.CompletableFuture<$base_package.$dto_package.JVppReply<$plugin_package.$dto_package.$request_dto>>) requests.get(replyId); - if(completableFuture != null) { - completableFuture.complete(reply); - - synchronized(requests) { + if(completableFuture != null) { + // received reply on request, complete future created by sender and remove it from map + completableFuture.complete(reply); requests.remove(replyId); + } else { + // reply received before writer created future, + // create new future, complete it and put into map to + // notify sender that reply is already received + completableFuture = new java.util.concurrent.CompletableFuture<>(); + completableFuture.complete(reply); + requests.put(replyId, completableFuture); } } } @@ -126,16 +134,22 @@ jvpp_facade_details_callback_method_template = Template(""" @Override @SuppressWarnings("unchecked") public void on$callback_dto(final $plugin_package.$dto_package.$callback_dto reply) { - final $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump> completableFuture; + $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump> completableFuture; final int replyId = reply.context; if (LOG.isLoggable(java.util.logging.Level.FINE)) { LOG.fine(String.format("Received $callback_dto event message: %s", reply)); } synchronized(requests) { completableFuture = ($base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<$plugin_package.$dto_package.$callback_dto_reply_dump>) requests.get(replyId); - } - if(completableFuture != null) { + if(completableFuture == null) { + // reply received before writer created future, + // create new future, and put into map to notify sender that reply is already received, + // following details replies will add information to this future + completableFuture = new $base_package.$future_package.AbstractFutureJVppInvoker.CompletableDumpFuture<>(replyId, + new $plugin_package.$dto_package.$callback_dto_reply_dump()); + requests.put(replyId, completableFuture); + } completableFuture.getReplyDump().$callback_dto_field.add(reply); } } @@ -165,6 +179,7 @@ def generate_jvpp(func_list, base_package, plugin_package, plugin_name, dto_pack if not util.is_notification(func["name"]): camel_case_request_method_name = util.remove_reply_suffix(util.underscore_to_camelcase(func['name'])) + request_dto = util.remove_reply_suffix(util.underscore_to_camelcase_upper(func['name'])) if util.is_details(camel_case_name_with_suffix): camel_case_reply_name = get_standard_dump_reply_name(util.underscore_to_camelcase_upper(func['name']), func['name']) @@ -208,7 +223,8 @@ def generate_jvpp(func_list, base_package, plugin_package, plugin_name, dto_pack callbacks.append(jvpp_facade_callback_method_template.substitute(base_package=base_package, plugin_package=plugin_package, dto_package=dto_package, - callback_dto=camel_case_name_with_suffix)) + callback_dto=camel_case_name_with_suffix, + request_dto=request_dto)) if util.is_notification(func["name"]): callbacks.append(jvpp_facade_callback_notification_method_template.substitute(plugin_package=plugin_package, |