diff --git a/alerting/alerting-core/pom.xml b/alerting/alerting-core/pom.xml index 883fa59dc..44936ab28 100644 --- a/alerting/alerting-core/pom.xml +++ b/alerting/alerting-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol alerting - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -35,7 +35,7 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT junit diff --git a/alerting/alerting-spark/pom.xml b/alerting/alerting-spark/pom.xml index b4cdaf808..a07cc1631 100644 --- a/alerting/alerting-spark/pom.xml +++ b/alerting/alerting-spark/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol alerting - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -23,7 +23,7 @@ uk.co.gresearch.siembol alerting-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT jackson-databind diff --git a/alerting/alerting-storm/pom.xml b/alerting/alerting-storm/pom.xml index f2171b131..d40de696e 100644 --- a/alerting/alerting-storm/pom.xml +++ b/alerting/alerting-storm/pom.xml @@ -9,13 +9,13 @@ uk.co.gresearch.siembol alerting - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT com.google.guava guava - 23.0 + ${storm_guava_version} com.fasterxml.jackson.core @@ -51,7 +51,7 @@ uk.co.gresearch.siembol alerting-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j diff --git a/alerting/pom.xml b/alerting/pom.xml index abf033785..1ac998502 100644 --- a/alerting/pom.xml +++ b/alerting/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT alerting-core diff --git a/config-editor/config-editor-core/pom.xml b/config-editor/config-editor-core/pom.xml index e0285bcb7..da6fd4d23 100644 --- a/config-editor/config-editor-core/pom.xml +++ b/config-editor/config-editor-core/pom.xml @@ -9,13 +9,13 @@ uk.co.gresearch.siembol config-editor - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.apache.commons diff --git a/config-editor/config-editor-rest/pom.xml b/config-editor/config-editor-rest/pom.xml index 34c16ff50..accc389f0 100644 --- a/config-editor/config-editor-rest/pom.xml +++ b/config-editor/config-editor-rest/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol config-editor - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -56,7 +56,7 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j @@ -67,22 +67,22 @@ uk.co.gresearch.siembol config-editor-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol config-editor-services - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol config-editor-sync - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j @@ -93,7 +93,7 @@ uk.co.gresearch.siembol parsing-app - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j @@ -104,7 +104,7 @@ uk.co.gresearch.siembol enriching-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j @@ -115,7 +115,7 @@ uk.co.gresearch.siembol responding-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j diff --git a/config-editor/config-editor-services/pom.xml b/config-editor/config-editor-services/pom.xml index ea0954e00..70b7052d8 100644 --- a/config-editor/config-editor-services/pom.xml +++ b/config-editor/config-editor-services/pom.xml @@ -10,53 +10,53 @@ uk.co.gresearch.siembol config-editor - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol config-editor-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol alerting-storm - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol parsing-storm - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol enriching-storm - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol parsing-app - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol enriching-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol responding-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT junit diff --git a/config-editor/config-editor-sync/pom.xml b/config-editor/config-editor-sync/pom.xml index 41af80403..d28d36368 100644 --- a/config-editor/config-editor-sync/pom.xml +++ b/config-editor/config-editor-sync/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol config-editor - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -20,17 +20,17 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol config-editor-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol parsing-app - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT provided diff --git a/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/service/EnrichmentTablesProviderImpl.java b/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/service/EnrichmentTablesProviderImpl.java index 23e92d2a7..ea19f3f33 100644 --- a/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/service/EnrichmentTablesProviderImpl.java +++ b/config-editor/config-editor-sync/src/main/java/uk/co/gresearch/siembol/configeditor/sync/service/EnrichmentTablesProviderImpl.java @@ -35,6 +35,12 @@ public class EnrichmentTablesProviderImpl implements EnrichmentTablesProvider { "in the service: {}, enrichment tables value: {}"; private static final String ADD_NEW_EXISTING_TABLE_MSG = "Table with name %s already exists"; private static final String UPDATE_NON_EXISTING_TABLE_MSG = "Table with name %s does not exist"; + private static final String UPDATE_TABLE_INIT_LOG = + "Trying to update enrichment table - name: {}, path: {}, service: {}"; + private static final String UPDATE_TABLE_COMPLETED_LOG = + "Updating enrichment table completed - name: {}, path: {}, service: {}"; + private static final String UPDATE_TABLE_EXCEPTION_LOG = + "Exception {} during updating enrichment table - name: {}, path: {}, service: {}"; private final Map zooKeeperConnectorMap; private final Map> enrichmentTablesCache; @@ -115,6 +121,7 @@ private ConfigEditorResult checkServiceName(String name) { private ConfigEditorResult updateEnrichmentTableInternally(String serviceName, EnrichmentTableDto enrichmentTable, boolean isNewTable) { + LOGGER.info(UPDATE_TABLE_INIT_LOG, enrichmentTable.getName(), enrichmentTable.getPath(), serviceName); if (enrichmentTable.getName() == null || enrichmentTable.getPath() == null) { return ConfigEditorResult.fromMessage(BAD_REQUEST, WRONG_TABLE_TO_UPDATE); } @@ -149,8 +156,11 @@ private ConfigEditorResult updateEnrichmentTableInternally(String serviceName, E String updatedTablesStr = ENRICHMENT_TABLES_UPDATE_MSG_WRITER.writeValueAsString(currentTables); zooKeeperConnectorMap.get(serviceName).setData(updatedTablesStr); enrichmentTablesCache.get(serviceName).set(updatedTablesStr); + LOGGER.info(UPDATE_TABLE_COMPLETED_LOG, enrichmentTable.getName(), enrichmentTable.getPath(), serviceName); return ConfigEditorResult.fromEnrichmentTables(currentTables.getEnrichmentTables()); } catch (Exception e) { + LOGGER.error(UPDATE_TABLE_EXCEPTION_LOG, + e, enrichmentTable.getName(), enrichmentTable.getPath(), serviceName); return ConfigEditorResult.fromException(e); } } diff --git a/config-editor/pom.xml b/config-editor/pom.xml index 814ae658d..41f40345c 100644 --- a/config-editor/pom.xml +++ b/config-editor/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT config-editor-core diff --git a/deployment/storm-topology-manager/pom.xml b/deployment/storm-topology-manager/pom.xml index 454340812..030fc019d 100644 --- a/deployment/storm-topology-manager/pom.xml +++ b/deployment/storm-topology-manager/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol siembol - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT ../../pom.xml @@ -43,7 +43,7 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j diff --git a/docs/introduction/how-tos/how_to_contribute.md b/docs/introduction/how-tos/how_to_contribute.md index bf3fc48c6..c37e4600c 100644 --- a/docs/introduction/how-tos/how_to_contribute.md +++ b/docs/introduction/how-tos/how_to_contribute.md @@ -7,7 +7,7 @@ How to contribute to the siembol Java project ### Environment - [Maven](https://maven.apache.org/guides/) - version `3.5+` -- [Java Development Kit 8](https://jdk.java.net/) +- [Java Development Kit 13+](https://adoptopenjdk.net/) ### How to compile and install diff --git a/enriching/enriching-core/pom.xml b/enriching/enriching-core/pom.xml index 3d57f1e11..1d1a7ece7 100644 --- a/enriching/enriching-core/pom.xml +++ b/enriching/enriching-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol enriching - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -35,12 +35,12 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT junit diff --git a/enriching/enriching-storm/pom.xml b/enriching/enriching-storm/pom.xml index 956459c6b..68fea121c 100644 --- a/enriching/enriching-storm/pom.xml +++ b/enriching/enriching-storm/pom.xml @@ -9,13 +9,13 @@ uk.co.gresearch.siembol enriching - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT com.google.guava guava - 23.0 + ${storm_guava_version} com.fasterxml.jackson.core @@ -67,7 +67,7 @@ uk.co.gresearch.siembol enriching-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j @@ -256,10 +256,7 @@ maven-compiler-plugin 3.8.1 - true - ${java_version} -Xlint:unchecked - ${java_version} true diff --git a/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/MemoryTableEnrichmentBolt.java b/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/MemoryTableEnrichmentBolt.java index d75f0e2b7..32b4901c2 100644 --- a/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/MemoryTableEnrichmentBolt.java +++ b/enriching/enriching-storm/src/main/java/uk/co/gresearch/siembol/enrichments/storm/MemoryTableEnrichmentBolt.java @@ -5,6 +5,7 @@ import com.fasterxml.jackson.databind.ObjectReader; import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; @@ -34,9 +35,8 @@ import java.lang.invoke.MethodHandles; import java.util.List; import java.util.Map; -import java.util.HashMap; import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.ConcurrentHashMap; public class MemoryTableEnrichmentBolt extends BaseRichBolt { private static final long serialVersionUID = 1L; @@ -51,14 +51,16 @@ public class MemoryTableEnrichmentBolt extends BaseRichBolt { private static final String TABLES_UPDATES_COMPLETED = "Updating enrichment tables completed"; private static final String TABLES_UPDATE_MESSAGE_FORMAT = "Updating enrichment tables: %s"; private static final String TABLES_UPDATE_EXCEPTION_FORMAT = "Exception during update of enrichment tables: {}"; + private static final String TABLE_UPDATE_EXCEPTION_FORMAT = "Exception during update of an enrichment table: {} " + + "path: {}, exception : {}"; private static final String TABLE_INIT_START = "Trying to initialise enrichment table: {} from the file: {}"; private static final String TABLE_INIT_COMPLETED = "Initialisation of enrichment table: {} completed"; private static final String TABLES_UPDATE_EMPTY_TABLES = "No enrichment tables provided"; private static final String INIT_EXCEPTION_MSG_FORMAT = "Exception during loading memory table: %s"; private static final String INVALID_TYPE_IN_TUPLES = "Invalid type in tuple provided"; - private final AtomicReference> enrichmentTables = new AtomicReference<>(); - private final ZooKeeperAttributesDto zooKeeeperAttributes; + private final ConcurrentHashMap> enrichmentTables = new ConcurrentHashMap<>(); + private final ZooKeeperAttributesDto zooKeeperAttributesDto; private final ZooKeeperConnectorFactory zooKeeperConnectorFactory; private final SiembolFileSystemFactory fileSystemFactory; @@ -68,7 +70,7 @@ public class MemoryTableEnrichmentBolt extends BaseRichBolt { MemoryTableEnrichmentBolt(StormEnrichmentAttributesDto attributes, ZooKeeperConnectorFactory zooKeeperConnectorFactory, SiembolFileSystemFactory fileSystemFactory) { - this.zooKeeeperAttributes = attributes.getEnrichingTablesAttributes(); + this.zooKeeperAttributesDto = attributes.getEnrichingTablesAttributes(); this.zooKeeperConnectorFactory = zooKeeperConnectorFactory; this.fileSystemFactory = fileSystemFactory; } @@ -85,12 +87,11 @@ public void prepare(Map map, TopologyContext topologyContext, OutputCollector ou try { LOG.info(TABLES_INIT_START); - zooKeeperConnector = zooKeeperConnectorFactory.createZookeeperConnector(zooKeeeperAttributes); + zooKeeperConnector = zooKeeperConnectorFactory.createZookeeperConnector(zooKeeperAttributesDto); updateTables(); - if (enrichmentTables.get() == null) { - LOG.error(TABLES_UPDATE_EMPTY_TABLES); - throw new IllegalStateException(TABLES_UPDATE_EMPTY_TABLES); + if (enrichmentTables.isEmpty()) { + LOG.warn(TABLES_UPDATE_EMPTY_TABLES); } zooKeeperConnector.addCacheListener(this::updateTables); @@ -108,21 +109,32 @@ private void updateTables() { String tablesUpdateStr = zooKeeperConnector.getData(); LOG.info(String.format(TABLES_UPDATE_MESSAGE_FORMAT, tablesUpdateStr)); - Map tables = new HashMap<>(); EnrichmentTablesUpdateDto enrichmentTablesUpdateDto = TABLES_UPDATE_READER.readValue(tablesUpdateStr); try (SiembolFileSystem fs = fileSystemFactory.create()) { - for (EnrichmentTableDto table : enrichmentTablesUpdateDto.getEnrichmentTables()) { + for (EnrichmentTableDto table : enrichmentTablesUpdateDto.getEnrichmentTables()) { + var currentTablePair = enrichmentTables.get(table.getName()); + if (currentTablePair != null && currentTablePair.getLeft().equals(table.getPath())) { + //NOTE: we already have a table loaded from the same path + continue; + } + LOG.info(TABLE_INIT_START, table.getName(), table.getPath()); try (InputStream is = fs.openInputStream(table.getPath())) { - tables.put(table.getName(), EnrichmentMemoryTable.fromJsonStream(is)); + var memoryTable = EnrichmentMemoryTable.fromJsonStream(is); + enrichmentTables.put(table.getName(), ImmutablePair.of(table.getPath(), memoryTable)); + LOG.info(TABLE_INIT_COMPLETED, table.getName()); + } catch (Exception e) { + LOG.error(TABLE_UPDATE_EXCEPTION_FORMAT, + table.getName(), + table.getPath(), + ExceptionUtils.getStackTrace(e)); } - LOG.info(TABLE_INIT_COMPLETED, table.getName()); } } - enrichmentTables.set(tables); LOG.info(TABLES_UPDATES_COMPLETED); } catch (Exception e) { LOG.error(TABLES_UPDATE_EXCEPTION_FORMAT, ExceptionUtils.getStackTrace(e)); + throw new IllegalStateException(e); } } @@ -145,14 +157,14 @@ public void execute(Tuple tuple) { EnrichmentExceptions exceptions = (EnrichmentExceptions)exceptionsObj; EnrichmentPairs enrichments = new EnrichmentPairs(); - Map currentTables = enrichmentTables.get(); + for (EnrichmentCommand command : commands) { - EnrichmentTable table = currentTables.get(command.getTableName()); - if (table == null) { + var tablePair = enrichmentTables.get(command.getTableName()); + if (tablePair == null) { continue; } - Optional>> result = table.getValues(command); + Optional>> result = tablePair.getRight().getValues(command); result.ifPresent(enrichments::addAll); } collector.emit(tuple, new Values(event, enrichments, exceptions)); diff --git a/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/MemoryTableEnrichmentBoltTest.java b/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/MemoryTableEnrichmentBoltTest.java index 37b8bfead..833d55bdd 100644 --- a/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/MemoryTableEnrichmentBoltTest.java +++ b/enriching/enriching-storm/src/test/java/uk/co/gresearch/siembol/enrichments/storm/MemoryTableEnrichmentBoltTest.java @@ -1,6 +1,5 @@ package uk.co.gresearch.siembol.enrichments.storm; - import org.apache.commons.lang3.tuple.Pair; import org.apache.storm.task.OutputCollector; import org.apache.storm.tuple.Tuple; @@ -14,17 +13,18 @@ import uk.co.gresearch.siembol.common.filesystem.SiembolFileSystemFactory; import uk.co.gresearch.siembol.common.model.StormEnrichmentAttributesDto; import uk.co.gresearch.siembol.common.model.ZooKeeperAttributesDto; -import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnectorFactory; -import uk.co.gresearch.siembol.common.zookeeper.ZooKeeperConnector; +import uk.co.gresearch.siembol.common.testing.TestingZooKeeperConnectorFactory; import uk.co.gresearch.siembol.enrichments.common.EnrichmentCommand; import uk.co.gresearch.siembol.enrichments.storm.common.*; import java.io.ByteArrayInputStream; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.when; public class MemoryTableEnrichmentBoltTest { @@ -37,7 +37,35 @@ public class MemoryTableEnrichmentBoltTest { "enrichment_tables" : [ { "name" : "test_table", - "path": "/siembol/tables/enrichment/test.json" + "path": "/siembol/tables/enrichment/test.json" + }] + } + """; + + private final String tablesUpdateSame = """ + { + "enrichment_tables" : [ + { + "name" : "test_table", + "path": "/siembol/tables/enrichment/test.json" + }, + { + "name" : "test_table_2", + "path": "/siembol/tables/enrichment/test2.json" + }] + } + """; + + private final String tablesUpdateDifferent = """ + { + "enrichment_tables" : [ + { + "name" : "test_table", + "path": "/siembol/tables/enrichment/different_test.json" + }, + { + "name" : "test_table_2", + "path": "/siembol/tables/enrichment/test2.json" }] } """; @@ -56,40 +84,44 @@ public class MemoryTableEnrichmentBoltTest { private OutputCollector collector; private EnrichmentExceptions exceptions; private EnrichmentCommands commands; - MemoryTableEnrichmentBolt memoryTableBolt; - ZooKeeperAttributesDto zookeperAttributes; - StormEnrichmentAttributesDto attributes; - ZooKeeperConnector zooKeeperConnector; - ZooKeeperConnectorFactory zooKeeperConnectorFactory; - SiembolFileSystemFactory fileSystemFactory; - SiembolFileSystem fileSystem; - ArgumentCaptor argumentEmitCaptor; + private MemoryTableEnrichmentBolt memoryTableBolt; + private final String zooKeeperPath = "path"; + private ZooKeeperAttributesDto zooKeeperAttributes; + private StormEnrichmentAttributesDto attributes; + private final TestingZooKeeperConnectorFactory zooKeeperConnectorFactory = new TestingZooKeeperConnectorFactory(); + private SiembolFileSystemFactory fileSystemFactory; + private SiembolFileSystem fileSystem; + private ArgumentCaptor argumentEmitCaptor; @Before public void setUp() throws Exception { - zookeperAttributes = new ZooKeeperAttributesDto(); + zooKeeperAttributes = new ZooKeeperAttributesDto(); + zooKeeperAttributes.setZkPath(zooKeeperPath); + zooKeeperConnectorFactory.setData(zooKeeperPath, tablesUpdate); attributes = new StormEnrichmentAttributesDto(); - attributes.setEnrichingTablesAttributes(zookeperAttributes); + attributes.setEnrichingTablesAttributes(zooKeeperAttributes); exceptions = new EnrichmentExceptions(); commands = new EnrichmentCommands(); tuple = Mockito.mock(Tuple.class); collector = Mockito.mock(OutputCollector.class); argumentEmitCaptor = ArgumentCaptor.forClass(Values.class); - zooKeeperConnectorFactory = Mockito.mock(ZooKeeperConnectorFactory.class); fileSystemFactory = Mockito.mock(SiembolFileSystemFactory.class); fileSystem = Mockito.mock(SiembolFileSystem.class); - zooKeeperConnector = Mockito.mock(ZooKeeperConnector.class); - when(zooKeeperConnectorFactory.createZookeeperConnector(zookeperAttributes)).thenReturn(zooKeeperConnector); when(fileSystemFactory.create()).thenReturn(fileSystem); when(tuple.getStringByField(eq(EnrichmentTuples.EVENT.toString()))).thenReturn(event); when(tuple.getValueByField(eq(EnrichmentTuples.COMMANDS.toString()))).thenReturn(commands); when(tuple.getValueByField(eq(EnrichmentTuples.EXCEPTIONS.toString()))).thenReturn(exceptions); when(collector.emit(eq(tuple), argumentEmitCaptor.capture())).thenReturn(new ArrayList<>()); - when(zooKeeperConnector.getData()).thenReturn(tablesUpdate); - when(fileSystem.openInputStream(anyString())).thenReturn(new ByteArrayInputStream(simpleOneField.getBytes())); + + when(fileSystem.openInputStream(eq("/siembol/tables/enrichment/test.json"))) + .thenReturn(new ByteArrayInputStream(simpleOneField.getBytes())); + when(fileSystem.openInputStream(eq("/siembol/tables/enrichment/test2.json"))) + .thenReturn(new ByteArrayInputStream(simpleOneField.getBytes())); + when(fileSystem.openInputStream(eq("/siembol/tables/enrichment/different_test.json"))) + .thenReturn(new ByteArrayInputStream(simpleOneField.getBytes())); memoryTableBolt = new MemoryTableEnrichmentBolt(attributes, zooKeeperConnectorFactory, fileSystemFactory); memoryTableBolt.prepare(null, null, collector); @@ -182,4 +214,67 @@ public void testEmptyExceptionsCommandNoMatch() { Assert.assertTrue(((EnrichmentPairs) values.get(1)).isEmpty()); Assert.assertTrue(((EnrichmentExceptions) values.get(2)).isEmpty()); } + + @Test + public void updateTablesWithSame() throws Exception { + zooKeeperConnectorFactory.getZooKeeperConnector(zooKeeperPath).setData(tablesUpdateSame); + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/test.json")); + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/test2.json")); + } + + @Test + public void updateTablesWithDifferent() throws Exception { + zooKeeperConnectorFactory.getZooKeeperConnector(zooKeeperPath).setData(tablesUpdateDifferent); + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/test.json")); + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/different_test.json")); + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/test2.json")); + } + + @Test + public void updateTablesEmpty() throws IOException { + fileSystem = Mockito.mock(SiembolFileSystem.class); + when(fileSystemFactory.create()).thenReturn(fileSystem); + when(fileSystem.openInputStream(anyString())).thenReturn(new ByteArrayInputStream(simpleOneField.getBytes())); + zooKeeperConnectorFactory.setData(zooKeeperPath, "{}"); + memoryTableBolt = new MemoryTableEnrichmentBolt(attributes, zooKeeperConnectorFactory, fileSystemFactory); + memoryTableBolt.prepare(null, null, collector); + Mockito.verify(fileSystem, times(0)).openInputStream(anyString()); + } + + @Test(expected = IllegalStateException.class) + public void prepareInvalidAttributes() throws IOException { + attributes.setEnrichingTablesAttributes(null); + fileSystem = Mockito.mock(SiembolFileSystem.class); + when(fileSystemFactory.create()).thenReturn(fileSystem); + when(fileSystem.openInputStream(anyString())).thenReturn(new ByteArrayInputStream(simpleOneField.getBytes())); + memoryTableBolt = new MemoryTableEnrichmentBolt(attributes, zooKeeperConnectorFactory, fileSystemFactory); + memoryTableBolt.prepare(null, null, collector); + Mockito.verify(fileSystem, times(0)).openInputStream(anyString()); + } + + @Test + public void updateTablesWithDifferentAndException() throws Exception { + when(fileSystem.openInputStream(eq("/siembol/tables/enrichment/different_test.json"))) + .thenThrow(new IllegalArgumentException()); + zooKeeperConnectorFactory.getZooKeeperConnector(zooKeeperPath).setData(tablesUpdateDifferent); + + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/test.json")); + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/different_test.json")); + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/test2.json")); + } + + @Test(expected = RuntimeException.class) + public void updateTablesInvalid() throws Exception { + Mockito.verify(fileSystem, times(1)) + .openInputStream(eq("/siembol/tables/enrichment/test.json")); + zooKeeperConnectorFactory.getZooKeeperConnector(zooKeeperPath).setData("INVALID"); + } } diff --git a/enriching/pom.xml b/enriching/pom.xml index 7ab8fda77..cded28afb 100644 --- a/enriching/pom.xml +++ b/enriching/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT enriching-core diff --git a/parsing/parsing-app/pom.xml b/parsing/parsing-app/pom.xml index 439d8705b..ec426ff33 100644 --- a/parsing/parsing-app/pom.xml +++ b/parsing/parsing-app/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol parsing - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -39,12 +39,12 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol parsing-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT junit diff --git a/parsing/parsing-core/pom.xml b/parsing/parsing-core/pom.xml index fe2426094..4820eb006 100644 --- a/parsing/parsing-core/pom.xml +++ b/parsing/parsing-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol parsing - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -45,7 +45,7 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT joda-time diff --git a/parsing/parsing-storm/pom.xml b/parsing/parsing-storm/pom.xml index 3633d872c..d0a8be936 100644 --- a/parsing/parsing-storm/pom.xml +++ b/parsing/parsing-storm/pom.xml @@ -9,13 +9,13 @@ uk.co.gresearch.siembol parsing - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT com.google.guava guava - 23.0 + ${storm_guava_version} com.fasterxml.jackson.core @@ -67,7 +67,7 @@ uk.co.gresearch.siembol parsing-app - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j diff --git a/parsing/pom.xml b/parsing/pom.xml index 503da072e..eb5851f85 100644 --- a/parsing/pom.xml +++ b/parsing/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT parsing-core diff --git a/pom.xml b/pom.xml index 05b3c11ed..339fe80d9 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ uk.co.gresearch.siembol siembol siembol - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT A scalable, advanced security analytics framework based on open-source big data technologies. 2019 https://siembol.io/ @@ -56,6 +56,7 @@ 1.2.14 2.5.6 31.0.1-jre + 24.1.1-jre 1.5.12 1.9.4 0.1.2 @@ -114,7 +115,6 @@ maven-compiler-plugin ${maven_compiler_version} - true -Xlint:all -Xlint:-processing diff --git a/responding/pom.xml b/responding/pom.xml index 6396ac0fb..aa613bdb1 100644 --- a/responding/pom.xml +++ b/responding/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol siembol - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT responding-core diff --git a/responding/responding-core/pom.xml b/responding/responding-core/pom.xml index dfc72040e..16b07e2ab 100644 --- a/responding/responding-core/pom.xml +++ b/responding/responding-core/pom.xml @@ -11,7 +11,7 @@ uk.co.gresearch.siembol responding - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -35,12 +35,12 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT uk.co.gresearch.siembol alerting-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT com.jayway.jsonpath diff --git a/responding/responding-stream/pom.xml b/responding/responding-stream/pom.xml index 0650591e8..3b1c41afc 100644 --- a/responding/responding-stream/pom.xml +++ b/responding/responding-stream/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol responding - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT @@ -51,7 +51,7 @@ uk.co.gresearch.siembol siembol-common - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.slf4j @@ -62,7 +62,7 @@ uk.co.gresearch.siembol responding-core - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT org.apache.kafka diff --git a/siembol-common/pom.xml b/siembol-common/pom.xml index b3dd6fa9c..4e8da0071 100644 --- a/siembol-common/pom.xml +++ b/siembol-common/pom.xml @@ -9,7 +9,7 @@ uk.co.gresearch.siembol siembol - 2.0.0-SNAPSHOT + 2.0.1-SNAPSHOT diff --git a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/testing/TestingZooKeeperConnectorFactory.java b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/testing/TestingZooKeeperConnectorFactory.java index edea6e982..5e27be16d 100644 --- a/siembol-common/src/main/java/uk/co/gresearch/siembol/common/testing/TestingZooKeeperConnectorFactory.java +++ b/siembol-common/src/main/java/uk/co/gresearch/siembol/common/testing/TestingZooKeeperConnectorFactory.java @@ -10,21 +10,25 @@ import java.util.List; import java.util.Map; import java.util.HashMap; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; public class TestingZooKeeperConnectorFactory implements ZooKeeperConnectorFactory { - private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final Map cache = new HashMap<>(); + private final Map connectors = new HashMap<>(); public ZooKeeperConnector createZookeeperConnector(ZooKeeperAttributesDto attributes) { - return new TestingZooKeeperConnector(attributes.getZkPath()); + var ret = new TestingZooKeeperConnector(attributes.getZkPath()); + connectors.put(attributes.getZkPath(), ret); + return ret; } public void setData(String path, String data) { cache.put(path, data); } + public ZooKeeperConnector getZooKeeperConnector(String path) { + return connectors.get(path); + } + public class TestingZooKeeperConnector implements ZooKeeperConnector { private final String path; private final List callBacks = new ArrayList<>(); @@ -41,14 +45,12 @@ public String getData() { @Override public void setData(String data) throws Exception { cache.put(path, data); - for (NodeCacheListener callBack: callBacks) { - executorService.submit(() -> { - try { - callBack.nodeChanged(); - } catch (Exception e) { - throw new RuntimeException(e); - } - }); + for (NodeCacheListener callBack : callBacks) { + try { + callBack.nodeChanged(); + } catch (Exception e) { + throw new RuntimeException(e); + } } }