diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java index 0429af3598..85aecdff6a 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/AmazonAsyncEventService.java @@ -25,20 +25,20 @@ import java.util.List; import java.util.concurrent.atomic.AtomicLong; -import com.codahale.metrics.Histogram; -import com.google.common.base.Preconditions; - -import org.apache.usergrid.corepersistence.asyncevents.model.*; -import org.apache.usergrid.corepersistence.index.*; -import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; -import org.apache.usergrid.persistence.index.EntityIndex; -import org.apache.usergrid.persistence.index.EntityIndexFactory; -import org.apache.usergrid.persistence.index.IndexLocationStrategy; -import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.usergrid.corepersistence.asyncevents.model.AsyncEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.EdgeDeleteEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.EdgeIndexEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.EntityDeleteEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.EntityIndexEvent; +import org.apache.usergrid.corepersistence.asyncevents.model.InitializeApplicationIndexEvent; +import org.apache.usergrid.corepersistence.index.EntityIndexOperation; +import org.apache.usergrid.corepersistence.index.IndexLocationStrategyFactory; +import org.apache.usergrid.corepersistence.index.IndexProcessorFig; +import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy; +import org.apache.usergrid.corepersistence.rx.impl.EdgeScope; import org.apache.usergrid.persistence.collection.EntityCollectionManager; import org.apache.usergrid.persistence.collection.EntityCollectionManagerFactory; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; @@ -46,6 +46,10 @@ import org.apache.usergrid.persistence.core.rx.RxTaskScheduler; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; +import org.apache.usergrid.persistence.index.EntityIndex; +import org.apache.usergrid.persistence.index.EntityIndexFactory; +import org.apache.usergrid.persistence.index.IndexLocationStrategy; +import org.apache.usergrid.persistence.index.impl.IndexOperationMessage; import org.apache.usergrid.persistence.model.entity.Entity; import org.apache.usergrid.persistence.model.entity.Id; import org.apache.usergrid.persistence.queue.QueueManager; @@ -56,7 +60,9 @@ import com.codahale.metrics.Counter; import com.codahale.metrics.Gauge; +import com.codahale.metrics.Histogram; import com.codahale.metrics.Timer; +import com.google.common.base.Preconditions; import com.google.inject.Inject; import com.google.inject.Singleton; @@ -211,43 +217,42 @@ public void ack(final QueueMessage message) { } - private void handleMessages(final List messages) { - if (logger.isDebugEnabled()) logger.debug("handleMessages with {} message", messages.size()); - for (QueueMessage message : messages) { - final AsyncEvent event = (AsyncEvent) message.getBody(); - - if (logger.isDebugEnabled()) logger.debug("Processing {} event", event.getEventType()); - - if (event == null || event.getEventType() == null) { - logger.error("AsyncEvent type or event is null!"); - } else { - switch (event.getEventType()) { + private void handleMessages( final List messages ) { + if ( logger.isDebugEnabled() ) { + logger.debug( "handleMessages with {} message", messages.size() ); + } - case EDGE_DELETE: - handleEdgeDelete(message); - break; + for ( QueueMessage message : messages ) { + final AsyncEvent event = ( AsyncEvent ) message.getBody(); - case EDGE_INDEX: - handleEdgeIndex(message); - break; + logger.debug( "Processing {} event", event ); - case ENTITY_DELETE: - handleEntityDelete(message); - break; + if ( event == null ) { + logger.error( "AsyncEvent type or event is null!" ); + continue; + } - case ENTITY_INDEX: - handleEntityIndexUpdate(message); - break; - case APPLICATION_INDEX: - handleInitializeApplicationIndex(message); - break; + if ( event instanceof EdgeDeleteEvent ) { + handleEdgeDelete( message ); + } + else if ( event instanceof EdgeIndexEvent ) { + handleEdgeIndex( message ); + } - default: - logger.error("Unknown EventType: {}", event.getEventType()); + else if ( event instanceof EntityDeleteEvent ) { + handleEntityDelete( message ); + } + else if ( event instanceof EntityIndexEvent ) { + handleEntityIndexUpdate( message ); + } - } + else if ( event instanceof InitializeApplicationIndexEvent ) { + handleInitializeApplicationIndex( message ); + } + else { + logger.error( "Unknown EventType: {}", event ); } messageCycle.update( System.currentTimeMillis() - event.getCreationTime() ); @@ -257,7 +262,8 @@ private void handleMessages(final List messages) { @Override public void queueInitializeApplicationIndex( final ApplicationScope applicationScope) { - IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope); + IndexLocationStrategy indexLocationStrategy = indexLocationStrategyFactory.getIndexLocationStrategy( + applicationScope ); offer(new InitializeApplicationIndexEvent(new ReplicatedIndexLocationStrategy(indexLocationStrategy))); } @@ -272,19 +278,22 @@ public void queueEntityIndexUpdate(final ApplicationScope applicationScope, public void handleEntityIndexUpdate(final QueueMessage message) { - Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityIndexUpdate"); + Preconditions.checkNotNull( message, "Queue Message cannot be null for handleEntityIndexUpdate" ); - final EntityIndexEvent event = (EntityIndexEvent) message.getBody(); + final AsyncEvent event = ( AsyncEvent ) message.getBody(); Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityIndexUpdate"); - Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.ENTITY_INDEX, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getEventType())); + Preconditions.checkArgument(event instanceof EntityIndexEvent, String.format("Event Type for handleEntityIndexUpdate must be ENTITY_INDEX, got %s", event.getClass())); + + final EntityIndexEvent entityIndexEvent = (EntityIndexEvent) event; + //process the entity immediately //only process the same version, otherwise ignore - final EntityIdScope entityIdScope = event.getEntityIdScope(); + final EntityIdScope entityIdScope = entityIndexEvent.getEntityIdScope(); final ApplicationScope applicationScope = entityIdScope.getApplicationScope(); final Id entityId = entityIdScope.getId(); - final long updatedAfter = event.getUpdatedAfter(); + final long updatedAfter = entityIndexEvent.getUpdatedAfter(); final EntityIndexOperation entityIndexOperation = new EntityIndexOperation( applicationScope, entityId, updatedAfter); @@ -310,17 +319,19 @@ public void handleEdgeIndex(final QueueMessage message) { final AsyncEvent event = (AsyncEvent) message.getBody(); - Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeIndex"); - Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_INDEX, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getEventType())); + Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeIndex" ); + Preconditions.checkArgument(event instanceof EdgeIndexEvent, String.format("Event Type for handleEdgeIndex must be EDGE_INDEX, got %s", event.getClass())); + + final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event; - final ApplicationScope applicationScope = event.getApplicationScope(); - final Edge edge = event.getEdge(); + final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope(); + final Edge edge = edgeIndexEvent.getEdge(); final EntityCollectionManager ecm = entityCollectionManagerFactory.createCollectionManager( applicationScope ); - final Observable edgeIndexObservable = ecm.load(event.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge( + final Observable edgeIndexObservable = ecm.load(edgeIndexEvent.getEntityId()).flatMap( entity -> eventBuilder.buildNewEdge( applicationScope, entity, edge ) ); subscibeAndAck( edgeIndexObservable, message ); @@ -339,11 +350,14 @@ public void handleEdgeDelete(final QueueMessage message) { final AsyncEvent event = (AsyncEvent) message.getBody(); - Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEdgeDelete"); - Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.EDGE_DELETE, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getEventType())); + Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEdgeDelete" ); + Preconditions.checkArgument(event instanceof EdgeDeleteEvent, String.format("Event Type for handleEdgeDelete must be EDGE_DELETE, got %s", event.getClass())); - final ApplicationScope applicationScope = event.getApplicationScope(); - final Edge edge = event.getEdge(); + + final EdgeIndexEvent edgeIndexEvent = ( EdgeIndexEvent ) event; + + final ApplicationScope applicationScope = edgeIndexEvent.getApplicationScope(); + final Edge edge = edgeIndexEvent.getEdge(); if (logger.isDebugEnabled()) logger.debug("Deleting in app scope {} with edge {}", applicationScope, edge); @@ -364,12 +378,14 @@ public void handleEntityDelete(final QueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleEntityDelete"); final AsyncEvent event = (AsyncEvent) message.getBody(); - Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleEntityDelete"); - Preconditions.checkArgument( event.getEventType() == AsyncEvent.EventType.ENTITY_DELETE, - String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getEventType() ) ); + Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleEntityDelete" ); + Preconditions.checkArgument( event instanceof EntityDeleteEvent, + String.format( "Event Type for handleEntityDelete must be ENTITY_DELETE, got %s", event.getClass() ) ); - final ApplicationScope applicationScope = event.getApplicationScope(); - final Id entityId = event.getEntityId(); + + final EntityDeleteEvent entityDeleteEvent = ( EntityDeleteEvent ) event; + final ApplicationScope applicationScope = entityDeleteEvent.getEntityIdScope().getApplicationScope(); + final Id entityId = entityDeleteEvent.getEntityIdScope().getId(); if (logger.isDebugEnabled()) logger.debug("Deleting entity id from index in app scope {} with entityId {}", applicationScope, entityId); @@ -391,10 +407,13 @@ public void handleInitializeApplicationIndex(final QueueMessage message) { Preconditions.checkNotNull(message, "Queue Message cannot be null for handleInitializeApplicationIndex"); final AsyncEvent event = (AsyncEvent) message.getBody(); - Preconditions.checkNotNull(message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex"); - Preconditions.checkArgument(event.getEventType() == AsyncEvent.EventType.APPLICATION_INDEX, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getEventType())); + Preconditions.checkNotNull( message, "QueueMessage Body cannot be null for handleInitializeApplicationIndex" ); + Preconditions.checkArgument(event instanceof InitializeApplicationIndexEvent, String.format("Event Type for handleInitializeApplicationIndex must be APPLICATION_INDEX, got %s", event.getClass())); + + final InitializeApplicationIndexEvent initializeApplicationIndexEvent = + ( InitializeApplicationIndexEvent ) event; - final IndexLocationStrategy indexLocationStrategy = event.getIndexLocationStrategy(); + final IndexLocationStrategy indexLocationStrategy = initializeApplicationIndexEvent.getIndexLocationStrategy(); final EntityIndex index = entityIndexFactory.createEntityIndex( indexLocationStrategy ); index.initialize(); ack( message ); diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java index 46cec2e445..4bf56953bb 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/EventBuilderImpl.java @@ -174,8 +174,15 @@ public Observable buildEntityIndex( final EntityIndexOper entity -> { final Field modified = entity.getField( Schema.PROPERTY_MODIFIED ); + /** + * We don't have a modified field, so we can't check, pass it through + */ + if ( modified == null ) { + return true; + } + //only re-index if it has been updated and been updated after our timestamp - return modified != null && modified.getValue() >= entityIndexOperation.getUpdatedSince(); + return modified.getValue() >= entityIndexOperation.getUpdatedSince(); } ) //perform indexing on the task scheduler and start it .flatMap( entity -> indexService.indexEntity( applicationScope, entity ) ); diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java index 3d229862c1..6b452973ea 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/AsyncEvent.java @@ -19,155 +19,42 @@ package org.apache.usergrid.corepersistence.asyncevents.model; + +import java.io.Serializable; + import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import com.fasterxml.jackson.annotation.JsonProperty; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; -import com.fasterxml.jackson.databind.annotation.JsonSerialize; -import org.apache.usergrid.corepersistence.index.ReplicatedIndexLocationStrategy; -import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -import org.apache.usergrid.persistence.core.scope.ApplicationScope; -import org.apache.usergrid.persistence.graph.Edge; -import org.apache.usergrid.persistence.index.IndexLocationStrategy; -import org.apache.usergrid.persistence.model.entity.Id; +import com.fasterxml.jackson.annotation.JsonSubTypes; +import com.fasterxml.jackson.annotation.JsonTypeInfo; -import java.io.Serializable; /** - * Created by Jeff West on 5/25/15. + * Marker class for serialization */ -@JsonIgnoreProperties(ignoreUnknown = true) -public class AsyncEvent implements Serializable { - - @JsonProperty - protected IndexLocationStrategy indexLocationStrategy; - - @JsonProperty - protected EventType eventType; - - @JsonProperty - protected EntityIdScope entityIdScope; - - @JsonProperty - protected ApplicationScope applicationScope; - - @JsonProperty - protected Id entityId; +@JsonIgnoreProperties( ignoreUnknown = true ) +@JsonTypeInfo( use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.WRAPPER_OBJECT, property = "type" ) +@JsonSubTypes( { + @JsonSubTypes.Type( value = EdgeDeleteEvent.class, name = "edgeDeleteEvent" ), + @JsonSubTypes.Type( value = EdgeIndexEvent.class, name = "edgeIndexEvent" ), + @JsonSubTypes.Type( value = EntityDeleteEvent.class, name = "entityDeleteEvent" ), + @JsonSubTypes.Type( value = EntityIndexEvent.class, name = "entityIndexEvent" ), + @JsonSubTypes.Type( value = InitializeApplicationIndexEvent.class, name = "initializeApplicationIndexEvent" ) +} ) - @JsonProperty - protected Edge edge; +public abstract class AsyncEvent implements Serializable { @JsonProperty protected long creationTime; - /** - * required for jackson, do not delete - */ + //set by default, will be overridden when de-serializing protected AsyncEvent() { + creationTime = System.currentTimeMillis(); } - public AsyncEvent(final EventType eventType) { - - this.eventType = eventType; - this.creationTime = System.currentTimeMillis(); - } - - public AsyncEvent(final EventType eventType, - final EntityIdScope entityIdScope) { - - this.eventType = eventType; - this.entityIdScope = entityIdScope; - this.creationTime = System.currentTimeMillis(); - } - - public AsyncEvent(EventType eventType, IndexLocationStrategy indexLocationStrategy) { - this.eventType = eventType; - this.indexLocationStrategy = indexLocationStrategy; - this.creationTime = System.currentTimeMillis(); - } - - public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Edge edge) { - this.eventType = eventType; - this.applicationScope = applicationScope; - this.edge = edge; - this.creationTime = System.currentTimeMillis(); - } - - public AsyncEvent(EventType eventType, ApplicationScope applicationScope, Id entityId, Edge edge) { - this.eventType = eventType; - this.applicationScope = applicationScope; - this.edge = edge; - this.entityId = entityId; - this.creationTime = System.currentTimeMillis(); - } - - @JsonSerialize() - public final Id getEntityId() { - return entityId; - } - - protected void setEntityId(Id entityId) { - this.entityId = entityId; - } - - @JsonSerialize() - public final EventType getEventType() { - return eventType; - } - - protected void setEventType(EventType eventType) { - this.eventType = eventType; - } - - @JsonSerialize() - public EntityIdScope getEntityIdScope() { - return entityIdScope; - } - - protected void setEntityIdScope(EntityIdScope entityIdScope) { - this.entityIdScope = entityIdScope; - } - - @JsonSerialize() - public ApplicationScope getApplicationScope() { - return applicationScope; - } - - protected void setApplicationScope(ApplicationScope applicationScope) { - this.applicationScope = applicationScope; - } - - @JsonSerialize() - @JsonDeserialize(as=ReplicatedIndexLocationStrategy.class) - public IndexLocationStrategy getIndexLocationStrategy() { return indexLocationStrategy; } - - protected void setIndexLocationStrategy( IndexLocationStrategy indexLocationStrategy ){ - this.indexLocationStrategy = indexLocationStrategy; - } - - @JsonSerialize() - public Edge getEdge() { - return edge; - } - - @JsonSerialize() - public long getCreationTime() { return creationTime; } - - protected void setEdge(Edge edge) { - this.edge = edge; - } - - public enum EventType { - EDGE_DELETE, - EDGE_INDEX, - ENTITY_DELETE, - ENTITY_INDEX, - APPLICATION_INDEX; - - public String asString() { - return toString(); - } + public long getCreationTime() { + return creationTime; } } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java index 3af981813b..af16baca97 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeDeleteEvent.java @@ -19,19 +19,41 @@ package org.apache.usergrid.corepersistence.asyncevents.model; -import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; -/** - * Created by Jeff West on 5/25/15. - */ -@JsonDeserialize(as = AsyncEvent.class) +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + + public final class EdgeDeleteEvent extends AsyncEvent { + + @JsonProperty + protected ApplicationScope applicationScope; + + + @JsonProperty + protected Edge edge; + + public EdgeDeleteEvent() { } - public EdgeDeleteEvent(ApplicationScope applicationScope, Edge edge) { - super(EventType.EDGE_DELETE, applicationScope, edge); + + public EdgeDeleteEvent( ApplicationScope applicationScope, Edge edge ) { + this.applicationScope = applicationScope; + this.edge = edge; + } + + + public ApplicationScope getApplicationScope() { + return applicationScope; + } + + + public Edge getEdge() { + return edge; } } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java index cd0118fdc6..c89b8282c8 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EdgeIndexEvent.java @@ -20,6 +20,8 @@ package org.apache.usergrid.corepersistence.asyncevents.model; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.graph.Edge; @@ -28,13 +30,19 @@ import java.io.Serializable; -/** - * Created by Jeff West on 5/25/15. - */ -@JsonDeserialize(as = AsyncEvent.class) + public final class EdgeIndexEvent - extends AsyncEvent - implements Serializable { + extends AsyncEvent { + + + @JsonProperty + protected ApplicationScope applicationScope; + + @JsonProperty + protected Id entityId; + + @JsonProperty + protected Edge edge; /** * Needed by jackson @@ -43,6 +51,23 @@ public EdgeIndexEvent() { } public EdgeIndexEvent(ApplicationScope applicationScope, Id entityId, Edge edge) { - super(EventType.EDGE_INDEX, applicationScope, entityId, edge); + this.applicationScope = applicationScope; + this.entityId = entityId; + this.edge = edge; + } + + + public ApplicationScope getApplicationScope() { + return applicationScope; + } + + + public Edge getEdge() { + return edge; + } + + + public Id getEntityId() { + return entityId; } } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java index 606deaea90..847a07d9fa 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityDeleteEvent.java @@ -19,20 +19,28 @@ package org.apache.usergrid.corepersistence.asyncevents.model; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.model.entity.Id; -/** - * Created by Jeff West on 5/25/15. - */ -@JsonDeserialize(as = AsyncEvent.class) public final class EntityDeleteEvent extends AsyncEvent { + + + @JsonProperty + protected EntityIdScope entityIdScope; + public EntityDeleteEvent() { } public EntityDeleteEvent(EntityIdScope entityIdScope) { - super(EventType.ENTITY_DELETE, entityIdScope); + this.entityIdScope = entityIdScope; + } + + + public EntityIdScope getEntityIdScope() { + return entityIdScope; } } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java index 81961a0d3b..a04326a492 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/EntityIndexEvent.java @@ -19,22 +19,26 @@ package org.apache.usergrid.corepersistence.asyncevents.model; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; -/** - * Created by Jeff West on 5/25/15. - */ -@JsonDeserialize(as = AsyncEvent.class) + public final class EntityIndexEvent extends AsyncEvent { + + @JsonProperty + protected EntityIdScope entityIdScope; + + @JsonProperty private long updatedAfter; public EntityIndexEvent() { } public EntityIndexEvent(EntityIdScope entityIdScope, final long updatedAfter ) { - super(EventType.ENTITY_INDEX, entityIdScope); + this.entityIdScope = entityIdScope; this.updatedAfter = updatedAfter; } @@ -44,7 +48,7 @@ public long getUpdatedAfter() { } - public void setUpdatedAfter( long updatedAfter ) { - this.updatedAfter = updatedAfter; + public EntityIdScope getEntityIdScope() { + return entityIdScope; } } diff --git a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java index 8b2065194e..68f0113fed 100644 --- a/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java +++ b/stack/core/src/main/java/org/apache/usergrid/corepersistence/asyncevents/model/InitializeApplicationIndexEvent.java @@ -19,20 +19,31 @@ */ package org.apache.usergrid.corepersistence.asyncevents.model; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.annotation.JsonTypeName; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; + +import org.apache.usergrid.persistence.collection.serialization.impl.migration.EntityIdScope; import org.apache.usergrid.persistence.core.scope.ApplicationScope; import org.apache.usergrid.persistence.index.IndexLocationStrategy; /** * event to init app index */ -@JsonDeserialize(as = AsyncEvent.class) + public class InitializeApplicationIndexEvent extends AsyncEvent { - public InitializeApplicationIndexEvent() { - super(EventType.APPLICATION_INDEX); - } + + + @JsonProperty + protected IndexLocationStrategy indexLocationStrategy; + public InitializeApplicationIndexEvent(final IndexLocationStrategy indexLocationStrategy) { - super(EventType.APPLICATION_INDEX, indexLocationStrategy); + this.indexLocationStrategy = indexLocationStrategy; + + } + + public IndexLocationStrategy getIndexLocationStrategy() { + return indexLocationStrategy; } } diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java index d37701b9ba..4660389074 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AmazonAsyncEventServiceTest.java @@ -66,9 +66,6 @@ public class AmazonAsyncEventServiceTest extends AsyncIndexServiceTest { @Inject public MetricsFactory metricsFactory; - @Inject - public IndexService indexService; - @Inject public RxTaskScheduler rxTaskScheduler; diff --git a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java index 9b104fc786..d34a1a942a 100644 --- a/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java +++ b/stack/core/src/test/java/org/apache/usergrid/corepersistence/index/AsyncIndexServiceTest.java @@ -131,7 +131,7 @@ public void testMessageIndexing() throws InterruptedException { /** - * Write 10k edges 10 at a time in parallel + * Write 500 edges */ @@ -139,19 +139,19 @@ public void testMessageIndexing() throws InterruptedException { final Id connectingId = createId("connecting"); final Edge edge = CpNamingUtils.createConnectionEdge(connectingId, "likes", testEntity.getId()); - return graphManager.writeEdge(edge).subscribeOn(Schedulers.io()); + return graphManager.writeEdge( edge ); }).toList().toBlocking().last(); + //queue up processing asyncEventService.queueEntityIndexUpdate( applicationScope, testEntity ); - emf.refreshIndex(applicationScope.getApplication().getUuid()); - - // Thread.sleep( 1000000000000l ); final EntityIndex EntityIndex = entityIndexFactory.createEntityIndex( indexLocationStrategyFactory.getIndexLocationStrategy(applicationScope) ); + emf.refreshIndex(applicationScope.getApplication().getUuid()); + final SearchEdge collectionSearchEdge = CpNamingUtils.createSearchEdgeFromSource( collectionEdge ); //query until it's available @@ -176,13 +176,13 @@ public void testMessageIndexing() throws InterruptedException { } - private CandidateResults getResults( final EntityIndex EntityIndex, + private CandidateResults getResults( final EntityIndex entityIndex, final SearchEdge searchEdge, final SearchTypes searchTypes, final int expectedSize, final int attempts ) { for ( int i = 0; i < attempts; i++ ) { final CandidateResults candidateResults = - EntityIndex.search( searchEdge, searchTypes, "select *", 100, 0 ); + entityIndex.search( searchEdge, searchTypes, "select *", 100, 0 ); if ( candidateResults.size() == expectedSize ) { return candidateResults; diff --git a/stack/corepersistence/pom.xml b/stack/corepersistence/pom.xml index b95f3e1cb1..1ee5272f63 100644 --- a/stack/corepersistence/pom.xml +++ b/stack/corepersistence/pom.xml @@ -65,8 +65,7 @@ limitations under the License. 4.0-beta5 3.2 1.4.0 - 2.4.1 - 2.4.3 + 2.6.0 1.10.8 4.11 0.26 diff --git a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java index 6b07b9e0fd..e5c8f8f254 100644 --- a/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java +++ b/stack/corepersistence/queryindex/src/main/java/org/apache/usergrid/persistence/index/IndexLocationStrategy.java @@ -19,6 +19,8 @@ */ package org.apache.usergrid.persistence.index; +import java.io.Serializable; + import org.apache.usergrid.persistence.core.scope.ApplicationScope; /** * location strategy for index @@ -33,7 +35,7 @@ applicationIndexName = {indexRoot}_applications_{bucketId} applicationAliasName = {indexRoot}_{appId}_read_alias || {indexRoot}_{appId}_write_alias */ -public interface IndexLocationStrategy { +public interface IndexLocationStrategy extends Serializable { /** * get the alias name * @return