From 757222979bc02d0aaba1870eea36413383d15bde Mon Sep 17 00:00:00 2001 From: Maros Marsalek Date: Tue, 8 Nov 2016 10:13:36 +0100 Subject: HONEYCOMB-270 Add isPresent() to Readers/Customizers So that they can influence whether empty data is to be considered as present + Move registries implementations from util to impl + Introduce DelegatingReader trait + Extend GenericReader where possible to reduce duplication Change-Id: I5a416acd0c4eab1fbc30fcbe585719991dbe9215 Signed-off-by: Maros Marsalek --- .../impl/write/registry/FlatWriterRegistry.java | 314 +++++++++++++++++++++ 1 file changed, 314 insertions(+) create mode 100644 infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java (limited to 'infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java') diff --git a/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java b/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java new file mode 100644 index 000000000..990431a15 --- /dev/null +++ b/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java @@ -0,0 +1,314 @@ +/* + * Copyright (c) 2016 Cisco and/or its affiliates. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at: + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fd.honeycomb.translate.impl.write.registry; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.fd.honeycomb.translate.util.RWUtils.makeIidWildcarded; + +import com.google.common.base.Optional; +import com.google.common.collect.HashMultimap; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Lists; +import com.google.common.collect.Multimap; +import com.google.common.collect.Sets; +import io.fd.honeycomb.translate.TranslationException; +import io.fd.honeycomb.translate.util.RWUtils; +import io.fd.honeycomb.translate.write.DataObjectUpdate; +import io.fd.honeycomb.translate.write.WriteContext; +import io.fd.honeycomb.translate.write.WriteFailedException; +import io.fd.honeycomb.translate.write.Writer; +import io.fd.honeycomb.translate.write.registry.WriterRegistry; +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; +import org.opendaylight.yangtools.yang.binding.DataObject; +import org.opendaylight.yangtools.yang.binding.InstanceIdentifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Flat writer registry, delegating updates to writers in the order writers were submitted. + */ +@ThreadSafe +final class FlatWriterRegistry implements WriterRegistry { + + private static final Logger LOG = LoggerFactory.getLogger(FlatWriterRegistry.class); + + // All types handled by writers directly or as children + private final ImmutableSet> handledTypes; + + private final Set> writersOrderReversed; + private final Set> writersOrder; + private final Map, Writer> writers; + + /** + * Create flat registry instance. + * + * @param writers immutable, ordered map of writers to use to process updates. Order of the writers has to be + * one in which create and update operations should be handled. Deletes will be handled in reversed + * order. All deletes are handled before handling all the updates. + */ + FlatWriterRegistry(@Nonnull final ImmutableMap, Writer> writers) { + this.writers = writers; + this.writersOrderReversed = Sets.newLinkedHashSet(Lists.reverse(Lists.newArrayList(writers.keySet()))); + this.writersOrder = writers.keySet(); + this.handledTypes = getAllHandledTypes(writers); + } + + private static ImmutableSet> getAllHandledTypes( + @Nonnull final ImmutableMap, Writer> writers) { + final ImmutableSet.Builder> handledTypesBuilder = ImmutableSet.builder(); + for (Map.Entry, Writer> writerEntry : writers.entrySet()) { + final InstanceIdentifier writerType = writerEntry.getKey(); + final Writer writer = writerEntry.getValue(); + handledTypesBuilder.add(writerType); + if (writer instanceof SubtreeWriter) { + handledTypesBuilder.addAll(((SubtreeWriter) writer).getHandledChildTypes()); + } + } + return handledTypesBuilder.build(); + } + + @Override + public void update(@Nonnull final DataObjectUpdates updates, + @Nonnull final WriteContext ctx) throws TranslationException { + if (updates.isEmpty()) { + return; + } + + // Optimization + if (updates.containsOnlySingleType()) { + // First process delete + singleUpdate(updates.getDeletes(), ctx); + // Next is update + singleUpdate(updates.getUpdates(), ctx); + } else { + // First process deletes + bulkUpdate(updates.getDeletes(), ctx, true, writersOrderReversed); + // Next are updates + bulkUpdate(updates.getUpdates(), ctx, true, writersOrder); + } + + LOG.debug("Update successful for types: {}", updates.getTypeIntersection()); + LOG.trace("Update successful for: {}", updates); + } + + private void singleUpdate(@Nonnull final Multimap, ? extends DataObjectUpdate> updates, + @Nonnull final WriteContext ctx) throws WriteFailedException { + if (updates.isEmpty()) { + return; + } + + final InstanceIdentifier singleType = updates.keySet().iterator().next(); + LOG.debug("Performing single type update for: {}", singleType); + Collection singleTypeUpdates = updates.get(singleType); + Writer writer = getWriter(singleType); + + if (writer == null) { + // This node must be handled by a subtree writer, find it and call it or else fail + checkArgument(handledTypes.contains(singleType), "Unable to process update. Missing writers for: %s", + singleType); + writer = getSubtreeWriterResponsible(singleType); + singleTypeUpdates = getParentDataObjectUpdate(ctx, updates, writer); + } + + LOG.trace("Performing single type update with writer: {}", writer); + for (DataObjectUpdate singleUpdate : singleTypeUpdates) { + writer.update(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx); + } + } + + private Writer getSubtreeWriterResponsible(final InstanceIdentifier singleType) { + return writers.values().stream() + .filter(w -> w instanceof SubtreeWriter) + .filter(w -> ((SubtreeWriter) w).getHandledChildTypes().contains(singleType)) + .findFirst() + .get(); + } + + private Collection getParentDataObjectUpdate(final WriteContext ctx, + final Multimap, ? extends DataObjectUpdate> updates, + final Writer writer) { + // Now read data for subtree reader root, but first keyed ID is needed and that ID can be cut from updates + InstanceIdentifier firstAffectedChildId = ((SubtreeWriter) writer).getHandledChildTypes().stream() + .filter(updates::containsKey) + .map(unkeyedId -> updates.get(unkeyedId)) + .flatMap(doUpdates -> doUpdates.stream()) + .map(DataObjectUpdate::getId) + .findFirst() + .get(); + + final InstanceIdentifier parentKeyedId = + RWUtils.cutId(firstAffectedChildId, writer.getManagedDataObjectType()); + + final Optional parentBefore = ctx.readBefore(parentKeyedId); + final Optional parentAfter = ctx.readAfter(parentKeyedId); + return Collections.singleton( + DataObjectUpdate.create(parentKeyedId, parentBefore.orNull(), parentAfter.orNull())); + } + + private void bulkUpdate(@Nonnull final Multimap, ? extends DataObjectUpdate> updates, + @Nonnull final WriteContext ctx, + final boolean attemptRevert, + @Nonnull final Set> writersOrder) throws BulkUpdateException { + if (updates.isEmpty()) { + return; + } + + LOG.debug("Performing bulk update with revert attempt: {} for: {}", attemptRevert, updates.keySet()); + + // Check that all updates can be handled + checkAllTypesCanBeHandled(updates); + + // Capture all changes successfully processed in case revert is needed + final Set> processedNodes = new HashSet<>(); + + // Iterate over all writers and call update if there are any related updates + for (InstanceIdentifier writerType : writersOrder) { + Collection writersData = updates.get(writerType); + final Writer writer = getWriter(writerType); + + if (writersData.isEmpty()) { + // If there are no data for current writer, but it is a SubtreeWriter and there are updates to + // its children, still invoke it with its root data + if (writer instanceof SubtreeWriter && isAffected(((SubtreeWriter) writer), updates)) { + // Provide parent data for SubtreeWriter for further processing + writersData = getParentDataObjectUpdate(ctx, updates, writer); + } else { + // Skipping unaffected writer + // Alternative to this would be modification sort according to the order of writers + continue; + } + } + + LOG.debug("Performing update for: {}", writerType); + LOG.trace("Performing update with writer: {}", writer); + + for (DataObjectUpdate singleUpdate : writersData) { + try { + writer.update(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx); + processedNodes.add(singleUpdate.getId()); + LOG.trace("Update successful for type: {}", writerType); + LOG.debug("Update successful for: {}", singleUpdate); + } catch (Exception e) { + // do not log this exception here,its logged in ModifiableDataTreeDelegator + + final Reverter reverter = attemptRevert + ? new ReverterImpl(processedNodes, updates, writersOrder) + : (final WriteContext writeContext) -> {};//NOOP reverter + + // Find out which changes left unprocessed + final Set> unprocessedChanges = updates.values().stream() + .map(DataObjectUpdate::getId) + .filter(id -> !processedNodes.contains(id)) + .collect(Collectors.toSet()); + throw new BulkUpdateException(unprocessedChanges, reverter, e); + } + } + } + } + + private void checkAllTypesCanBeHandled( + @Nonnull final Multimap, ? extends DataObjectUpdate> updates) { + if (!handledTypes.containsAll(updates.keySet())) { + final Sets.SetView> missingWriters = Sets.difference(updates.keySet(), handledTypes); + LOG.warn("Unable to process update. Missing writers for: {}", missingWriters); + throw new IllegalArgumentException("Unable to process update. Missing writers for: " + missingWriters); + } + } + + /** + * Check whether {@link SubtreeWriter} is affected by the updates. + * + * @return true if there are any updates to SubtreeWriter's child nodes (those marked by SubtreeWriter + * as being taken care of) + * */ + private static boolean isAffected(final SubtreeWriter writer, + final Multimap, ? extends DataObjectUpdate> updates) { + return !Sets.intersection(writer.getHandledChildTypes(), updates.keySet()).isEmpty(); + } + + @Nullable + private Writer getWriter(@Nonnull final InstanceIdentifier singleType) { + return writers.get(singleType); + } + + private final class ReverterImpl implements Reverter { + + private final Collection> processedNodes; + private final Multimap, ? extends DataObjectUpdate> updates; + private final Set> revertDeleteOrder; + + ReverterImpl(final Collection> processedNodes, + final Multimap, ? extends DataObjectUpdate> updates, + final Set> writersOrderOriginal) { + this.processedNodes = processedNodes; + this.updates = updates; + // Use opposite ordering when executing revert + this.revertDeleteOrder = writersOrderOriginal == FlatWriterRegistry.this.writersOrder + ? FlatWriterRegistry.this.writersOrderReversed + : FlatWriterRegistry.this.writersOrder; + } + + @Override + public void revert(@Nonnull final WriteContext writeContext) throws RevertFailedException { + checkNotNull(writeContext, "Cannot revert changes for null context"); + + Multimap, DataObjectUpdate> updatesToRevert = + filterAndRevertProcessed(updates, processedNodes); + + LOG.info("Attempting revert for changes: {}", updatesToRevert); + try { + // Perform reversed bulk update without revert attempt + bulkUpdate(updatesToRevert, writeContext, true, revertDeleteOrder); + LOG.info("Revert successful"); + } catch (BulkUpdateException e) { + LOG.error("Revert failed", e); + throw new RevertFailedException(e.getFailedIds(), e); + } + } + + /** + * Create new updates map, but only keep already processed changes. Switching before and after data for each + * update. + */ + private Multimap, DataObjectUpdate> filterAndRevertProcessed( + final Multimap, ? extends DataObjectUpdate> updates, + final Collection> processedNodes) { + final Multimap, DataObjectUpdate> filtered = HashMultimap.create(); + for (InstanceIdentifier processedNode : processedNodes) { + final InstanceIdentifier wildcardedIid = makeIidWildcarded(processedNode); + if (updates.containsKey(wildcardedIid)) { + updates.get(wildcardedIid).stream() + .filter(dataObjectUpdate -> processedNode.contains(dataObjectUpdate.getId())) + // putting under unkeyed identifier, to prevent failing of checkAllTypesCanBeHandled + .forEach(dataObjectUpdate -> filtered.put(wildcardedIid, dataObjectUpdate.reverse())); + } + } + return filtered; + } + } + +} -- cgit 1.2.3-korg