From 0578156b721fa01c8c645b8f9625ecebdb6449e4 Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Wed, 27 Jul 2016 11:05:51 +0200 Subject: HONEYCOMB-130: Separate v3po plugin from HC infra Creating folders: - common/ - infra/ - v3po/ - vpp-common/ Change-Id: I2c39e1b17e39e7c0f0628f44aa5fe08563fa06e4 Signed-off-by: Maros Marsalek --- .../impl/HoneycombNotificationCollector.java | 66 ++++++++++++ .../impl/NotificationProducerRegistry.java | 112 +++++++++++++++++++++ .../impl/NotificationProducerTracker.java | 109 ++++++++++++++++++++ 3 files changed, 287 insertions(+) create mode 100644 infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/HoneycombNotificationCollector.java create mode 100644 infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/NotificationProducerRegistry.java create mode 100644 infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/NotificationProducerTracker.java (limited to 'infra/notification/impl/src/main/java/io') diff --git a/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/HoneycombNotificationCollector.java b/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/HoneycombNotificationCollector.java new file mode 100644 index 000000000..e7d54e318 --- /dev/null +++ b/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/HoneycombNotificationCollector.java @@ -0,0 +1,66 @@ +/* + * Copyright (c) 2016 Cisco and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fd.honeycomb.v3po.notification.impl; + +import io.fd.honeycomb.v3po.notification.NotificationCollector; +import java.util.Collection; +import javax.annotation.Nonnull; +import org.opendaylight.controller.md.sal.binding.api.NotificationPublishService; +import org.opendaylight.yangtools.yang.binding.Notification; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Notification collector based on MD-SAL's {@link NotificationPublishService}. + */ +public final class HoneycombNotificationCollector implements NotificationCollector, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(HoneycombNotificationCollector.class); + + private final NotificationPublishService bindingDOMNotificationPublishServiceAdapter; + private final NotificationProducerRegistry notificationProducerRegistry; + + public HoneycombNotificationCollector( + @Nonnull final NotificationPublishService bindingDOMNotificationPublishServiceAdapter, + @Nonnull final NotificationProducerRegistry notificationProducerRegistry) { + this.bindingDOMNotificationPublishServiceAdapter = bindingDOMNotificationPublishServiceAdapter; + this.notificationProducerRegistry = notificationProducerRegistry; + } + + @Override + public void close() throws Exception { + LOG.trace("Closing"); + } + + @Override + public void onNotification(@Nonnull final Notification notification) { + LOG.debug("Notification: {} pushed into collector", notification.getClass().getSimpleName()); + LOG.trace("Notification: {} pushed into collector", notification); + try { + bindingDOMNotificationPublishServiceAdapter.putNotification(notification); + } catch (InterruptedException e) { + LOG.warn("Interrupted", e); + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + @Override + @Nonnull + public Collection> getNotificationTypes() { + return notificationProducerRegistry.getNotificationTypes(); + } +} diff --git a/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/NotificationProducerRegistry.java b/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/NotificationProducerRegistry.java new file mode 100644 index 000000000..8fba700bd --- /dev/null +++ b/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/NotificationProducerRegistry.java @@ -0,0 +1,112 @@ +/* + * Copyright (c) 2016 Cisco and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fd.honeycomb.v3po.notification.impl; + +import static com.google.common.base.Preconditions.checkArgument; + +import com.google.common.collect.HashMultimap; +import com.google.common.collect.Maps; +import com.google.common.collect.Multimap; +import io.fd.honeycomb.v3po.notification.ManagedNotificationProducer; +import io.fd.honeycomb.v3po.notification.NotificationProducer; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.concurrent.ThreadSafe; +import org.opendaylight.yangtools.yang.binding.Notification; +import org.opendaylight.yangtools.yang.common.QName; + +/** + * Holds the collection of registered notification producers. + * Provides additional information about the types of notifications produced per producer and overall. + */ +@ThreadSafe +public final class NotificationProducerRegistry { + + private final Set> notificationTypes; + private final Map notificationQNameToProducer; + private final Multimap notificationProducerQNames; + + public NotificationProducerRegistry(final List notificationProducersDependency) { + this.notificationTypes = toTypes(notificationProducersDependency); + this.notificationQNameToProducer = toQNameMap(notificationProducersDependency); + this.notificationProducerQNames = toQNameMapReversed(notificationProducersDependency); + } + + private static Multimap toQNameMapReversed(final List notificationProducers) { + final Multimap multimap = HashMultimap.create(); + + for (ManagedNotificationProducer producer : notificationProducers) { + for (Class aClass : producer.getNotificationTypes()) { + multimap.put(producer, getQName(aClass)); + } + } + return multimap; + } + + private static Set> toTypes(final List notificationProducersDependency) { + // Get all notification types registered from HC notification producers + return notificationProducersDependency + .stream() + .flatMap(producer -> producer.getNotificationTypes().stream()) + .collect(Collectors.toSet()); + } + + + private static Map toQNameMap(final List producerDependencies) { + // Only a single notification producer per notification type is allowed + final Map qNamesToProducers = Maps.newHashMap(); + for (ManagedNotificationProducer notificationProducer : producerDependencies) { + for (QName qName : typesToQNames(notificationProducer.getNotificationTypes())) { + final NotificationProducer previousProducer = qNamesToProducers.put(qName, notificationProducer); + checkArgument(previousProducer == null, "2 producers of the same notification type: %s. " + + "Producer 1: {} Producer 2: {}" , qName, previousProducer, notificationProducer); + } + } + return qNamesToProducers; + } + + + private static Set typesToQNames(final Collection> notificationTypes) { + return notificationTypes + .stream() + .map(NotificationProducerRegistry::getQName) + .collect(Collectors.toSet()); + } + + + public static QName getQName(final Class aClass) { + try { + return (QName) aClass.getField("QNAME").get(null); + } catch (NoSuchFieldException | IllegalAccessException e) { + throw new IllegalArgumentException("Unable to retrieve QName for notification of type: " + aClass, e); + } + } + + Set> getNotificationTypes() { + return notificationTypes; + } + + Map getNotificationQNameToProducer() { + return notificationQNameToProducer; + } + + Multimap getNotificationProducerQNames() { + return notificationProducerQNames; + } +} diff --git a/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/NotificationProducerTracker.java b/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/NotificationProducerTracker.java new file mode 100644 index 000000000..cefb50ac9 --- /dev/null +++ b/infra/notification/impl/src/main/java/io/fd/honeycomb/v3po/notification/impl/NotificationProducerTracker.java @@ -0,0 +1,109 @@ +/* + * Copyright (c) 2016 Cisco and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fd.honeycomb.v3po.notification.impl; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Sets; +import io.fd.honeycomb.v3po.notification.ManagedNotificationProducer; +import io.fd.honeycomb.v3po.notification.NotificationCollector; +import java.util.HashSet; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.concurrent.ThreadSafe; +import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListener; +import org.opendaylight.controller.md.sal.dom.spi.DOMNotificationSubscriptionListenerRegistry; +import org.opendaylight.yangtools.concepts.ListenerRegistration; +import org.opendaylight.yangtools.yang.common.QName; +import org.opendaylight.yangtools.yang.model.api.SchemaPath; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Starts & stops notification producer dependencies on demand. + * Uses {@link DOMNotificationSubscriptionListenerRegistry} to receive subscription change notifications. + */ +@ThreadSafe +public final class NotificationProducerTracker + implements DOMNotificationSubscriptionListener, AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(NotificationProducerTracker.class); + + private final ListenerRegistration subscriptionListener; + private final NotificationProducerRegistry registry; + private final NotificationCollector collector; + + private final Set alreadyStartedProducers = new HashSet<>(); + + public NotificationProducerTracker(@Nonnull final NotificationProducerRegistry registry, + @Nonnull final NotificationCollector collector, + @Nonnull final DOMNotificationSubscriptionListenerRegistry notificationRouter) { + this.registry = registry; + this.collector = collector; + this.subscriptionListener = notificationRouter.registerSubscriptionListener(this); + } + + @Override + public synchronized void onSubscriptionChanged(final Set set) { + LOG.debug("Subscriptions changed. Current subscriptions: {}", set); + final Set currentSubscriptions = set.stream().map(SchemaPath::getLastComponent).collect(Collectors.toSet()); + final Set startedQNames = getStartedQNames(alreadyStartedProducers); + final Sets.SetView newSubscriptions = Sets.difference(currentSubscriptions, startedQNames); + LOG.debug("Subscriptions changed. New subscriptions: {}", newSubscriptions); + final Sets.SetView deletedSubscriptions = Sets.difference(startedQNames, currentSubscriptions); + LOG.debug("Subscriptions changed. Deleted subscriptions: {}", deletedSubscriptions); + + newSubscriptions.stream().forEach(newSub -> { + if(!registry.getNotificationQNameToProducer().containsKey(newSub)) { + return; + } + final ManagedNotificationProducer producer = registry.getNotificationQNameToProducer().get(newSub); + if(alreadyStartedProducers.contains(producer)) { + return; + } + LOG.debug("Starting notification producer: {}", producer); + producer.start(collector); + alreadyStartedProducers.add(producer); + }); + + deletedSubscriptions.stream().forEach(newSub -> { + checkState(registry.getNotificationQNameToProducer().containsKey(newSub)); + final ManagedNotificationProducer producer = registry.getNotificationQNameToProducer().get(newSub); + checkState(alreadyStartedProducers.contains(producer)); + LOG.debug("Stopping notification producer: {}", producer); + producer.stop(); + alreadyStartedProducers.remove(producer); + }); + + } + + private Set getStartedQNames(final Set alreadyStartedProducers) { + return alreadyStartedProducers.stream() + .flatMap(p -> registry.getNotificationProducerQNames().get(p).stream()) + .collect(Collectors.toSet()); + } + + @Override + public synchronized void close() throws Exception { + LOG.trace("Closing"); + subscriptionListener.close(); + // Stop all producers + LOG.debug("Stopping all producers: {}", alreadyStartedProducers); + alreadyStartedProducers.forEach(ManagedNotificationProducer::stop); + alreadyStartedProducers.clear(); + } +} -- cgit 1.2.3-korg