Commit 60ca71ca authored by Lukasz Waskiewicz

refs #1373 compression optimization

parent 9fc6dd4d
@@ -6,8 +6,8 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.SaveMode;
-import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
 import pl.edu.icm.cocos.imports.model.gadget.GadgetSnapshotFileMetadata;
@@ -52,9 +52,7 @@ public class ChangeParticlesCompression extends ProcessBase {
     }
     private void processParticlesSnapshot(long snapshotId, SparkTable table) throws Exception {
-        JavaRDD<GenericRowWithSchema> particleRows = tablesRegistrator.registerTable(inputDirectory, table.getName(), null, "snapshotId=" + snapshotId);
-        JavaRDD<?> particles = tablesRegistrator.mapTable(table, particleRows);
+        DataFrame particles = tablesRegistrator.registerDataFrame(inputDirectory, table.getName(), null, "snapshotId=" + snapshotId);
         saveData(table, particles);
     }
@@ -63,10 +61,8 @@ public class ChangeParticlesCompression extends ProcessBase {
         shell.run(new String[] { "-mv", tempPath, targetDir });
     }
-    private void saveData(SparkTable table, JavaRDD<?> result) throws Exception {
-        rdds.put(table, result);
-        saveRdd(table, SaveMode.Append);
-        rdds.put(table, null);
+    private void saveData(SparkTable table, DataFrame result) throws Exception {
+        saveDataFrame(result, table, SaveMode.Append);
     }
     public JavaSparkContext getJsc() {
...
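The net effect of the two hunks above is that the recompression path now stays in the DataFrame API end to end, instead of detouring through JavaRDD<GenericRowWithSchema> and a per-table mapTable conversion before writing. A minimal sketch of the new shape of the flow, assuming the Spark 1.x DataFrame API used in this repository; the app name, input/output paths, and snapshot id below are illustrative, not taken from the repo:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SaveMode;

public class RecompressSketch {
    public static void main(String[] args) {
        JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("recompress"));
        SQLContext sqlContext = new SQLContext(jsc);

        // Old path (simplified): read parquet, map every Row into a Java object
        // via an RDD, then build a new DataFrame from those objects just to save
        // them again. Both conversions existed only to reuse the RDD-based save.

        // New path: the DataFrame read from parquet is filtered and written back
        // directly, so rows are never materialized as Java objects.
        DataFrame particles = sqlContext.read()
                .parquet("/data/particles")          // hypothetical input path
                .where("snapshotId=" + 42L);         // same filter style as the commit
        particles.write().mode(SaveMode.Append)
                .parquet("/data/particles_tmp");     // hypothetical output path
    }
}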
@@ -114,8 +114,12 @@ public abstract class ProcessBase implements Serializable {
         if (tableRdd == null) {
             return;
         }
-        String resultTableName = table.getResultTable();
         DataFrame frame = sqlContext.createDataFrame(tableRdd, table.getOutputClass());
+        saveDataFrame(frame, table, mode);
+    }
+    public void saveDataFrame(DataFrame frame, SparkTableBase<?> table, SaveMode mode) {
+        String resultTableName = table.getResultTable();
         Path path = new Path(outputDirectory, resultTableName);
         DataFrameWriter writer = frame.write().mode(mode);
         String[] partitionBy = table.getPartitionBy();
...
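Extracting saveDataFrame lets the RDD-based saveRdd path and the new DataFrame-only path in ChangeParticlesCompression share one writer configuration. The diff cuts off at the partitionBy lookup, so the continuation below is an assumption about how the write plausibly finishes, not the repository's actual code; outputDirectory and the table accessors are passed in explicitly so the sketch compiles on its own:

import org.apache.hadoop.fs.Path;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.SaveMode;

public class SaveDataFrameSketch {
    public static void saveDataFrame(DataFrame frame, Path outputDirectory,
                                     String resultTableName, String[] partitionBy,
                                     SaveMode mode) {
        Path path = new Path(outputDirectory, resultTableName);
        DataFrameWriter writer = frame.write().mode(mode);
        if (partitionBy != null && partitionBy.length > 0) {
            // Assumed continuation: partition the output directories by these columns.
            writer = writer.partitionBy(partitionBy);
        }
        // Assumed parquet target, consistent with the parquet reads elsewhere in this commit.
        writer.parquet(path.toUri().toString());
    }
}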
@@ -68,17 +68,17 @@ public class TablesRegistrator {
         return mapped;
     }
-    public JavaRDD<GenericRowWithSchema> registerTable(Path rootPath, String dataName, List<String> columns, String whereCondition) throws IOException {
+    public DataFrame registerDataFrame(Path rootPath, String dataName, List<String> columns, String whereCondition) throws IOException {
         Path nodesPath = new Path(rootPath, dataName);
         if (!fileSystem.exists(nodesPath) && !dataName.startsWith(EXTENDED_PREFIX)) {
             dataName = EXTENDED_PREFIX + dataName;
-            return registerTable(rootPath, dataName, columns, whereCondition);
+            return registerDataFrame(rootPath, dataName, columns, whereCondition);
         }
         Path metadata = new Path(nodesPath, ".metadata");
         if (fileSystem.exists(metadata)) {
             fileSystem.delete(new Path(nodesPath, ".metadata"), true);
         }
         DataFrame dataFrame = sqlContext.read().parquet(nodesPath.toUri().toString());
         if (columns != null && columns.size() > 0) {
             Column[] frameColumns = new Column[columns.size()];
             for (int i = 0; i < columns.size(); i++) {
@@ -89,6 +89,11 @@ public class TablesRegistrator {
         if (StringUtils.isNotBlank(whereCondition)) {
             dataFrame = dataFrame.where(whereCondition);
         }
+        return dataFrame;
+    }
+    public JavaRDD<GenericRowWithSchema> registerTable(Path rootPath, String dataName, List<String> columns, String whereCondition) throws IOException {
+        DataFrame dataFrame = registerDataFrame(rootPath, dataName, columns, whereCondition);
         return dataFrame.toJavaRDD().map(r -> (GenericRowWithSchema) r).setName(dataName);
     }
...
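With this split, registerDataFrame owns path resolution (including the EXTENDED_PREFIX retry), .metadata cleanup, column projection, and the where filter, while registerTable becomes a thin wrapper for callers that still need a JavaRDD<GenericRowWithSchema>. A short usage fragment, assuming a tablesRegistrator and inputDirectory in scope as in ChangeParticlesCompression; the table name and snapshot id are illustrative:

// DataFrame-capable callers (like the recompression above) skip the Row mapping:
DataFrame df = tablesRegistrator.registerDataFrame(
        inputDirectory, "particles", null, "snapshotId=" + 42L);

// Legacy callers keep the old signature, which now just delegates and maps:
JavaRDD<GenericRowWithSchema> rows = tablesRegistrator.registerTable(
        inputDirectory, "particles", null, "snapshotId=" + 42L);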