Skip to content

Commit

Permalink
fixes to graph substitutions
Browse files Browse the repository at this point in the history
  • Loading branch information
datomo committed Oct 26, 2023
1 parent 267b2db commit 030e0fd
Show file tree
Hide file tree
Showing 10 changed files with 65 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1091,6 +1091,8 @@ public static Expression convert( Expression operand, Type fromType, Type toType
return Expressions.call( PolyDocument.class, "convert", operand );
} else if ( toType == PolyValue.class ) {
return Expressions.convert_( operand, toType ); // document
} else if ( toType == PolyNumber.class ) {
return Expressions.convert_( operand, toType ); // number
}
log.warn( "Converter missing " + toType );
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
import com.google.common.collect.ImmutableList;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.annotations.Deserialize;
import io.activej.serializer.annotations.Serialize;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -47,16 +44,13 @@
import org.polypheny.db.type.PolySerializable;
import org.polypheny.db.util.Pair;

@Getter
@EqualsAndHashCode(callSuper = true)
@Value
public class DocStoreCatalog extends StoreCatalog {

@Getter
public BinarySerializer<DocStoreCatalog> serializer = PolySerializable.buildSerializer( DocStoreCatalog.class );

@Serialize
public ConcurrentMap<Pair<Long, Long>, PhysicalField> fields; // allocId, columnId


public DocStoreCatalog( long adapterId ) {
this( adapterId, Map.of(), Map.of(), Map.of(), Map.of() );
/**
 * Deserialization constructor used by the binary serializer.
 * Field storage is delegated to {@link StoreCatalog}, which copies every map
 * into a concurrent map.
 *
 * @param adapterId id of the adapter owning this catalog
 * @param physicals physical entities by physical id
 * @param allocations allocation entities by allocation id
 * @param fields physical fields keyed by (allocId, fieldId)
 * @param allocToPhysicals physical ids grouped by allocation id
 */
public DocStoreCatalog(
        @Deserialize("adapterId") long adapterId,
        @Deserialize("physicals") Map<Long, PhysicalEntity> physicals,
        @Deserialize("allocations") Map<Long, AllocationEntity> allocations,
        @Deserialize("fields") Map<Pair<Long, Long>, PhysicalField> fields,
        @Deserialize("allocToPhysicals") Map<Long, Set<Long>> allocToPhysicals ) {
    // namespaces are not serialized for document stores, hence Map.of()
    super( adapterId, Map.of(), physicals, allocations, allocToPhysicals, fields );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
import com.google.common.collect.ImmutableList;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.annotations.Deserialize;
import io.activej.serializer.annotations.Serialize;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -47,16 +44,13 @@
import org.polypheny.db.type.PolySerializable;
import org.polypheny.db.util.Pair;

@Getter
@EqualsAndHashCode(callSuper = true)
@Value
public class GraphStoreCatalog extends StoreCatalog {

@Getter
public BinarySerializer<GraphStoreCatalog> serializer = PolySerializable.buildSerializer( GraphStoreCatalog.class );

@Serialize
public ConcurrentMap<Pair<Long, Long>, PhysicalField> fields; // allocId, columnId


public GraphStoreCatalog( long adapterId ) {
this( adapterId, Map.of(), Map.of(), Map.of(), Map.of() );
Expand All @@ -67,10 +61,9 @@ public GraphStoreCatalog(
@Deserialize("adapterId") long adapterId,
@Deserialize("physicals") Map<Long, PhysicalEntity> physicals,
@Deserialize("allocations") Map<Long, AllocationEntity> allocations,
@Deserialize("fields") Map<Pair<Long, Long>, PhysicalColumn> fields,
@Deserialize("fields") Map<Pair<Long, Long>, PhysicalField> fields,
@Deserialize("allocToPhysicals") Map<Long, Set<Long>> allocToPhysicals ) {
super( adapterId, Map.of(), physicals, allocations, allocToPhysicals );
this.fields = new ConcurrentHashMap<>( fields );
super( adapterId, Map.of(), physicals, allocations, allocToPhysicals, fields );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,10 @@
import com.google.common.collect.ImmutableList;
import io.activej.serializer.BinarySerializer;
import io.activej.serializer.annotations.Deserialize;
import io.activej.serializer.annotations.Serialize;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand All @@ -41,25 +38,23 @@
import org.polypheny.db.catalog.entity.logical.LogicalTable;
import org.polypheny.db.catalog.entity.physical.PhysicalColumn;
import org.polypheny.db.catalog.entity.physical.PhysicalEntity;
import org.polypheny.db.catalog.entity.physical.PhysicalField;
import org.polypheny.db.catalog.entity.physical.PhysicalTable;
import org.polypheny.db.type.PolySerializable;
import org.polypheny.db.util.Pair;

@Getter
@EqualsAndHashCode(callSuper = true)
@Value
@Slf4j
@NonFinal
public class RelStoreCatalog extends StoreCatalog {

@Getter
public BinarySerializer<GraphStoreCatalog> serializer = PolySerializable.buildSerializer( GraphStoreCatalog.class );

@Serialize
public ConcurrentMap<Pair<Long, Long>, PhysicalColumn> columns; // allocId, columnId


public RelStoreCatalog( long adapterId ) {
this( adapterId, Map.of(), Map.of(), Map.of(), Map.of() );
this( adapterId, Map.of(), Map.of(), Map.of(), Map.of(), Map.of() );
}


Expand All @@ -68,32 +63,32 @@ public RelStoreCatalog(
@Deserialize("physicals") Map<Long, PhysicalEntity> physicals,
@Deserialize("allocations") Map<Long, AllocationEntity> allocations,
@Deserialize("columns") Map<Pair<Long, Long>, PhysicalColumn> columns,
@Deserialize("allocToPhysicals") Map<Long, Set<Long>> allocToPhysicals ) {
super( adapterId, Map.of(), physicals, allocations, allocToPhysicals );
this.columns = new ConcurrentHashMap<>( columns );
@Deserialize("allocToPhysicals") Map<Long, Set<Long>> allocToPhysicals,
@Deserialize("fields") Map<Pair<Long, Long>, PhysicalField> fields ) {
super( adapterId, Map.of(), physicals, allocations, allocToPhysicals, fields );
}


@Override
public void renameLogicalColumn( long id, String newFieldName ) {
List<PhysicalColumn> updates = new ArrayList<>();
for ( PhysicalColumn field : columns.values() ) {
for ( PhysicalField field : fields.values() ) {
if ( field.id == id ) {
updates.add( field.toBuilder().logicalName( newFieldName ).build() );
updates.add( field.unwrap( PhysicalColumn.class ).toBuilder().logicalName( newFieldName ).build() );
}
}
for ( PhysicalColumn u : updates ) {
PhysicalTable table = fromAllocation( u.allocId );
List<PhysicalColumn> newColumns = table.columns.stream().filter( c -> c.id != id ).collect( Collectors.toList() );
newColumns.add( u );
physicals.put( table.id, table.toBuilder().columns( ImmutableList.copyOf( newColumns ) ).build() );
columns.put( Pair.of( u.allocId, u.id ), u );
fields.put( Pair.of( u.allocId, u.id ), u );
}
}


/**
 * Registers a physical column in this catalog under its (allocId, columnId) key.
 *
 * @param column the column to register; replaces any previous entry for the same key
 */
public void addColumn( PhysicalColumn column ) {
    fields.put( Pair.of( column.allocId, column.id ), column );
}


Expand All @@ -103,7 +98,7 @@ public PhysicalTable getTable( long id ) {


/**
 * Looks up a physical column by column id and allocation id.
 *
 * @param id the column id
 * @param allocId the allocation id the column belongs to
 * @return the column, unwrapped from its {@link PhysicalField} entry
 *         (NOTE(review): throws if no entry exists for the key — callers
 *         appear to rely on presence; confirm before hardening)
 */
public PhysicalColumn getColumn( long id, long allocId ) {
    return fields.get( Pair.of( allocId, id ) ).unwrap( PhysicalColumn.class );
}


Expand Down Expand Up @@ -148,17 +143,17 @@ public PhysicalTable fromAllocation( long id ) {


/**
 * Removes a column from this catalog and from its owning physical table.
 *
 * @param allocId the allocation the column belongs to
 * @param columnId the id of the column to drop
 */
public void dropColumn( long allocId, long columnId ) {
    PhysicalColumn column = fields.get( Pair.of( allocId, columnId ) ).unwrap( PhysicalColumn.class );
    PhysicalTable table = fromAllocation( allocId );
    // Rebuild the table without the dropped column before removing the field entry,
    // so the physical table never references a column missing from `fields`.
    List<PhysicalColumn> pColumns = new ArrayList<>( table.columns );
    pColumns.remove( column );
    addPhysical( getAlloc( allocId ), table.toBuilder().columns( ImmutableList.copyOf( pColumns ) ).build() );
    fields.remove( Pair.of( allocId, columnId ) );
}


/**
 * Returns all physical columns belonging to the given allocation.
 *
 * @param allocId the allocation id to filter by
 * @return the columns of that allocation, in map-iteration order
 */
public List<PhysicalColumn> getColumns( long allocId ) {
    // Filter on the field's allocId first, then unwrap: avoids unwrapping
    // entries that are discarded anyway.
    return fields.values().stream()
            .filter( f -> f.allocId == allocId )
            .map( f -> f.unwrap( PhysicalColumn.class ) )
            .collect( Collectors.toList() );
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
Expand All @@ -35,9 +36,11 @@
import org.polypheny.db.catalog.IdBuilder;
import org.polypheny.db.catalog.entity.allocation.AllocationEntity;
import org.polypheny.db.catalog.entity.physical.PhysicalEntity;
import org.polypheny.db.catalog.entity.physical.PhysicalField;
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.schema.Namespace;
import org.polypheny.db.type.PolySerializable;
import org.polypheny.db.util.Pair;

@Value
@NonFinal
Expand All @@ -62,9 +65,12 @@ public abstract class StoreCatalog implements PolySerializable {
@Serialize
public ConcurrentMap<Long, Set<Long>> allocToPhysicals;

@Serialize
public ConcurrentMap<Pair<Long, Long>, PhysicalField> fields; // allocId, fieldId


public StoreCatalog( long adapterId ) {
this( adapterId, Map.of(), Map.of(), Map.of(), Map.of() );
this( adapterId, Map.of(), Map.of(), Map.of(), Map.of(), Map.of() );
}


Expand All @@ -73,12 +79,14 @@ public StoreCatalog(
Map<Long, Namespace> namespaces,
Map<Long, PhysicalEntity> physicals,
Map<Long, AllocationEntity> allocations,
Map<Long, Set<Long>> allocToPhysicals ) {
Map<Long, Set<Long>> allocToPhysicals,
Map<Pair<Long, Long>, PhysicalField> fields ) {
this.adapterId = adapterId;
this.namespaces = new ConcurrentHashMap<>( namespaces );
this.physicals = new ConcurrentHashMap<>( physicals );
this.allocations = new ConcurrentHashMap<>( allocations );
this.allocToPhysicals = new ConcurrentHashMap<>( allocToPhysicals );
this.fields = new ConcurrentHashMap<>( fields );
}


Expand Down Expand Up @@ -154,9 +162,15 @@ public void removeAllocAndPhysical( long allocId ) {
}
allocToPhysicals.remove( physical.allocationId );
allocations.remove( physical.allocationId );

// remove fields
List<PhysicalField> removeFields = fields.entrySet().stream().filter( f -> f.getValue().entityId == physicalId
|| f.getKey().getKey() == physical.allocationId ).map( Entry::getValue ).collect( Collectors.toList() );
removeFields.forEach( field -> fields.remove( Pair.of( field.allocId, field.id ) ) );
}
physicals.forEach( this.physicals::remove );
physicals.forEach( allocToPhysicals::remove );

}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,21 +64,18 @@
import org.polypheny.db.type.JavaToPolyTypeConversionRules;
import org.polypheny.db.type.PolyType;
import org.polypheny.db.type.PolyTypeFactoryImpl;
import org.polypheny.db.type.entity.PolyBigDecimal;
import org.polypheny.db.type.entity.PolyBinary;
import org.polypheny.db.type.entity.PolyBoolean;
import org.polypheny.db.type.entity.PolyDate;
import org.polypheny.db.type.entity.PolyDouble;
import org.polypheny.db.type.entity.PolyFloat;
import org.polypheny.db.type.entity.PolyInteger;
import org.polypheny.db.type.entity.PolyInterval;
import org.polypheny.db.type.entity.PolyList;
import org.polypheny.db.type.entity.PolyString;
import org.polypheny.db.type.entity.PolySymbol;
import org.polypheny.db.type.entity.PolyTime;
import org.polypheny.db.type.entity.PolyTimeStamp;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.type.entity.category.PolyBlob;
import org.polypheny.db.type.entity.category.PolyNumber;
import org.polypheny.db.type.entity.category.PolyTemporal;
import org.polypheny.db.type.entity.graph.PolyEdge;
import org.polypheny.db.type.entity.graph.PolyGraph;
import org.polypheny.db.type.entity.graph.PolyNode;
Expand Down Expand Up @@ -220,11 +217,16 @@ public Type getJavaClass( AlgDataType type ) {
return PolyDate.class;
case TIME:
case TIME_WITH_LOCAL_TIME_ZONE:
return PolyTime.class;
return PolyTemporal.class;
case DOUBLE:
case FLOAT: // sic
case REAL:
case SMALLINT:
case TINYINT:
case DECIMAL:
case INTEGER:
return PolyInteger.class;
case BIGINT:
return PolyBigDecimal.class;
return PolyNumber.class;
case TIMESTAMP:
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return PolyTimeStamp.class;
Expand All @@ -242,20 +244,8 @@ public Type getJavaClass( AlgDataType type ) {
case INTERVAL_MINUTE_SECOND:
case INTERVAL_SECOND:
return PolyInterval.class;
case SMALLINT:
return PolyInteger.class;
case TINYINT:
return PolyInteger.class;
case DECIMAL:
return PolyBigDecimal.class;
case BOOLEAN:
return PolyBoolean.class;
case DOUBLE:
return PolyDouble.class;
case FLOAT: // sic
return PolyFloat.class;
case REAL:
return PolyFloat.class;
case BINARY:
case VARBINARY:
return PolyBinary.class;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public void copyGraphData( AllocationGraph to, LogicalGraph from, Transaction tr

LogicalLpgModify modify = new LogicalLpgModify( builder.getCluster(), builder.getCluster().traitSetOf( ModelTrait.GRAPH ), to, values, Modify.Operation.INSERT, null, null );

AlgNode routedModify = RoutingManager.getInstance().getDmlRouter().routeGraphDml( modify, statement, from, List.of( to.id ) );
AlgNode routedModify = RoutingManager.getInstance().getDmlRouter().routeGraphDml( modify, statement, from, List.of( to.placementId ) );

result = statement.getQueryProcessor().prepareQuery(
AlgRoot.of( routedModify, Kind.SELECT ),
Expand Down
Loading

0 comments on commit 030e0fd

Please sign in to comment.