Commit 80b0701e authored by Lukasz Waskiewicz
Browse files

refs #1517 optymalizacja zrównoleglania importu cząsteczek

limit na ilości równoczesnych wykonań create table
parent 86f47524
......@@ -10,6 +10,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
......@@ -35,6 +36,8 @@ import com.google.common.collect.PeekingIterator;
import parquet.schema.PrimitiveType.PrimitiveTypeName;
import pl.edu.icm.cocos.fileSystem.FileSystemDecorator;
import pl.edu.icm.cocos.imports.impala.statement.DatabaseUnawareStatement;
import pl.edu.icm.cocos.imports.impala.statement.StatementWithConcurrentExecutionLimit;
@Component
@Scope("step")
......@@ -95,7 +98,13 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
@Value("#{jobParameters['concurrencyLevel']?:1}")
private int concurrencyLevel;
@Value("#{jobParameters['concurrencyLimit']?:-1}")
private int concurrencyLimit;
private Semaphore semaphore;
public void initialize(StepContext stepContext) throws IOException {
semaphore = new Semaphore(concurrencyLimit <=0 ? Integer.MAX_VALUE : concurrencyLimit);
if (initialized) {
return;
}
......@@ -136,14 +145,14 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
for (Iterator<ProcessClauses> processClauses = partitionInsertIteratorFactory.get().buildProcessClauses(file.getPath()); processClauses
.hasNext();) {
ProcessClauses clause = processClauses.next();
List<String> concurrentStatements = new ArrayList<String>();
List<Object> concurrentStatements = new ArrayList<>();
String partitionTableName = tableName + "_" + clause.getWhereClause().replace("=", "_");
String partitionTableLocation = parquetDirPath + "/" + clause.getWhereClause();
String externalPartitionTableName = getExternalTableName(partitionTableName);
String createPartitionTable = new CreateTableBuilder(partitionTableLocation, externalPartitionTableName, sourceTableLikePath).ifNotExists()
.external(!deleteProcessedPartitions).build();
concurrentStatements.add(createPartitionTable);
concurrentStatements.add(new StatementWithConcurrentExecutionLimit(createPartitionTable, "createTable"));
concurrentStatements.add(computeStatsStatement(externalPartitionTableName));
String insertStatementFromPart = " FROM " + externalPartitionTableName;
......@@ -181,9 +190,9 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
return true;
}
if (item instanceof List) {
start.set(start.get() || batchState.equals(statmentListStateIndicator((List<String>) item)));
start.set(start.get() || batchState.equals(statmentListStateIndicator((List<?>) item)));
} else {
start.set(start.get() || batchState.equals(item));
start.set(start.get() || batchState.equals(item.toString()));
}
return false;
}).iterator());
......@@ -240,7 +249,7 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
while (executorException == null && isAnotherConcurrentTaskAvailable()) {
if (mayStartAnotherConcurrentTask(statementHolder.pendingSize())) {
List<String> statement = (List<String>) statementsIterator.next();
List<?> statement = (List<?>) statementsIterator.next();
handleConcurrentStatements(statement);
statementHolder.submit(statmentListStateIndicator(statement));
} else {
......@@ -270,8 +279,8 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
return executorException;
}
private String statmentListStateIndicator(List<String> statementsList) {
return statementsList.get(0);
private String statmentListStateIndicator(List<?> statementsList) {
return statementsList.get(0).toString();
}
private boolean mayStartAnotherConcurrentTask(int taskCounter) {
......@@ -282,28 +291,50 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
return statementsIterator.hasNext() && statementsIterator.peek() instanceof List;
}
private void handleConcurrentStatements(List<String> statement) {
private void handleConcurrentStatements(List<?> statements) {
executor.submit(new Callable<String>() {
@Override
public String call() throws Exception {
statement.forEach(single -> handleSingleStatement(single));
return statmentListStateIndicator(statement);
statements.forEach((Object single) -> handleSingleStatement(single));
return statmentListStateIndicator(statements);
}
});
}
/**
 * Dispatches a batch element to the overload matching its runtime type.
 *
 * Supported types are {@code String}, {@code DatabaseUnawareStatement} and
 * {@code StatementWithConcurrentExecutionLimit}.
 *
 * @param statement element taken from a statement batch
 * @throws IllegalArgumentException if the element is {@code null} or of an
 *         unsupported type; the message names the offending type so a
 *         misconfigured batch can be diagnosed from the stack trace alone
 */
private void handleSingleStatement(Object statement) {
    if (statement instanceof String) {
        handleSingleStatement((String) statement);
    } else if (statement instanceof DatabaseUnawareStatement) {
        handleSingleStatement((DatabaseUnawareStatement) statement);
    } else if (statement instanceof StatementWithConcurrentExecutionLimit) {
        handleSingleStatement((StatementWithConcurrentExecutionLimit) statement);
    } else {
        String actualType = statement == null ? "null" : statement.getClass().getName();
        throw new IllegalArgumentException("Wrong config: unsupported statement type " + actualType);
    }
}
protected void handleSingleStatement(DatabaseUnawareStatement statement) {
customStatementsList.forEach(customStatement -> operations.execute(customStatement + ";"));
executeUpdateStatement(statement.toString());
}
operations.update(statement.toString());
/**
 * Executes a statement while holding one permit of the shared semaphore, so
 * that at most the configured number of such statements run at once.
 *
 * The permit is acquired uninterruptibly: a thread interrupted while waiting
 * keeps waiting. The permit is always released, also when the statement fails.
 *
 * @param statement wrapper whose {@code toString()} is executed through the
 *        {@code String} overload (i.e. after switching to the simulation database)
 */
protected void handleSingleStatement(StatementWithConcurrentExecutionLimit statement) {
    // Read the field once: initialize() assigns a fresh Semaphore on every call, and
    // releasing on a different instance than the one acquired would leak a permit on
    // the old semaphore and add a spurious one to the new.
    final Semaphore permits = semaphore;
    permits.acquireUninterruptibly();
    try {
        handleSingleStatement(statement.toString());
    } finally {
        permits.release();
    }
}
/**
 * Executes a plain SQL statement inside the simulation's database.
 *
 * First switches to the database named {@code databasePrefix + simulationBusinessId}
 * with a {@code USE} statement, then delegates to {@code executeUpdateStatement},
 * which runs the configured custom statements once followed by the update.
 *
 * @param statement SQL text to execute as an update
 */
protected void handleSingleStatement(String statement) {
    String databaseName = databasePrefix + simulationBusinessId;
    operations.execute("USE " + databaseName + ";");
    // Custom statements are executed inside executeUpdateStatement; running them here too would duplicate them.
    executeUpdateStatement(statement);
}
/**
 * Runs every configured custom statement (each terminated with ";") and then
 * executes the given statement as an update.
 *
 * @param statement SQL text passed to {@code operations.update}
 */
private void executeUpdateStatement(String statement) {
    for (Object customStatement : customStatementsList) {
        operations.execute(customStatement + ";");
    }
    operations.update(statement);
}
......
......@@ -4,6 +4,8 @@ import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import pl.edu.icm.cocos.imports.impala.statement.ConcurrentStatementState;
public class ConcurrentStatementHolder {
private List<ConcurrentStatementState> statements = new ArrayList<>();
......
package pl.edu.icm.cocos.imports.impala;
package pl.edu.icm.cocos.imports.impala.statement;
public class ConcurrentStatementState {
public class ConcurrentStatementState extends Statement{
private final String statement;
private boolean completed = false;
public ConcurrentStatementState(String statement) {
super();
this.statement = statement;
}
public String getStatement() {
return statement;
super(statement);
}
public boolean isCompleted() {
......
package pl.edu.icm.cocos.imports.impala.statement;
import java.io.Serializable;
/**
 * Statement wrapper that only carries the statement text handed to the
 * {@code Statement} superclass constructor; it adds no state of its own.
 *
 * NOTE(review): the class is {@code Serializable}, but the visible part of
 * {@code Statement} neither implements {@code Serializable} nor shows a
 * no-arg constructor. Java serialization needs an accessible no-arg
 * constructor in the first non-serializable superclass and does not write
 * that superclass's fields - confirm that (de)serialization of this type
 * still works as intended.
 */
public class DatabaseUnawareStatement extends Statement implements Serializable {
private static final long serialVersionUID = 2036961357590202563L;
/**
 * @param statement statement text, forwarded unchanged to the superclass
 */
public DatabaseUnawareStatement(String statement) {
super(statement);
}
}
package pl.edu.icm.cocos.imports.impala;
package pl.edu.icm.cocos.imports.impala.statement;
import java.io.Serializable;
public class DatabaseUnawareStatement implements Serializable {
private static final long serialVersionUID = 2036961357590202563L;
public abstract class Statement {
private final String statement;
public DatabaseUnawareStatement(String statement) {
public Statement(String statement) {
super();
this.statement = statement;
}
......
package pl.edu.icm.cocos.imports.impala.statement;
/**
 * Statement wrapper that pairs the statement text with a flag name.
 *
 * NOTE(review): presumably the flag name identifies the group of statements
 * that share one concurrency limit (the import tasklet passes "createTable");
 * the visible tasklet code does not read it - confirm the intended use.
 */
public class StatementWithConcurrentExecutionLimit extends Statement {
// Label supplied by the creator of the statement; immutable after construction.
private final String flagName;
/**
 * @param statement statement text, forwarded unchanged to the superclass
 * @param flagName label stored as-is (no validation is performed)
 */
public StatementWithConcurrentExecutionLimit(String statement, String flagName) {
super(statement);
this.flagName = flagName;
}
/**
 * @return the flag name given at construction
 */
public String getFlagName() {
return flagName;
}
}
......@@ -59,6 +59,7 @@ public class CocosImpalaImportTaskletTest {
CocosImpalaImportTaskletTest test;
String inputPath = "/";
int concurrencyLevel = 1;
int concurrencyLimit = -1;
String databasePrefix ="dbPrefix_";
String simulationBusinessId ="simId";
String batchState = StringUtils.EMPTY;
......@@ -98,6 +99,11 @@ public class CocosImpalaImportTaskletTest {
return this;
}
/**
 * Sets the concurrency limit used by this test setup and returns the setup
 * itself so calls can be chained.
 *
 * NOTE(review): the method name is misspelt ("Concurrecy"); it is left as-is
 * because callers reference it by this name.
 *
 * @param concurrencyLimit value stored in the setup's concurrencyLimit field
 * @return this setup
 */
Setup withConcurrecyLimit(int concurrencyLimit) {
this.concurrencyLimit = concurrencyLimit;
return this;
}
Setup withBatchState(String batchState) {
this.batchState = batchState;
return this;
......@@ -125,6 +131,7 @@ public class CocosImpalaImportTaskletTest {
reflectionSet(tasklet, "inputPath", inputPath);
reflectionSet(tasklet, "concurrencyLevel", concurrencyLevel);
reflectionSet(tasklet, "concurrencyLimit", concurrencyLimit);
reflectionSet(tasklet, "databasePrefix", databasePrefix);
reflectionSet(tasklet, "simulationBusinessId", simulationBusinessId);
reflectionSet(tasklet, "customStatements", customStatements);
......@@ -566,6 +573,7 @@ public class CocosImpalaImportTaskletTest {
final int minCount = flatSize(expectedUpdateStatements) - DATABASE_UNAWARE_STATEMENTS;
validateExecuteStatements(minCount, minCount+8, "USE dbPrefix_simId;");
}
@Test(invocationCount = 30)
public void shouldHandleFailWhenRunMultithreadedImportWithIteratorFactoryWhenFailingSecond() throws Exception {
final CocosImpalaTableSpec spec = new CocosImpalaTableSpec();
......@@ -664,6 +672,7 @@ public class CocosImpalaImportTaskletTest {
/**
 * Asserts that the update statements sent to the mocked operations are
 * exactly the expected ones, in the expected order.
 *
 * @param expectedUpdateStatements statements expected to have been passed to
 *        {@code operations.update}, in order
 */
private void validateUpdateStatements(List<String> expectedUpdateStatements) {
    final ArgumentCaptor<String> captor = ArgumentCaptor.forClass(String.class);
    // times(expectedUpdateStatements.size()) could be used here, but the list assertion below gives better failure messages
    verify(operations, atLeastOnce()).update(captor.capture());
    final List<String> actualUpdateStatements = captor.getAllValues();
    Assertions.assertThat(actualUpdateStatements).containsExactlyElementsOf(expectedUpdateStatements);
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment