Commit 86f47524 authored by Lukasz Waskiewicz's avatar Lukasz Waskiewicz
Browse files

refs #1517 optymalizacja zrównoleglania importu cząsteczek

uruchamianie importu snapshota od razu po zakończeniu importu
poprzedniego
parent f8dc3694
......@@ -10,7 +10,6 @@ import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
......@@ -82,7 +81,7 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
protected List<String> customStatementsList;
private ExecutorCompletionService<Void> executor;
private ExecutorCompletionService<String> executor;
@Value("#{jobParameters['createUdf']?:true}")
private boolean createUdf;
......@@ -191,7 +190,7 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
customStatementsList = StringUtils.isBlank(customStatements) ? new ArrayList<String>() : Arrays.asList(customStatements.split(";"));
customStatementsList.removeIf(StringUtils::isBlank);
executor = new ExecutorCompletionService<Void>(Executors.newFixedThreadPool(concurrencyLevel));
executor = new ExecutorCompletionService<String>(Executors.newFixedThreadPool(concurrencyLevel));
}
......@@ -237,27 +236,38 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
private void executeMultithreaded(ChunkContext chunkContext) throws InterruptedException, ExecutionException {
ExecutionException executorException = null;
int taskCounter = 0;
List<String> statement = null;
while (mayStartAnotherConcurrentTask(taskCounter)) {
statement = (List<String>) statementsIterator.next();
handleConcurrentStatements(statement);
taskCounter++;
}
while (taskCounter > 0) {
try {
executor.take().get();
} catch (ExecutionException e) {
executorException = e;
ConcurrentStatementHolder statementHolder = new ConcurrentStatementHolder();
while (executorException == null && isAnotherConcurrentTaskAvailable()) {
if (mayStartAnotherConcurrentTask(statementHolder.pendingSize())) {
List<String> statement = (List<String>) statementsIterator.next();
handleConcurrentStatements(statement);
statementHolder.submit(statmentListStateIndicator(statement));
} else {
executorException = waitAndProcessCompletedThread(chunkContext, executorException, statementHolder);
}
taskCounter--;
}
while (statementHolder.pendingSize() > 0) {
executorException = waitAndProcessCompletedThread(chunkContext, executorException, statementHolder);
}
if (executorException != null) {
throw executorException;
}
if (!deleteProcessedPartitions && statement != null) {
updateState(chunkContext, statmentListStateIndicator(statement));
}
/**
 * Blocks until one of the submitted tasks finishes and records its outcome in the holder.
 *
 * <p>On success the returned statement is marked as completed and, unless
 * {@code deleteProcessedPartitions} is set, the tasklet state is updated when the holder
 * reports that an update is needed. On failure the pending counter of the holder is
 * decreased and the failure is reported through the return value instead of being thrown.
 *
 * @param chunkContext      context passed on to {@code updateState}
 * @param executorException failure collected so far, or {@code null} when none occurred yet
 * @param statementHolder   bookkeeping of submitted and completed statements
 * @return the first failure seen so far (later failures are attached to it as suppressed
 *         exceptions), or {@code executorException} unchanged when the task succeeded
 * @throws InterruptedException if interrupted while waiting for a task to complete
 */
public ExecutionException waitAndProcessCompletedThread(ChunkContext chunkContext, ExecutionException executorException,
        ConcurrentStatementHolder statementHolder) throws InterruptedException {
    try {
        String completedStatement = executor.take().get();
        statementHolder.markCompleted(completedStatement);
        if (!deleteProcessedPartitions && statementHolder.taskletStateUpdateNeeded()) {
            updateState(chunkContext, statementHolder.taskletState());
        }
    } catch (ExecutionException e) {
        // The failed task is no longer pending, but it is deliberately not marked as completed.
        statementHolder.decrease();
        if (executorException == null) {
            return e;
        }
        // Keep the first failure as the primary one; do not lose it when further tasks fail.
        if (e != executorException) {
            executorException.addSuppressed(e);
        }
        return executorException;
    }
    return executorException;
}
private String statmentListStateIndicator(List<String> statementsList) {
......@@ -265,15 +275,19 @@ public class CocosImpalaImportTasklet extends CocosGenericImpalaImportTasklet {
}
private boolean mayStartAnotherConcurrentTask(int taskCounter) {
return taskCounter < concurrencyLevel && statementsIterator.hasNext() && statementsIterator.peek() instanceof List;
return taskCounter < concurrencyLevel;
}
/**
 * Tells whether the statements iterator has a next element and that element is a
 * {@link List}.
 *
 * @return {@code true} when the next element exists and is a {@code List}
 */
private boolean isAnotherConcurrentTaskAvailable() {
    if (!statementsIterator.hasNext()) {
        return false;
    }
    // Only peek once we know there is something to look at.
    return statementsIterator.peek() instanceof List;
}
private Future<Void> handleConcurrentStatements(List<String> statement) {
return executor.submit(new Callable<Void>() {
private void handleConcurrentStatements(List<String> statement) {
executor.submit(new Callable<String>() {
@Override
public Void call() throws Exception {
public String call() throws Exception {
statement.forEach(single -> handleSingleStatement(single));
return null;
return statmentListStateIndicator(statement);
}
});
}
......
package pl.edu.icm.cocos.imports.impala;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
/**
 * Keeps the statements handed over for concurrent execution, in submission order,
 * together with a counter of tasks that are still pending.
 *
 * <p>The class does no synchronization of its own.
 */
public class ConcurrentStatementHolder {

    /** Submitted statements in submission order; entries are removed by {@link #taskletState()}. */
    private final List<ConcurrentStatementState> statements = new ArrayList<>();

    /** Number of submitted tasks that have neither been marked completed nor decreased. */
    private int taskCounter = 0;

    /**
     * Decrements the pending counter without marking any statement as completed.
     *
     * @return the counter value after the decrement
     */
    public int decrease() {
        return --taskCounter;
    }

    /**
     * @return the current value of the pending counter
     */
    public int pendingSize() {
        return taskCounter;
    }

    /**
     * Registers a statement as submitted and increments the pending counter.
     *
     * @param statment the statement to track
     */
    public void submit(String statment) {
        statements.add(new ConcurrentStatementState(statment));
        taskCounter++;
    }

    /**
     * Marks the first tracked statement that equals {@code completedStatement} and is not yet
     * completed, then decrements the pending counter. The counter is decremented even when no
     * matching statement is found.
     *
     * @param completedStatement the statement reported as finished
     */
    public void markCompleted(String completedStatement) {
        for (ConcurrentStatementState st : statements) {
            // Skip entries that are already completed so that equal statements submitted
            // more than once are each completed exactly once.
            if (!st.isCompleted() && st.getStatement().equals(completedStatement)) {
                st.setCompleted(true);
                break;
            }
        }
        taskCounter--;
    }

    /**
     * @return {@code true} when the oldest tracked statement is completed
     */
    public boolean taskletStateUpdateNeeded() {
        return !statements.isEmpty() && statements.get(0).isCompleted();
    }

    /**
     * Removes the leading run of completed statements and returns the last one removed.
     *
     * @return the last statement of the leading completed run, or {@code null} when the
     *         oldest tracked statement is not completed or nothing is tracked
     */
    public String taskletState() {
        Iterator<ConcurrentStatementState> itr = statements.iterator();
        String state = null;
        while (itr.hasNext()) {
            ConcurrentStatementState st = itr.next();
            if (!st.isCompleted()) {
                // Stop at the first statement that has not completed yet.
                break;
            }
            state = st.getStatement();
            itr.remove();
        }
        return state;
    }
}
package pl.edu.icm.cocos.imports.impala;
/**
 * Pairs a statement with a flag telling whether it has been completed.
 */
public class ConcurrentStatementState {

    /** The statement; fixed at construction time. */
    private final String statement;

    /** Completion flag; {@code false} until changed through {@link #setCompleted(boolean)}. */
    private boolean completed;

    /**
     * Creates a state for the given statement with the completion flag cleared.
     *
     * @param statement the statement to hold
     */
    public ConcurrentStatementState(String statement) {
        this.statement = statement;
    }

    /**
     * @return the statement given at construction time
     */
    public String getStatement() {
        return this.statement;
    }

    /**
     * @return the current completion flag
     */
    public boolean isCompleted() {
        return this.completed;
    }

    /**
     * @param completed the new value of the completion flag
     */
    public void setCompleted(boolean completed) {
        this.completed = completed;
    }
}
......@@ -3,7 +3,9 @@ package pl.edu.icm.cocos.imports.impala;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.atMost;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
......@@ -482,7 +484,7 @@ public class CocosImpalaImportTaskletTest {
);
ArgumentCaptor<String> stateCaptor = ArgumentCaptor.forClass(String.class);
verify(setup.chunkContext.getStepContext().getStepExecution().getExecutionContext(), times(6)).put(eq(IMPALA_IMPORT_TASKLET_STATE), stateCaptor.capture());
verify(setup.chunkContext.getStepContext().getStepExecution().getExecutionContext(), atLeast(6)).put(eq(IMPALA_IMPORT_TASKLET_STATE), stateCaptor.capture());
Assertions.assertThat(stateCaptor.getValue()).isEqualTo("COMPUTE STATS particle;");
List<String> expectedBatchState = Arrays.asList(
"CREATE DATABASE IF NOT EXISTS dbPrefix_simId LOCATION '/user/impala/databases/dbPrefix_simId';",
......@@ -501,13 +503,79 @@ public class CocosImpalaImportTaskletTest {
}
@Test(invocationCount = 30)
public void shouldHandleFailWhenRunMultithreadedImportWithIteratorFactory() throws Exception {
// Runs a multithreaded import (concurrency level 2) over four partitions of which the
// first one is named "where=fail1", expects the run to throw, and then checks the issued
// statements and the persisted tasklet state.
// NOTE(review): presumably the test fixture makes statements containing "fail" throw —
// confirm in the Setup / operations mock, which is not visible here.
public void shouldHandleFailWhenRunMultithreadedImportWithIteratorFactoryWhenFailingFirst() throws Exception {
final CocosImpalaTableSpec spec = new CocosImpalaTableSpec();
// The mocked factory yields four partitions; the first carries the "fail" marker.
CocosPartitionInsertIteratorFactory partitionItr =mock(CocosPartitionInsertIteratorFactory.class);
when(partitionItr.buildProcessClauses(any(Path.class))).thenReturn(Arrays.asList(
new ProcessClauses("where=fail1", "drop1"),
new ProcessClauses("where=2", "drop2"),
new ProcessClauses("where=3", "drop3"),
new ProcessClauses("where=4", "drop4")
).iterator());
spec.setTableName("particle");
spec.setPartitionInsertIteratorFactory(partitionItr);
Setup setup = new Setup(this)
.withFile("particle")
.withColumn("double1", PrimitiveTypeName.DOUBLE)
.withColumn("float1", PrimitiveTypeName.FLOAT)
.withTableSpec(spec)
.withConcurrecyLevel(2)
.init();
try {
runExecuteTillFinished(setup);
Assertions.fail("should throw exception");
} catch (Exception e) {
// The exception is the expected outcome and is deliberately ignored.
// NOTE(review): this relies on Assertions.fail throwing an Error (not an Exception),
// otherwise the failure above would be swallowed here — confirm.
}
// Expected statements: four sequential ones followed by the lists for the partitions
// "where=fail1" and "where=2". The "where=fail1" list ends with the INSERT (no DROP of its
// external table), the "where=2" list includes the DROP. Partitions 3 and 4 are not listed.
List<Object> expectedUpdateStatements = Arrays.asList(
"CREATE DATABASE IF NOT EXISTS dbPrefix_simId LOCATION '/user/impala/databases/dbPrefix_simId';",
"USE dbPrefix_simId;",
"DROP TABLE IF EXISTS particle;",
"CREATE TABLE particle (float1 float,double1 double) STORED AS PARQUET LOCATION '/user/impala/databases/dbPrefix_simId/particle';",
Arrays.asList(
"CREATE EXTERNAL TABLE IF NOT EXISTS particle_where_fail1_external LIKE PARQUET 'uriPath/particle/_metadata' STORED AS PARQUET LOCATION 'uriPath/particle/where=fail1';",
"COMPUTE STATS particle_where_fail1_external;",
"INSERT INTO particle SELECT float1,double1 FROM particle_where_fail1_external;"
),
Arrays.asList(
"CREATE EXTERNAL TABLE IF NOT EXISTS particle_where_2_external LIKE PARQUET 'uriPath/particle/_metadata' STORED AS PARQUET LOCATION 'uriPath/particle/where=2';",
"COMPUTE STATS particle_where_2_external;",
"INSERT INTO particle SELECT float1,double1 FROM particle_where_2_external;",
"DROP TABLE IF EXISTS particle_where_2_external;"
)
);
// The state must have been stored exactly four times, once per sequential statement below;
// no state from the concurrent partition statements is expected.
ArgumentCaptor<String> stateCaptor = ArgumentCaptor.forClass(String.class);
verify(setup.chunkContext.getStepContext().getStepExecution().getExecutionContext(), times(4)).put(eq(IMPALA_IMPORT_TASKLET_STATE), stateCaptor.capture());
List<String> expectedBatchState = Arrays.asList(
"CREATE DATABASE IF NOT EXISTS dbPrefix_simId LOCATION '/user/impala/databases/dbPrefix_simId';",
"USE dbPrefix_simId;",
"DROP TABLE IF EXISTS particle;",
"CREATE TABLE particle (float1 float,double1 double) STORED AS PARQUET LOCATION '/user/impala/databases/dbPrefix_simId/particle';"
);
Assertions.assertThat(stateCaptor.getAllValues()).containsExactlyElementsOf(expectedBatchState);
validateConcurrentUpdateStatements(expectedUpdateStatements);
// The execute count is checked as a range: up to 8 executions above the minimum are accepted.
// NOTE(review): presumably the slack covers statements of partitions started before the
// failure was noticed — confirm where the value 8 comes from.
final int minCount = flatSize(expectedUpdateStatements) - DATABASE_UNAWARE_STATEMENTS;
validateExecuteStatements(minCount, minCount+8, "USE dbPrefix_simId;");
}
@Test(invocationCount = 30)
public void shouldHandleFailWhenRunMultithreadedImportWithIteratorFactoryWhenFailingSecond() throws Exception {
final CocosImpalaTableSpec spec = new CocosImpalaTableSpec();
CocosPartitionInsertIteratorFactory partitionItr =mock(CocosPartitionInsertIteratorFactory.class);
when(partitionItr.buildProcessClauses(any(Path.class))).thenReturn(Arrays.asList(
new ProcessClauses("where=1", "drop1"),
new ProcessClauses("where=fail2", "drop2")
new ProcessClauses("where=fail2", "drop2"),
new ProcessClauses("where=3", "drop3"),
new ProcessClauses("where=4", "drop4")
).iterator());
spec.setTableName("particle");
......@@ -548,18 +616,20 @@ public class CocosImpalaImportTaskletTest {
);
ArgumentCaptor<String> stateCaptor = ArgumentCaptor.forClass(String.class);
verify(setup.chunkContext.getStepContext().getStepExecution().getExecutionContext(), times(4)).put(eq(IMPALA_IMPORT_TASKLET_STATE), stateCaptor.capture());
verify(setup.chunkContext.getStepContext().getStepExecution().getExecutionContext(), times(5)).put(eq(IMPALA_IMPORT_TASKLET_STATE), stateCaptor.capture());
List<String> expectedBatchState = Arrays.asList(
"CREATE DATABASE IF NOT EXISTS dbPrefix_simId LOCATION '/user/impala/databases/dbPrefix_simId';",
"USE dbPrefix_simId;",
"DROP TABLE IF EXISTS particle;",
"CREATE TABLE particle (float1 float,double1 double) STORED AS PARQUET LOCATION '/user/impala/databases/dbPrefix_simId/particle';"
"CREATE TABLE particle (float1 float,double1 double) STORED AS PARQUET LOCATION '/user/impala/databases/dbPrefix_simId/particle';",
"CREATE EXTERNAL TABLE IF NOT EXISTS particle_where_1_external LIKE PARQUET 'uriPath/particle/_metadata' STORED AS PARQUET LOCATION 'uriPath/particle/where=1';"
);
Assertions.assertThat(stateCaptor.getAllValues()).containsExactlyElementsOf(expectedBatchState);
validateConcurrentUpdateStatements(expectedUpdateStatements);
validateExecuteStatements(flatSize(expectedUpdateStatements) - DATABASE_UNAWARE_STATEMENTS, "USE dbPrefix_simId;");
int minCount = flatSize(expectedUpdateStatements) - DATABASE_UNAWARE_STATEMENTS;
validateExecuteStatements(minCount, minCount+8, "USE dbPrefix_simId;");
}
private int flatSize(List<Object> expectedUpdateStatements) {
......@@ -599,8 +669,13 @@ public class CocosImpalaImportTaskletTest {
}
/**
 * Validates execute statements against an exact invocation count by delegating to the
 * range-based overload with the same value as lower and upper bound.
 *
 * @param count the exact number of expected invocations
 * @param stmt  the expected statements
 */
private void validateExecuteStatements(int count, String... stmt) {
    final int lowerBound = count;
    final int upperBound = count;
    validateExecuteStatements(lowerBound, upperBound, stmt);
}
private void validateExecuteStatements(int minCount, int maxCount, String... stmt) {
ArgumentCaptor<String> executeCaptor = ArgumentCaptor.forClass(String.class);
verify(operations, times(count)).execute(executeCaptor.capture());
verify(operations, atLeast(minCount)).execute(executeCaptor.capture());
verify(operations, atMost(maxCount)).execute(executeCaptor.capture());
final HashSet<String> executeStatements = new HashSet<String>(executeCaptor.getAllValues());
Assertions.assertThat(executeStatements).hasSize(stmt.length);
Assertions.assertThat(executeStatements).isEqualTo(new HashSet<>(Arrays.asList(stmt)));
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment