diff options
Diffstat (limited to 'infra/translate-impl/src/main')
-rw-r--r-- | infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java | 157 |
1 files changed, 53 insertions, 104 deletions
diff --git a/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java b/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java index e21297aa3..146ddb9c5 100644 --- a/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java +++ b/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java @@ -18,10 +18,8 @@ package io.fd.honeycomb.translate.impl.write.registry; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static io.fd.honeycomb.translate.util.RWUtils.makeIidWildcarded; import com.google.common.base.Optional; -import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -31,15 +29,15 @@ import io.fd.honeycomb.translate.TranslationException; import io.fd.honeycomb.translate.util.RWUtils; import io.fd.honeycomb.translate.write.DataObjectUpdate; import io.fd.honeycomb.translate.write.WriteContext; -import io.fd.honeycomb.translate.write.WriteFailedException; import io.fd.honeycomb.translate.write.Writer; +import io.fd.honeycomb.translate.write.registry.UpdateFailedException; import io.fd.honeycomb.translate.write.registry.WriterRegistry; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -66,9 +64,9 @@ final class FlatWriterRegistry implements WriterRegistry { /** * Create flat registry instance. * - * @param writers immutable, ordered map of writers to use to process updates. Order of the writers has to be - * one in which create and update operations should be handled. Deletes will be handled in reversed - * order. All deletes are handled before handling all the updates. + * @param writers immutable, ordered map of writers to use to process updates. Order of the writers has to be one in + * which create and update operations should be handled. Deletes will be handled in reversed order. + * All deletes are handled before handling all the updates. */ FlatWriterRegistry(@Nonnull final ImmutableMap<InstanceIdentifier<?>, Writer<?>> writers) { this.writers = writers; @@ -98,17 +96,22 @@ final class FlatWriterRegistry implements WriterRegistry { return; } - // Optimization + // ordered set of already processed nodes + final List<DataObjectUpdate> alreadyProcessed = new LinkedList<>(); + + // Optimization for single type updates, less consuming for pairing update with responsible writer,etc if (updates.containsOnlySingleType()) { // First process delete - singleUpdate(updates.getDeletes(), ctx); + singleUpdate(updates.getDeletes(), alreadyProcessed, ctx); + // Next is update - singleUpdate(updates.getUpdates(), ctx); + singleUpdate(updates.getUpdates(), alreadyProcessed, ctx); } else { // First process deletes - bulkUpdate(updates.getDeletes(), ctx, true, writersOrderReversed); + bulkUpdate(updates.getDeletes(), alreadyProcessed, ctx, writersOrderReversed); + // Next are updates - bulkUpdate(updates.getUpdates(), ctx, true, writersOrder); + bulkUpdate(updates.getUpdates(), alreadyProcessed, ctx, writersOrder); } LOG.debug("Update successful for types: {}", updates.getTypeIntersection()); @@ -119,19 +122,22 @@ final class FlatWriterRegistry implements WriterRegistry { public boolean writerSupportsUpdate(@Nonnull final InstanceIdentifier<?> type) { Writer writer = getWriter(type); - if(writer == null){ + if (writer == null) { writer = getSubtreeWriterResponsible(type); } return checkNotNull(writer, "Unable to find writer for %s", type).supportsDirectUpdate(); } - private void singleUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, - @Nonnull final WriteContext ctx) throws WriteFailedException { + private void singleUpdate( + @Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, + @Nonnull final List<DataObjectUpdate> alreadyProcessed, + @Nonnull final WriteContext ctx) throws UpdateFailedException { if (updates.isEmpty()) { return; } + DataObjectUpdate current = null; final InstanceIdentifier<?> singleType = updates.keySet().iterator().next(); LOG.debug("Performing single type update for: {}", singleType); Collection<? extends DataObjectUpdate> singleTypeUpdates = updates.get(singleType); @@ -145,9 +151,18 @@ final class FlatWriterRegistry implements WriterRegistry { singleTypeUpdates = getParentDataObjectUpdate(ctx, updates, writer); } - LOG.trace("Performing single type update with writer: {}", writer); - for (DataObjectUpdate singleUpdate : singleTypeUpdates) { - writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx); + try { + LOG.trace("Performing single type update with writer: {}", writer); + + for (DataObjectUpdate singleUpdate : singleTypeUpdates) { + current = singleUpdate; + writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), + singleUpdate.getDataAfter(), + ctx); + alreadyProcessed.add(singleUpdate); + } + } catch (Exception e) { + throw new UpdateFailedException(e, alreadyProcessed, current); } } @@ -181,21 +196,20 @@ final class FlatWriterRegistry implements WriterRegistry { DataObjectUpdate.create(parentKeyedId, parentBefore.orNull(), parentAfter.orNull())); } - private void bulkUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, - @Nonnull final WriteContext ctx, - final boolean attemptRevert, - @Nonnull final Set<InstanceIdentifier<?>> writersOrder) throws BulkUpdateException { + private void bulkUpdate( + @Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, + @Nonnull final List<DataObjectUpdate> alreadyProcessed, + @Nonnull final WriteContext ctx, + @Nonnull final Set<InstanceIdentifier<?>> writersOrder) throws UpdateFailedException { if (updates.isEmpty()) { return; } - LOG.debug("Performing bulk update with revert attempt: {} for: {}", attemptRevert, updates.keySet()); - // Check that all updates can be handled checkAllTypesCanBeHandled(updates); - // Capture all changes successfully processed in case revert is needed - final Set<InstanceIdentifier<?>> processedNodes = new HashSet<>(); + LOG.debug("Performing bulk update for: {}", updates.keySet()); + DataObjectUpdate current = null; // Iterate over all writers and call update if there are any related updates for (InstanceIdentifier<?> writerType : writersOrder) { @@ -215,29 +229,20 @@ final class FlatWriterRegistry implements WriterRegistry { } } - LOG.debug("Performing update for: {}", writerType); + LOG.debug("Performing update for: {}", writerType); LOG.trace("Performing update with writer: {}", writer); for (DataObjectUpdate singleUpdate : writersData) { + current = singleUpdate; try { - writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx); - processedNodes.add(singleUpdate.getId()); - LOG.trace("Update successful for type: {}", writerType); - LOG.debug("Update successful for: {}", singleUpdate); + writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), + singleUpdate.getDataAfter(), ctx); } catch (Exception e) { - // do not log this exception here,its logged in ModifiableDataTreeDelegator - - final Reverter reverter = attemptRevert - ? new ReverterImpl(processedNodes, updates, writersOrder) - : (final WriteContext writeContext) -> {};//NOOP reverter - - // Find out which changes left unprocessed - final Set<InstanceIdentifier<?>> unprocessedChanges = updates.values().stream() - .map(DataObjectUpdate::getId) - .filter(id -> !processedNodes.contains(id)) - .collect(Collectors.toSet()); - throw new BulkUpdateException(writerType, singleUpdate, unprocessedChanges, reverter, e); + throw new UpdateFailedException(e, alreadyProcessed, current); } + alreadyProcessed.add(singleUpdate); + LOG.trace("Update successful for type: {}", writerType); + LOG.debug("Update successful for: {}", singleUpdate); } } } @@ -254,11 +259,11 @@ final class FlatWriterRegistry implements WriterRegistry { /** * Check whether {@link SubtreeWriter} is affected by the updates. * - * @return true if there are any updates to SubtreeWriter's child nodes (those marked by SubtreeWriter - * as being taken care of) - * */ + * @return true if there are any updates to SubtreeWriter's child nodes (those marked by SubtreeWriter as being + * taken care of) + */ private static boolean isAffected(final SubtreeWriter<?> writer, - final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) { + final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) { return !Sets.intersection(writer.getHandledChildTypes(), updates.keySet()).isEmpty(); } @@ -267,60 +272,4 @@ final class FlatWriterRegistry implements WriterRegistry { return writers.get(singleType); } - private final class ReverterImpl implements Reverter { - - private final Collection<InstanceIdentifier<?>> processedNodes; - private final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates; - private final Set<InstanceIdentifier<?>> revertDeleteOrder; - - ReverterImpl(final Collection<InstanceIdentifier<?>> processedNodes, - final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, - final Set<InstanceIdentifier<?>> writersOrderOriginal) { - this.processedNodes = processedNodes; - this.updates = updates; - // Use opposite ordering when executing revert - this.revertDeleteOrder = writersOrderOriginal == FlatWriterRegistry.this.writersOrder - ? FlatWriterRegistry.this.writersOrderReversed - : FlatWriterRegistry.this.writersOrder; - } - - @Override - public void revert(@Nonnull final WriteContext writeContext) throws RevertFailedException { - checkNotNull(writeContext, "Cannot revert changes for null context"); - - Multimap<InstanceIdentifier<?>, DataObjectUpdate> updatesToRevert = - filterAndRevertProcessed(updates, processedNodes); - - LOG.info("Attempting revert for changes: {}", updatesToRevert); - try { - // Perform reversed bulk update without revert attempt - bulkUpdate(updatesToRevert, writeContext, true, revertDeleteOrder); - LOG.info("Revert successful"); - } catch (BulkUpdateException e) { - LOG.error("Revert failed", e); - throw new RevertFailedException(e); - } - } - - /** - * Create new updates map, but only keep already processed changes. Switching before and after data for each - * update. - */ - private Multimap<InstanceIdentifier<?>, DataObjectUpdate> filterAndRevertProcessed( - final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, - final Collection<InstanceIdentifier<?>> processedNodes) { - final Multimap<InstanceIdentifier<?>, DataObjectUpdate> filtered = HashMultimap.create(); - for (InstanceIdentifier<?> processedNode : processedNodes) { - final InstanceIdentifier<?> wildcardedIid = makeIidWildcarded(processedNode); - if (updates.containsKey(wildcardedIid)) { - updates.get(wildcardedIid).stream() - .filter(dataObjectUpdate -> processedNode.contains(dataObjectUpdate.getId())) - // putting under unkeyed identifier, to prevent failing of checkAllTypesCanBeHandled - .forEach(dataObjectUpdate -> filtered.put(wildcardedIid, dataObjectUpdate.reverse())); - } - } - return filtered; - } - } - } |