diff options
Diffstat (limited to 'infra/translate-impl')
2 files changed, 91 insertions, 173 deletions
diff --git a/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java b/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java index e21297aa3..146ddb9c5 100644 --- a/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java +++ b/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java @@ -18,10 +18,8 @@ package io.fd.honeycomb.translate.impl.write.registry; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static io.fd.honeycomb.translate.util.RWUtils.makeIidWildcarded; import com.google.common.base.Optional; -import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -31,15 +29,15 @@ import io.fd.honeycomb.translate.TranslationException; import io.fd.honeycomb.translate.util.RWUtils; import io.fd.honeycomb.translate.write.DataObjectUpdate; import io.fd.honeycomb.translate.write.WriteContext; -import io.fd.honeycomb.translate.write.WriteFailedException; import io.fd.honeycomb.translate.write.Writer; +import io.fd.honeycomb.translate.write.registry.UpdateFailedException; import io.fd.honeycomb.translate.write.registry.WriterRegistry; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import java.util.Set; -import java.util.stream.Collectors; import javax.annotation.Nonnull; import javax.annotation.Nullable; import javax.annotation.concurrent.ThreadSafe; @@ -66,9 +64,9 @@ final class FlatWriterRegistry implements WriterRegistry { /** * Create flat registry instance. * - * @param writers immutable, ordered map of writers to use to process updates. Order of the writers has to be - * one in which create and update operations should be handled. Deletes will be handled in reversed - * order. All deletes are handled before handling all the updates. + * @param writers immutable, ordered map of writers to use to process updates. Order of the writers has to be one in + * which create and update operations should be handled. Deletes will be handled in reversed order. + * All deletes are handled before handling all the updates. */ FlatWriterRegistry(@Nonnull final ImmutableMap<InstanceIdentifier<?>, Writer<?>> writers) { this.writers = writers; @@ -98,17 +96,22 @@ final class FlatWriterRegistry implements WriterRegistry { return; } - // Optimization + // ordered set of already processed nodes + final List<DataObjectUpdate> alreadyProcessed = new LinkedList<>(); + + // Optimization for single type updates, less consuming for pairing update with responsible writer,etc if (updates.containsOnlySingleType()) { // First process delete - singleUpdate(updates.getDeletes(), ctx); + singleUpdate(updates.getDeletes(), alreadyProcessed, ctx); + // Next is update - singleUpdate(updates.getUpdates(), ctx); + singleUpdate(updates.getUpdates(), alreadyProcessed, ctx); } else { // First process deletes - bulkUpdate(updates.getDeletes(), ctx, true, writersOrderReversed); + bulkUpdate(updates.getDeletes(), alreadyProcessed, ctx, writersOrderReversed); + // Next are updates - bulkUpdate(updates.getUpdates(), ctx, true, writersOrder); + bulkUpdate(updates.getUpdates(), alreadyProcessed, ctx, writersOrder); } LOG.debug("Update successful for types: {}", updates.getTypeIntersection()); @@ -119,19 +122,22 @@ final class FlatWriterRegistry implements WriterRegistry { public boolean writerSupportsUpdate(@Nonnull final InstanceIdentifier<?> type) { Writer writer = getWriter(type); - if(writer == null){ + if (writer == null) { writer = getSubtreeWriterResponsible(type); } return checkNotNull(writer, "Unable to find writer for %s", type).supportsDirectUpdate(); } - private void singleUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, - @Nonnull final WriteContext ctx) throws WriteFailedException { + private void singleUpdate( + @Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, + @Nonnull final List<DataObjectUpdate> alreadyProcessed, + @Nonnull final WriteContext ctx) throws UpdateFailedException { if (updates.isEmpty()) { return; } + DataObjectUpdate current = null; final InstanceIdentifier<?> singleType = updates.keySet().iterator().next(); LOG.debug("Performing single type update for: {}", singleType); Collection<? extends DataObjectUpdate> singleTypeUpdates = updates.get(singleType); @@ -145,9 +151,18 @@ final class FlatWriterRegistry implements WriterRegistry { singleTypeUpdates = getParentDataObjectUpdate(ctx, updates, writer); } - LOG.trace("Performing single type update with writer: {}", writer); - for (DataObjectUpdate singleUpdate : singleTypeUpdates) { - writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx); + try { + LOG.trace("Performing single type update with writer: {}", writer); + + for (DataObjectUpdate singleUpdate : singleTypeUpdates) { + current = singleUpdate; + writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), + singleUpdate.getDataAfter(), + ctx); + alreadyProcessed.add(singleUpdate); + } + } catch (Exception e) { + throw new UpdateFailedException(e, alreadyProcessed, current); } } @@ -181,21 +196,20 @@ final class FlatWriterRegistry implements WriterRegistry { DataObjectUpdate.create(parentKeyedId, parentBefore.orNull(), parentAfter.orNull())); } - private void bulkUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, - @Nonnull final WriteContext ctx, - final boolean attemptRevert, - @Nonnull final Set<InstanceIdentifier<?>> writersOrder) throws BulkUpdateException { + private void bulkUpdate( + @Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, + @Nonnull final List<DataObjectUpdate> alreadyProcessed, + @Nonnull final WriteContext ctx, + @Nonnull final Set<InstanceIdentifier<?>> writersOrder) throws UpdateFailedException { if (updates.isEmpty()) { return; } - LOG.debug("Performing bulk update with revert attempt: {} for: {}", attemptRevert, updates.keySet()); - // Check that all updates can be handled checkAllTypesCanBeHandled(updates); - // Capture all changes successfully processed in case revert is needed - final Set<InstanceIdentifier<?>> processedNodes = new HashSet<>(); + LOG.debug("Performing bulk update for: {}", updates.keySet()); + DataObjectUpdate current = null; // Iterate over all writers and call update if there are any related updates for (InstanceIdentifier<?> writerType : writersOrder) { @@ -215,29 +229,20 @@ final class FlatWriterRegistry implements WriterRegistry { } } - LOG.debug("Performing update for: {}", writerType); + LOG.debug("Performing update for: {}", writerType); LOG.trace("Performing update with writer: {}", writer); for (DataObjectUpdate singleUpdate : writersData) { + current = singleUpdate; try { - writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx); - processedNodes.add(singleUpdate.getId()); - LOG.trace("Update successful for type: {}", writerType); - LOG.debug("Update successful for: {}", singleUpdate); + writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), + singleUpdate.getDataAfter(), ctx); } catch (Exception e) { - // do not log this exception here,its logged in ModifiableDataTreeDelegator - - final Reverter reverter = attemptRevert - ? new ReverterImpl(processedNodes, updates, writersOrder) - : (final WriteContext writeContext) -> {};//NOOP reverter - - // Find out which changes left unprocessed - final Set<InstanceIdentifier<?>> unprocessedChanges = updates.values().stream() - .map(DataObjectUpdate::getId) - .filter(id -> !processedNodes.contains(id)) - .collect(Collectors.toSet()); - throw new BulkUpdateException(writerType, singleUpdate, unprocessedChanges, reverter, e); + throw new UpdateFailedException(e, alreadyProcessed, current); } + alreadyProcessed.add(singleUpdate); + LOG.trace("Update successful for type: {}", writerType); + LOG.debug("Update successful for: {}", singleUpdate); } } } @@ -254,11 +259,11 @@ final class FlatWriterRegistry implements WriterRegistry { /** * Check whether {@link SubtreeWriter} is affected by the updates. * - * @return true if there are any updates to SubtreeWriter's child nodes (those marked by SubtreeWriter - * as being taken care of) - * */ + * @return true if there are any updates to SubtreeWriter's child nodes (those marked by SubtreeWriter as being + * taken care of) + */ private static boolean isAffected(final SubtreeWriter<?> writer, - final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) { + final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) { return !Sets.intersection(writer.getHandledChildTypes(), updates.keySet()).isEmpty(); } @@ -267,60 +272,4 @@ final class FlatWriterRegistry implements WriterRegistry { return writers.get(singleType); } - private final class ReverterImpl implements Reverter { - - private final Collection<InstanceIdentifier<?>> processedNodes; - private final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates; - private final Set<InstanceIdentifier<?>> revertDeleteOrder; - - ReverterImpl(final Collection<InstanceIdentifier<?>> processedNodes, - final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, - final Set<InstanceIdentifier<?>> writersOrderOriginal) { - this.processedNodes = processedNodes; - this.updates = updates; - // Use opposite ordering when executing revert - this.revertDeleteOrder = writersOrderOriginal == FlatWriterRegistry.this.writersOrder - ? FlatWriterRegistry.this.writersOrderReversed - : FlatWriterRegistry.this.writersOrder; - } - - @Override - public void revert(@Nonnull final WriteContext writeContext) throws RevertFailedException { - checkNotNull(writeContext, "Cannot revert changes for null context"); - - Multimap<InstanceIdentifier<?>, DataObjectUpdate> updatesToRevert = - filterAndRevertProcessed(updates, processedNodes); - - LOG.info("Attempting revert for changes: {}", updatesToRevert); - try { - // Perform reversed bulk update without revert attempt - bulkUpdate(updatesToRevert, writeContext, true, revertDeleteOrder); - LOG.info("Revert successful"); - } catch (BulkUpdateException e) { - LOG.error("Revert failed", e); - throw new RevertFailedException(e); - } - } - - /** - * Create new updates map, but only keep already processed changes. Switching before and after data for each - * update. - */ - private Multimap<InstanceIdentifier<?>, DataObjectUpdate> filterAndRevertProcessed( - final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates, - final Collection<InstanceIdentifier<?>> processedNodes) { - final Multimap<InstanceIdentifier<?>, DataObjectUpdate> filtered = HashMultimap.create(); - for (InstanceIdentifier<?> processedNode : processedNodes) { - final InstanceIdentifier<?> wildcardedIid = makeIidWildcarded(processedNode); - if (updates.containsKey(wildcardedIid)) { - updates.get(wildcardedIid).stream() - .filter(dataObjectUpdate -> processedNode.contains(dataObjectUpdate.getId())) - // putting under unkeyed identifier, to prevent failing of checkAllTypesCanBeHandled - .forEach(dataObjectUpdate -> filtered.put(wildcardedIid, dataObjectUpdate.reverse())); - } - } - return filtered; - } - } - } diff --git a/infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java b/infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java index 72a91cb00..151436975 100644 --- a/infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java +++ b/infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java @@ -16,13 +16,12 @@ package io.fd.honeycomb.translate.impl.write.registry; -import static org.hamcrest.CoreMatchers.hasItem; -import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.Matchers.hasSize; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; @@ -41,7 +40,9 @@ import io.fd.honeycomb.translate.util.DataObjects.DataObject2; import io.fd.honeycomb.translate.write.DataObjectUpdate; import io.fd.honeycomb.translate.write.WriteContext; import io.fd.honeycomb.translate.write.Writer; +import io.fd.honeycomb.translate.write.registry.UpdateFailedException; import io.fd.honeycomb.translate.write.registry.WriterRegistry; +import java.util.List; import org.junit.Before; import org.junit.Test; import org.mockito.InOrder; @@ -185,7 +186,7 @@ public class FlatWriterRegistryTest { } @Test - public void testMultipleUpdatesOneFailing() throws Exception { + public void testMultipleUpdatesFirstFailing() throws Exception { final FlatWriterRegistry flatWriterRegistry = new FlatWriterRegistry(ImmutableMap.of(DataObject1.IID, writer1, DataObject2.IID, writer2)); @@ -199,59 +200,38 @@ public class FlatWriterRegistryTest { try { flatWriterRegistry.processModifications(new WriterRegistry.DataObjectUpdates(updates, ImmutableMultimap.of()), ctx); - fail("Bulk update should have failed on writer1"); - } catch (WriterRegistry.BulkUpdateException e) { - assertThat(e.getUnrevertedSubtrees(), hasSize(2)); - assertThat(e.getUnrevertedSubtrees(), hasItem(InstanceIdentifier.create(DataObject2.class))); - assertThat(e.getUnrevertedSubtrees(), hasItem(InstanceIdentifier.create(DataObject1.class))); + fail("Bulk update should have failed on writer1 with UpdateFailedException"); + } catch (UpdateFailedException e) { + assertThat(e.getProcessed(), hasSize(0));// very first update failed } } @Test - public void testMultipleUpdatesOneFailingThenRevertWithSuccess() throws Exception { + public void testMultipleUpdatesSecondFailing() throws Exception { final FlatWriterRegistry flatWriterRegistry = - new FlatWriterRegistry( - ImmutableMap.of(DataObject1.IID, writer1, DataObject2.IID, writer2, DataObjects.DataObject3.IID, writer3)); + new FlatWriterRegistry(ImmutableMap.of(DataObject1.IID, writer1, DataObject2.IID, writer2)); - // Writer1 always fails - doThrow(new RuntimeException()).when(writer3) + // Writer2 always fails + doThrow(new RuntimeException()).when(writer2) .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), any(WriteContext.class)); final Multimap<InstanceIdentifier<?>, DataObjectUpdate> updates = HashMultimap.create(); addUpdate(updates, DataObject1.class); - addUpdate(updates, DataObjects.DataObject3.class); - final InstanceIdentifier<DataObject2> iid2 = InstanceIdentifier.create(DataObject2.class); - final DataObject2 before2 = mock(DataObject2.class); - final DataObject2 after2 = mock(DataObject2.class); - updates.put(DataObject2.IID, DataObjectUpdate.create(iid2, before2, after2)); + addUpdate(updates, DataObject2.class); try { flatWriterRegistry.processModifications(new WriterRegistry.DataObjectUpdates(updates, ImmutableMultimap.of()), ctx); - fail("Bulk update should have failed on writer1"); - } catch (WriterRegistry.BulkUpdateException e) { - assertThat(e.getUnrevertedSubtrees().size(), is(1)); - - final InOrder inOrder = inOrder(writer1, writer2, writer3); - inOrder.verify(writer1) - .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), any(WriteContext.class)); - inOrder.verify(writer2) - .processModification(iid2, before2, after2, ctx); - inOrder.verify(writer3) - .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), any(WriteContext.class)); - - e.revertChanges(revertWriteContext); - // Revert changes. Successful updates are iterated in reverse - // also binding other write context,to verify if update context is not reused - inOrder.verify(writer2) - .processModification(iid2, after2, before2, revertWriteContext); - inOrder.verify(writer1) - .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), eq(revertWriteContext)); - verifyNoMoreInteractions(writer3); + fail("Bulk update should have failed on writer1 with UpdateFailedException"); + } catch (UpdateFailedException e) { + final List<DataObjectUpdate> alreadyProcessed = e.getProcessed(); + assertThat(alreadyProcessed, hasSize(1));// very first update failed + assertEquals(updateData(DataObject1.class, DataObject1.IID), + e.getProcessed().iterator().next()); } } @Test - public void testMultipleUpdatesOneFailingThenRevertWithFail() throws Exception { + public void testMultipleUpdatesLastFailing() throws Exception { final FlatWriterRegistry flatWriterRegistry = new FlatWriterRegistry( ImmutableMap.of(DataObject1.IID, writer1, DataObject2.IID, writer2, DataObjects.DataObject3.IID, writer3)); @@ -267,18 +247,12 @@ public class FlatWriterRegistryTest { try { flatWriterRegistry.processModifications(new WriterRegistry.DataObjectUpdates(updates, ImmutableMultimap.of()), ctx); - fail("Bulk update should have failed on writer1"); - } catch (WriterRegistry.BulkUpdateException e) { - // Writer1 always fails from now - doThrow(new RuntimeException()).when(writer1) - .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), any(WriteContext.class)); - try { - e.revertChanges(revertWriteContext); - } catch (WriterRegistry.Reverter.RevertFailedException e1) { - assertThat(e1.getNotRevertedChanges().size(), is(1)); - assertThat(e1.getNotRevertedChanges(), - hasItem(InstanceIdentifier.create(DataObject1.class))); - } + fail("Bulk update should have failed on writer1 with UpdateFailedException"); + } catch (UpdateFailedException e) { + final List<DataObjectUpdate> alreadyProcessed = e.getProcessed(); + assertEquals(2, alreadyProcessed.size()); + assertTrue(alreadyProcessed.contains(updateData(DataObject1.class, DataObject1.IID))); + assertTrue(alreadyProcessed.contains(updateData(DataObject2.class, DataObject2.IID))); } } @@ -289,7 +263,7 @@ public class FlatWriterRegistryTest { final FlatWriterRegistry flatWriterRegistry = new FlatWriterRegistry( - ImmutableMap.of(DataObject1.IID, writer1, DataObjects.DataObject1ChildK.IID,writer4)); + ImmutableMap.of(DataObject1.IID, writer1, DataObjects.DataObject1ChildK.IID, writer4)); // Writer1 always fails doThrow(new RuntimeException()).when(writer1) @@ -297,23 +271,13 @@ public class FlatWriterRegistryTest { any(WriteContext.class)); final Multimap<InstanceIdentifier<?>, DataObjectUpdate> updates = HashMultimap.create(); - addKeyedUpdate(updates,DataObjects.DataObject1ChildK.class); + addKeyedUpdate(updates, DataObjects.DataObject1ChildK.class); addUpdate(updates, DataObject1.class); try { flatWriterRegistry.processModifications(new WriterRegistry.DataObjectUpdates(updates, ImmutableMultimap.of()), ctx); - fail("Bulk update should have failed on writer1"); - } catch (WriterRegistry.BulkUpdateException e) { - // Writer1 always fails from now - doThrow(new RuntimeException()).when(writer1) - .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), - any(WriteContext.class)); - try { - e.revertChanges(revertWriteContext); - } catch (WriterRegistry.Reverter.RevertFailedException e1) { - assertThat(e1.getNotRevertedChanges().size(), is(1)); - assertThat(e1.getNotRevertedChanges(), - hasItem(InstanceIdentifier.create(DataObject1.class))); - } + fail("Bulk update should have failed on writer1 with UpdateFailedException"); + } catch (UpdateFailedException e) { + assertTrue(e.getProcessed().isEmpty()); } } @@ -325,8 +289,13 @@ public class FlatWriterRegistryTest { } private <D extends DataObject> void addUpdate(final Multimap<InstanceIdentifier<?>, DataObjectUpdate> updates, - final Class<D> type) throws Exception { + final Class<D> type) throws Exception { final InstanceIdentifier<D> iid = (InstanceIdentifier<D>) type.getDeclaredField("IID").get(null); - updates.put(iid, DataObjectUpdate.create(iid, mock(type), mock(type))); + updates.put(iid, updateData(type, iid)); + } + + private static <D extends DataObject> DataObjectUpdate updateData(final Class<D> type, + final InstanceIdentifier<D> iid) { + return DataObjectUpdate.create(iid, mock(type), mock(type)); } }
\ No newline at end of file |