summaryrefslogtreecommitdiffstats
path: root/infra/translate-impl/src
diff options
context:
space:
mode:
Diffstat (limited to 'infra/translate-impl/src')
-rw-r--r--infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java157
-rw-r--r--infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java107
2 files changed, 91 insertions, 173 deletions
diff --git a/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java b/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java
index e21297aa3..146ddb9c5 100644
--- a/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java
+++ b/infra/translate-impl/src/main/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistry.java
@@ -18,10 +18,8 @@ package io.fd.honeycomb.translate.impl.write.registry;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
-import static io.fd.honeycomb.translate.util.RWUtils.makeIidWildcarded;
import com.google.common.base.Optional;
-import com.google.common.collect.HashMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
@@ -31,15 +29,15 @@ import io.fd.honeycomb.translate.TranslationException;
import io.fd.honeycomb.translate.util.RWUtils;
import io.fd.honeycomb.translate.write.DataObjectUpdate;
import io.fd.honeycomb.translate.write.WriteContext;
-import io.fd.honeycomb.translate.write.WriteFailedException;
import io.fd.honeycomb.translate.write.Writer;
+import io.fd.honeycomb.translate.write.registry.UpdateFailedException;
import io.fd.honeycomb.translate.write.registry.WriterRegistry;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
@@ -66,9 +64,9 @@ final class FlatWriterRegistry implements WriterRegistry {
/**
* Create flat registry instance.
*
- * @param writers immutable, ordered map of writers to use to process updates. Order of the writers has to be
- * one in which create and update operations should be handled. Deletes will be handled in reversed
- * order. All deletes are handled before handling all the updates.
+ * @param writers immutable, ordered map of writers to use to process updates. Order of the writers has to be one in
+ * which create and update operations should be handled. Deletes will be handled in reversed order.
+ * All deletes are handled before handling all the updates.
*/
FlatWriterRegistry(@Nonnull final ImmutableMap<InstanceIdentifier<?>, Writer<?>> writers) {
this.writers = writers;
@@ -98,17 +96,22 @@ final class FlatWriterRegistry implements WriterRegistry {
return;
}
- // Optimization
+ // ordered set of already processed nodes
+ final List<DataObjectUpdate> alreadyProcessed = new LinkedList<>();
+
+ // Optimization for single type updates, less consuming for pairing update with responsible writer,etc
if (updates.containsOnlySingleType()) {
// First process delete
- singleUpdate(updates.getDeletes(), ctx);
+ singleUpdate(updates.getDeletes(), alreadyProcessed, ctx);
+
// Next is update
- singleUpdate(updates.getUpdates(), ctx);
+ singleUpdate(updates.getUpdates(), alreadyProcessed, ctx);
} else {
// First process deletes
- bulkUpdate(updates.getDeletes(), ctx, true, writersOrderReversed);
+ bulkUpdate(updates.getDeletes(), alreadyProcessed, ctx, writersOrderReversed);
+
// Next are updates
- bulkUpdate(updates.getUpdates(), ctx, true, writersOrder);
+ bulkUpdate(updates.getUpdates(), alreadyProcessed, ctx, writersOrder);
}
LOG.debug("Update successful for types: {}", updates.getTypeIntersection());
@@ -119,19 +122,22 @@ final class FlatWriterRegistry implements WriterRegistry {
public boolean writerSupportsUpdate(@Nonnull final InstanceIdentifier<?> type) {
Writer writer = getWriter(type);
- if(writer == null){
+ if (writer == null) {
writer = getSubtreeWriterResponsible(type);
}
return checkNotNull(writer, "Unable to find writer for %s", type).supportsDirectUpdate();
}
- private void singleUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
- @Nonnull final WriteContext ctx) throws WriteFailedException {
+ private void singleUpdate(
+ @Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
+ @Nonnull final List<DataObjectUpdate> alreadyProcessed,
+ @Nonnull final WriteContext ctx) throws UpdateFailedException {
if (updates.isEmpty()) {
return;
}
+ DataObjectUpdate current = null;
final InstanceIdentifier<?> singleType = updates.keySet().iterator().next();
LOG.debug("Performing single type update for: {}", singleType);
Collection<? extends DataObjectUpdate> singleTypeUpdates = updates.get(singleType);
@@ -145,9 +151,18 @@ final class FlatWriterRegistry implements WriterRegistry {
singleTypeUpdates = getParentDataObjectUpdate(ctx, updates, writer);
}
- LOG.trace("Performing single type update with writer: {}", writer);
- for (DataObjectUpdate singleUpdate : singleTypeUpdates) {
- writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx);
+ try {
+ LOG.trace("Performing single type update with writer: {}", writer);
+
+ for (DataObjectUpdate singleUpdate : singleTypeUpdates) {
+ current = singleUpdate;
+ writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(),
+ singleUpdate.getDataAfter(),
+ ctx);
+ alreadyProcessed.add(singleUpdate);
+ }
+ } catch (Exception e) {
+ throw new UpdateFailedException(e, alreadyProcessed, current);
}
}
@@ -181,21 +196,20 @@ final class FlatWriterRegistry implements WriterRegistry {
DataObjectUpdate.create(parentKeyedId, parentBefore.orNull(), parentAfter.orNull()));
}
- private void bulkUpdate(@Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
- @Nonnull final WriteContext ctx,
- final boolean attemptRevert,
- @Nonnull final Set<InstanceIdentifier<?>> writersOrder) throws BulkUpdateException {
+ private void bulkUpdate(
+ @Nonnull final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
+ @Nonnull final List<DataObjectUpdate> alreadyProcessed,
+ @Nonnull final WriteContext ctx,
+ @Nonnull final Set<InstanceIdentifier<?>> writersOrder) throws UpdateFailedException {
if (updates.isEmpty()) {
return;
}
- LOG.debug("Performing bulk update with revert attempt: {} for: {}", attemptRevert, updates.keySet());
-
// Check that all updates can be handled
checkAllTypesCanBeHandled(updates);
- // Capture all changes successfully processed in case revert is needed
- final Set<InstanceIdentifier<?>> processedNodes = new HashSet<>();
+ LOG.debug("Performing bulk update for: {}", updates.keySet());
+ DataObjectUpdate current = null;
// Iterate over all writers and call update if there are any related updates
for (InstanceIdentifier<?> writerType : writersOrder) {
@@ -215,29 +229,20 @@ final class FlatWriterRegistry implements WriterRegistry {
}
}
- LOG.debug("Performing update for: {}", writerType);
+ LOG.debug("Performing update for: {}", writerType);
LOG.trace("Performing update with writer: {}", writer);
for (DataObjectUpdate singleUpdate : writersData) {
+ current = singleUpdate;
try {
- writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(), singleUpdate.getDataAfter(), ctx);
- processedNodes.add(singleUpdate.getId());
- LOG.trace("Update successful for type: {}", writerType);
- LOG.debug("Update successful for: {}", singleUpdate);
+ writer.processModification(singleUpdate.getId(), singleUpdate.getDataBefore(),
+ singleUpdate.getDataAfter(), ctx);
} catch (Exception e) {
- // do not log this exception here,its logged in ModifiableDataTreeDelegator
-
- final Reverter reverter = attemptRevert
- ? new ReverterImpl(processedNodes, updates, writersOrder)
- : (final WriteContext writeContext) -> {};//NOOP reverter
-
- // Find out which changes left unprocessed
- final Set<InstanceIdentifier<?>> unprocessedChanges = updates.values().stream()
- .map(DataObjectUpdate::getId)
- .filter(id -> !processedNodes.contains(id))
- .collect(Collectors.toSet());
- throw new BulkUpdateException(writerType, singleUpdate, unprocessedChanges, reverter, e);
+ throw new UpdateFailedException(e, alreadyProcessed, current);
}
+ alreadyProcessed.add(singleUpdate);
+ LOG.trace("Update successful for type: {}", writerType);
+ LOG.debug("Update successful for: {}", singleUpdate);
}
}
}
@@ -254,11 +259,11 @@ final class FlatWriterRegistry implements WriterRegistry {
/**
* Check whether {@link SubtreeWriter} is affected by the updates.
*
- * @return true if there are any updates to SubtreeWriter's child nodes (those marked by SubtreeWriter
- * as being taken care of)
- * */
+ * @return true if there are any updates to SubtreeWriter's child nodes (those marked by SubtreeWriter as being
+ * taken care of)
+ */
private static boolean isAffected(final SubtreeWriter<?> writer,
- final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) {
+ final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates) {
return !Sets.intersection(writer.getHandledChildTypes(), updates.keySet()).isEmpty();
}
@@ -267,60 +272,4 @@ final class FlatWriterRegistry implements WriterRegistry {
return writers.get(singleType);
}
- private final class ReverterImpl implements Reverter {
-
- private final Collection<InstanceIdentifier<?>> processedNodes;
- private final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates;
- private final Set<InstanceIdentifier<?>> revertDeleteOrder;
-
- ReverterImpl(final Collection<InstanceIdentifier<?>> processedNodes,
- final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
- final Set<InstanceIdentifier<?>> writersOrderOriginal) {
- this.processedNodes = processedNodes;
- this.updates = updates;
- // Use opposite ordering when executing revert
- this.revertDeleteOrder = writersOrderOriginal == FlatWriterRegistry.this.writersOrder
- ? FlatWriterRegistry.this.writersOrderReversed
- : FlatWriterRegistry.this.writersOrder;
- }
-
- @Override
- public void revert(@Nonnull final WriteContext writeContext) throws RevertFailedException {
- checkNotNull(writeContext, "Cannot revert changes for null context");
-
- Multimap<InstanceIdentifier<?>, DataObjectUpdate> updatesToRevert =
- filterAndRevertProcessed(updates, processedNodes);
-
- LOG.info("Attempting revert for changes: {}", updatesToRevert);
- try {
- // Perform reversed bulk update without revert attempt
- bulkUpdate(updatesToRevert, writeContext, true, revertDeleteOrder);
- LOG.info("Revert successful");
- } catch (BulkUpdateException e) {
- LOG.error("Revert failed", e);
- throw new RevertFailedException(e);
- }
- }
-
- /**
- * Create new updates map, but only keep already processed changes. Switching before and after data for each
- * update.
- */
- private Multimap<InstanceIdentifier<?>, DataObjectUpdate> filterAndRevertProcessed(
- final Multimap<InstanceIdentifier<?>, ? extends DataObjectUpdate> updates,
- final Collection<InstanceIdentifier<?>> processedNodes) {
- final Multimap<InstanceIdentifier<?>, DataObjectUpdate> filtered = HashMultimap.create();
- for (InstanceIdentifier<?> processedNode : processedNodes) {
- final InstanceIdentifier<?> wildcardedIid = makeIidWildcarded(processedNode);
- if (updates.containsKey(wildcardedIid)) {
- updates.get(wildcardedIid).stream()
- .filter(dataObjectUpdate -> processedNode.contains(dataObjectUpdate.getId()))
- // putting under unkeyed identifier, to prevent failing of checkAllTypesCanBeHandled
- .forEach(dataObjectUpdate -> filtered.put(wildcardedIid, dataObjectUpdate.reverse()));
- }
- }
- return filtered;
- }
- }
-
}
diff --git a/infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java b/infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java
index 72a91cb00..151436975 100644
--- a/infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java
+++ b/infra/translate-impl/src/test/java/io/fd/honeycomb/translate/impl/write/registry/FlatWriterRegistryTest.java
@@ -16,13 +16,12 @@
package io.fd.honeycomb.translate.impl.write.registry;
-import static org.hamcrest.CoreMatchers.hasItem;
-import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.Matchers.hasSize;
+import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
@@ -41,7 +40,9 @@ import io.fd.honeycomb.translate.util.DataObjects.DataObject2;
import io.fd.honeycomb.translate.write.DataObjectUpdate;
import io.fd.honeycomb.translate.write.WriteContext;
import io.fd.honeycomb.translate.write.Writer;
+import io.fd.honeycomb.translate.write.registry.UpdateFailedException;
import io.fd.honeycomb.translate.write.registry.WriterRegistry;
+import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
@@ -185,7 +186,7 @@ public class FlatWriterRegistryTest {
}
@Test
- public void testMultipleUpdatesOneFailing() throws Exception {
+ public void testMultipleUpdatesFirstFailing() throws Exception {
final FlatWriterRegistry flatWriterRegistry =
new FlatWriterRegistry(ImmutableMap.of(DataObject1.IID, writer1, DataObject2.IID, writer2));
@@ -199,59 +200,38 @@ public class FlatWriterRegistryTest {
try {
flatWriterRegistry.processModifications(new WriterRegistry.DataObjectUpdates(updates, ImmutableMultimap.of()), ctx);
- fail("Bulk update should have failed on writer1");
- } catch (WriterRegistry.BulkUpdateException e) {
- assertThat(e.getUnrevertedSubtrees(), hasSize(2));
- assertThat(e.getUnrevertedSubtrees(), hasItem(InstanceIdentifier.create(DataObject2.class)));
- assertThat(e.getUnrevertedSubtrees(), hasItem(InstanceIdentifier.create(DataObject1.class)));
+ fail("Bulk update should have failed on writer1 with UpdateFailedException");
+ } catch (UpdateFailedException e) {
+ assertThat(e.getProcessed(), hasSize(0));// very first update failed
}
}
@Test
- public void testMultipleUpdatesOneFailingThenRevertWithSuccess() throws Exception {
+ public void testMultipleUpdatesSecondFailing() throws Exception {
final FlatWriterRegistry flatWriterRegistry =
- new FlatWriterRegistry(
- ImmutableMap.of(DataObject1.IID, writer1, DataObject2.IID, writer2, DataObjects.DataObject3.IID, writer3));
+ new FlatWriterRegistry(ImmutableMap.of(DataObject1.IID, writer1, DataObject2.IID, writer2));
- // Writer1 always fails
- doThrow(new RuntimeException()).when(writer3)
+ // Writer2 always fails
+ doThrow(new RuntimeException()).when(writer2)
.processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), any(WriteContext.class));
final Multimap<InstanceIdentifier<?>, DataObjectUpdate> updates = HashMultimap.create();
addUpdate(updates, DataObject1.class);
- addUpdate(updates, DataObjects.DataObject3.class);
- final InstanceIdentifier<DataObject2> iid2 = InstanceIdentifier.create(DataObject2.class);
- final DataObject2 before2 = mock(DataObject2.class);
- final DataObject2 after2 = mock(DataObject2.class);
- updates.put(DataObject2.IID, DataObjectUpdate.create(iid2, before2, after2));
+ addUpdate(updates, DataObject2.class);
try {
flatWriterRegistry.processModifications(new WriterRegistry.DataObjectUpdates(updates, ImmutableMultimap.of()), ctx);
- fail("Bulk update should have failed on writer1");
- } catch (WriterRegistry.BulkUpdateException e) {
- assertThat(e.getUnrevertedSubtrees().size(), is(1));
-
- final InOrder inOrder = inOrder(writer1, writer2, writer3);
- inOrder.verify(writer1)
- .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), any(WriteContext.class));
- inOrder.verify(writer2)
- .processModification(iid2, before2, after2, ctx);
- inOrder.verify(writer3)
- .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), any(WriteContext.class));
-
- e.revertChanges(revertWriteContext);
- // Revert changes. Successful updates are iterated in reverse
- // also binding other write context,to verify if update context is not reused
- inOrder.verify(writer2)
- .processModification(iid2, after2, before2, revertWriteContext);
- inOrder.verify(writer1)
- .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), eq(revertWriteContext));
- verifyNoMoreInteractions(writer3);
+ fail("Bulk update should have failed on writer1 with UpdateFailedException");
+ } catch (UpdateFailedException e) {
+ final List<DataObjectUpdate> alreadyProcessed = e.getProcessed();
+ assertThat(alreadyProcessed, hasSize(1));// very first update failed
+ assertEquals(updateData(DataObject1.class, DataObject1.IID),
+ e.getProcessed().iterator().next());
}
}
@Test
- public void testMultipleUpdatesOneFailingThenRevertWithFail() throws Exception {
+ public void testMultipleUpdatesLastFailing() throws Exception {
final FlatWriterRegistry flatWriterRegistry =
new FlatWriterRegistry(
ImmutableMap.of(DataObject1.IID, writer1, DataObject2.IID, writer2, DataObjects.DataObject3.IID, writer3));
@@ -267,18 +247,12 @@ public class FlatWriterRegistryTest {
try {
flatWriterRegistry.processModifications(new WriterRegistry.DataObjectUpdates(updates, ImmutableMultimap.of()), ctx);
- fail("Bulk update should have failed on writer1");
- } catch (WriterRegistry.BulkUpdateException e) {
- // Writer1 always fails from now
- doThrow(new RuntimeException()).when(writer1)
- .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class), any(WriteContext.class));
- try {
- e.revertChanges(revertWriteContext);
- } catch (WriterRegistry.Reverter.RevertFailedException e1) {
- assertThat(e1.getNotRevertedChanges().size(), is(1));
- assertThat(e1.getNotRevertedChanges(),
- hasItem(InstanceIdentifier.create(DataObject1.class)));
- }
+ fail("Bulk update should have failed on writer1 with UpdateFailedException");
+ } catch (UpdateFailedException e) {
+ final List<DataObjectUpdate> alreadyProcessed = e.getProcessed();
+ assertEquals(2, alreadyProcessed.size());
+ assertTrue(alreadyProcessed.contains(updateData(DataObject1.class, DataObject1.IID)));
+ assertTrue(alreadyProcessed.contains(updateData(DataObject2.class, DataObject2.IID)));
}
}
@@ -289,7 +263,7 @@ public class FlatWriterRegistryTest {
final FlatWriterRegistry flatWriterRegistry =
new FlatWriterRegistry(
- ImmutableMap.of(DataObject1.IID, writer1, DataObjects.DataObject1ChildK.IID,writer4));
+ ImmutableMap.of(DataObject1.IID, writer1, DataObjects.DataObject1ChildK.IID, writer4));
// Writer1 always fails
doThrow(new RuntimeException()).when(writer1)
@@ -297,23 +271,13 @@ public class FlatWriterRegistryTest {
any(WriteContext.class));
final Multimap<InstanceIdentifier<?>, DataObjectUpdate> updates = HashMultimap.create();
- addKeyedUpdate(updates,DataObjects.DataObject1ChildK.class);
+ addKeyedUpdate(updates, DataObjects.DataObject1ChildK.class);
addUpdate(updates, DataObject1.class);
try {
flatWriterRegistry.processModifications(new WriterRegistry.DataObjectUpdates(updates, ImmutableMultimap.of()), ctx);
- fail("Bulk update should have failed on writer1");
- } catch (WriterRegistry.BulkUpdateException e) {
- // Writer1 always fails from now
- doThrow(new RuntimeException()).when(writer1)
- .processModification(any(InstanceIdentifier.class), any(DataObject.class), any(DataObject.class),
- any(WriteContext.class));
- try {
- e.revertChanges(revertWriteContext);
- } catch (WriterRegistry.Reverter.RevertFailedException e1) {
- assertThat(e1.getNotRevertedChanges().size(), is(1));
- assertThat(e1.getNotRevertedChanges(),
- hasItem(InstanceIdentifier.create(DataObject1.class)));
- }
+ fail("Bulk update should have failed on writer1 with UpdateFailedException");
+ } catch (UpdateFailedException e) {
+ assertTrue(e.getProcessed().isEmpty());
}
}
@@ -325,8 +289,13 @@ public class FlatWriterRegistryTest {
}
private <D extends DataObject> void addUpdate(final Multimap<InstanceIdentifier<?>, DataObjectUpdate> updates,
- final Class<D> type) throws Exception {
+ final Class<D> type) throws Exception {
final InstanceIdentifier<D> iid = (InstanceIdentifier<D>) type.getDeclaredField("IID").get(null);
- updates.put(iid, DataObjectUpdate.create(iid, mock(type), mock(type)));
+ updates.put(iid, updateData(type, iid));
+ }
+
+ private static <D extends DataObject> DataObjectUpdate updateData(final Class<D> type,
+ final InstanceIdentifier<D> iid) {
+ return DataObjectUpdate.create(iid, mock(type), mock(type));
}
} \ No newline at end of file