Skip to content

Commit

Permalink
rudimentary entry-collection works (forgot to add before)
Browse files Browse the repository at this point in the history
  • Loading branch information
flurfis committed Dec 5, 2023
1 parent 8e0f856 commit 4664438
Show file tree
Hide file tree
Showing 3 changed files with 200 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public ImmutableMap<Long, List<BackupEntityWrapper<LogicalEntity>>> wrapLogicalE
bupEntityList.add( tempBupEntity );


// create entityReferences for each table (if there is a reference) with tableDependencies, and add entityReferences to the backupinformationobject, but only for relational entities
// create entityReferences for each table (if there is a reference) with tableDependencies, and add entityReferences to the backupinformationobject, but only for relational entities
if (entity.getEntityType().equals( EntityType.ENTITY) && !(entity.getDataModel().equals( DataModel.DOCUMENT ) || entity.getDataModel().equals( DataModel.GRAPH ))) {
EntityReferencer entityReferencer = new EntityReferencer( entity.getId(), BackupEntityType.TABLE );
if (tableDependencies.containsKey( entity.getId() )) {
Expand Down
76 changes: 73 additions & 3 deletions dbms/src/main/java/org/polypheny/db/backup/BackupManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,13 @@
import org.polypheny.db.backup.datagatherer.GatherEntries;
import org.polypheny.db.backup.datagatherer.GatherSchema;
import org.polypheny.db.backup.datainserter.InsertSchema;
import org.polypheny.db.backup.dependencies.DependencyAssembler;
import org.polypheny.db.backup.dependencies.DependencyManager;
import org.polypheny.db.backup.dependencies.EntityReferencer;
import org.polypheny.db.catalog.entity.logical.LogicalEntity;
import org.polypheny.db.catalog.entity.logical.LogicalForeignKey;
import org.polypheny.db.catalog.entity.logical.LogicalNamespace;
import org.polypheny.db.catalog.entity.logical.LogicalTable;
import org.polypheny.db.catalog.exceptions.GenericRuntimeException;
import org.polypheny.db.catalog.logistic.EntityType;
import org.polypheny.db.information.*;
import org.polypheny.db.transaction.TransactionManager;
import org.polypheny.db.util.Pair;
Expand Down Expand Up @@ -109,7 +108,6 @@ public static BackupManager setAndGetInstance( BackupManager backupManager ) {

public void startDataGathering() {
this.backupInformationObject = new BackupInformationObject();
GatherEntries gatherEntries = new GatherEntries(transactionManager);
GatherSchema gatherSchema = new GatherSchema();

//gatherEntries.start();
Expand All @@ -119,6 +117,14 @@ public void startDataGathering() {
// how/where do i safe the data
//gatherEntries.start();


List<String> tablesForDataCollection = tableDataToBeGathered();
List<Pair<Long, String>> collectionsForDataCollection = collectionDataToBeGathered();
List<Long> graphNamespaceIds = collectGraphNamespaceIds();
GatherEntries gatherEntries = new GatherEntries(transactionManager, tablesForDataCollection, collectionsForDataCollection, graphNamespaceIds);
gatherEntries.start();


}


Expand Down Expand Up @@ -222,4 +228,68 @@ private void startInserting() {
log.info( "inserting done" );
}


/**
* returns a list of all table names where the entry-data should be collected for the backup (right now, all of them, except sources)
* @return list of names with the format: namespacename.tablename
*/
/**
 * Collects the names of all tables whose entry-data should be gathered for the backup.
 * Currently this is every table of every relational namespace, except SOURCE entities
 * (their data is owned by the underlying external system and is not backed up).
 *
 * @return list of fully qualified table names in the format {@code namespaceName.tableName};
 *         empty if there are no relational namespaces
 */
private List<String> tableDataToBeGathered() {
    List<String> tableDataToBeGathered = new ArrayList<>();
    List<LogicalNamespace> relationalNamespaces = backupInformationObject.getRelNamespaces();

    if ( relationalNamespaces == null || relationalNamespaces.isEmpty() ) {
        return tableDataToBeGathered;
    }
    for ( LogicalNamespace relationalNamespace : relationalNamespaces ) {
        List<LogicalEntity> tables = backupInformationObject.getTables().get( relationalNamespace.id );
        // the map may not contain an entry for every namespace — guard against null, not only empty
        if ( tables == null ) {
            continue;
        }
        for ( LogicalEntity table : tables ) {
            // skip sources: their data lives in the external system and is not collected
            if ( !table.entityType.equals( EntityType.SOURCE ) ) {
                tableDataToBeGathered.add( relationalNamespace.name + "." + table.name );
            }
        }
    }
    return tableDataToBeGathered;
}


/**
* returns a list of pairs with all collection names and their corresponding namespaceId where the entry-data should be collected for the backup (right now all of them)
* @return list of pairs with the format: <namespaceId, collectionName>
*/
/**
 * Collects, per gathered document namespace, all collections whose entry-data should be
 * gathered for the backup (currently every collection).
 *
 * @return list of pairs in the format {@code <namespaceId, collectionName>}
 */
private List<Pair<Long, String>> collectionDataToBeGathered() {
    List<Pair<Long, String>> gathered = new ArrayList<>();

    // iterate the namespaceId -> collections map directly; order matches entry iteration
    backupInformationObject.getCollections().forEach( ( namespaceId, collections ) -> {
        for ( LogicalEntity collection : collections ) {
            gathered.add( new Pair<>( namespaceId, collection.name ) );
        }
    } );

    return gathered;
}


/**
* gets a list of all graph namespaceIds
* @return list of all graph namespaceIds
*/
/**
 * Collects the namespace ids of all gathered graphs.
 *
 * @return list of all graph namespace ids
 */
private List<Long> collectGraphNamespaceIds() {
    // the graphs map is keyed by namespace id, so a copy of the key set is exactly the result
    return new ArrayList<>( backupInformationObject.getGraphs().keySet() );
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,163 @@

package org.polypheny.db.backup.datagatherer;

import java.util.List;
import lombok.extern.slf4j.Slf4j;
import org.polypheny.db.PolyImplementation;
import org.polypheny.db.ResultIterator;
import org.polypheny.db.catalog.Catalog;
import org.polypheny.db.catalog.logistic.DataModel;
import org.polypheny.db.languages.LanguageManager;
import org.polypheny.db.languages.QueryLanguage;
import org.polypheny.db.processing.ImplementationContext.ExecutedContext;
import org.polypheny.db.processing.QueryContext;
import org.polypheny.db.transaction.Statement;
import org.polypheny.db.transaction.Transaction;
import org.polypheny.db.transaction.TransactionManager;
import org.polypheny.db.type.entity.PolyValue;
import org.polypheny.db.util.Pair;

@Slf4j
public class GatherEntries {
private final TransactionManager transactionManager;
private final List<String> tablesToBeCollected;
private final List<Pair<Long, String>> collectionsToBeCollected;
private final List<Long> graphNamespaceIds;

/**
 * Creates a gatherer that collects the entry-data of the given entities for a backup.
 *
 * @param transactionManager transaction manager used to run the collection queries
 * @param tablesToBeCollected fully qualified ({@code namespace.table}) names of the tables to collect
 * @param collectionsForDataCollection pairs of {@code <namespaceId, collectionName>} for the collections to collect
 * @param graphNamespaceIds namespace ids of the graphs to collect
 */
public GatherEntries( TransactionManager transactionManager, List<String> tablesToBeCollected, List<Pair<Long, String>> collectionsForDataCollection, List<Long> graphNamespaceIds ) {
    this.transactionManager = transactionManager;
    this.tablesToBeCollected = tablesToBeCollected;
    this.collectionsToBeCollected = collectionsForDataCollection;
    this.graphNamespaceIds = graphNamespaceIds;
}

// Move data around as little as possible -> use shortest possible path
// Stream and flush data

// Structure for saving: 1 schemafile, 1 storagefile, 1 to many datadata file(s)

// Move data around as little as possible -> use shortest possible path
// Stream and flush data

// Structure for saving: 1 schemafile, 1 storagefile, 1 to many datadata file(s)

/**
 * Starts the data gathering: issues one select query per table, per collection and per
 * graph namespace, delegating execution to {@code executeQuery}.
 */
public void start() {
    // relational: table names are already namespace-qualified, so the default namespace suffices
    for ( String nsTableName : tablesToBeCollected ) {
        //TODO(FF): exclude default columns? no, how do you differentiate for each line if it is not a default value
        executeQuery( "SELECT * FROM " + nsTableName, DataModel.RELATIONAL, Catalog.defaultNamespaceId );
    }

    // document: a find() over each collection, executed within the collection's namespace
    for ( Pair<Long, String> collection : collectionsToBeCollected ) {
        executeQuery( String.format( "db.%s.find()", collection.getValue() ), DataModel.DOCUMENT, collection.getKey() );
    }

    // graph: fetch all nodes of each graph namespace
    for ( Long graphNamespaceId : graphNamespaceIds ) {
        executeQuery( "MATCH (n) RETURN n", DataModel.GRAPH, graphNamespaceId );
    }
}

// Gather entries with select statements

/**
 * Runs one data-collection query in its own transaction and iterates the result batches.
 * For now the batches are only logged; persisting them to the backup file is still TODO.
 *
 * @param query the select query to execute ({@code SELECT}, {@code find()} or {@code MATCH},
 *         depending on {@code dataModel})
 * @param dataModel data model of the queried entity; determines the query language used
 * @param namespaceId id of the namespace the query is executed against
 * @throws RuntimeException if the data model is unsupported or the query execution fails
 */
private void executeQuery( String query, DataModel dataModel, long namespaceId ) {
    log.debug( "gather entries" );

    // map the data model to the matching query language
    final String language;
    switch ( dataModel ) {
        case RELATIONAL:
            language = "sql";
            break;
        case DOCUMENT:
            language = "mql";
            break;
        case GRAPH:
            language = "cypher";
            break;
        default:
            throw new RuntimeException( "Backup - GatherEntries: DataModel not supported" );
    }

    try {
        // get a transaction and a statement
        Transaction transaction = transactionManager.startTransaction( Catalog.defaultUserId, false, "Backup Entry-Gatherer" );
        Statement statement = transaction.createStatement();
        ExecutedContext executedQuery = LanguageManager.getINSTANCE().anyQuery(
                QueryContext.builder()
                        .language( QueryLanguage.from( language ) )
                        .query( query )
                        .origin( "Backup Manager" )
                        .transactionManager( transactionManager )
                        .namespaceId( namespaceId )
                        .build(), statement ).get( 0 );
        //TODO(FF): is the list here when there are subqueries? or what was the list for again?

        ResultIterator iter = executedQuery.getIterator();
        while ( iter.hasMoreRows() ) {
            // one batch of result tuples
            List<List<PolyValue>> resultBatch = iter.getNextBatch();
            //TODO(FF): stream/flush the batch into the backup data file instead of only logging it
            log.info( resultBatch.toString() );
            //FIXME(FF): nested PolyList values are not rendered correctly by toString();
            // PolyValue.toJson() shows them correctly, and PolyValue.deserialize( toJson() )
            // currently throws a NullPointerException — revisit when persisting batches
        }
        //TODO(FF): commit/close the transaction once the gathered data is persisted

    } catch ( Exception e ) {
        // wrap with context: covers transaction start, query execution and result iteration
        throw new RuntimeException( "Error while gathering entry-data for the backup", e );
    }
}

}

0 comments on commit 4664438

Please sign in to comment.