Commit 7505d83d authored by Lukasz Waskiewicz
Browse files

refs #1376 optymalizacja procesowania cząsteczek

parent ba8eecab
......@@ -82,6 +82,7 @@ public class ProcessParticles extends ProcessBase {
processSimulation(snapshots);
saveRdds();
shell.run(new String[] { "-rm", "-r", outputDirectory.toUri().getPath() + "/" + PARTITIONED_PARTICLES_TEMP_DIR });
final String sourceDir = outputDirectory.toUri().getPath() + "/" + SparkTable.SIMULATION.getResultTable();
final String targetDir = outputDirectory.toUri().getPath() + "/" + SparkTable.SIMULATION.getResultTable().replace("Temp", "");
shell.run(new String[] { "-rm", "-r", targetDir });
......@@ -118,62 +119,60 @@ public class ProcessParticles extends ProcessBase {
JavaRDD<GenericRowWithSchema> particleRows = tablesRegistrator.registerTable(inputDirectory, InputTable.PARTICLE.getTableName(), null,
"snapshotId=" + snapshotId);
JavaRDD<GadgetParticle> particles = tablesRegistrator.mapTable(SparkTable.PARTICLE, particleRows);
JavaRDD<ProcessedParticle> processedParticles = particles.map(v -> {
Map<Long, FileNPartInfo> currentStartMap = fileStartMapBroadcast.getValue();
FileNPartInfo fileNPartInfo = currentStartMap.get(v.getFileId());
Long[] types = fileNPartInfo.getTypes();
Long fileOrd = v.getFileOrdinal();
Integer type = 0;
for (Long nPartType : types) {
if (fileOrd < nPartType) {
break;
if (!particles.isEmpty()) {
JavaRDD<ProcessedParticle> processedParticles = particles.map(v -> {
Map<Long, FileNPartInfo> currentStartMap = fileStartMapBroadcast.getValue();
FileNPartInfo fileNPartInfo = currentStartMap.get(v.getFileId());
Long[] types = fileNPartInfo.getTypes();
Long fileOrd = v.getFileOrdinal();
Integer type = 0;
for (Long nPartType : types) {
if (fileOrd < nPartType) {
break;
}
fileOrd -= nPartType;
type++;
}
fileOrd -= nPartType;
type++;
}
Long startValue = fileNPartInfo.getOffsets()[type];
ProcessedParticle processedParticle = new ProcessedParticle(v, startValue, type);
return processedParticle;
});
if (!fofData.isEmpty()) {
fofData = fofData.persist(StorageLevel.MEMORY_AND_DISK());
haloData = haloData.persist(StorageLevel.MEMORY_AND_DISK());
DataFrame processedParticlesFrame = sqlContext.createDataFrame(processedParticles, ProcessedParticle.class);
saveDataFrame(processedParticlesFrame, PARTITIONED_PARTICLES_TEMP_DIR, new String[] { "type" }, SaveMode.Overwrite);
Long startValue = fileNPartInfo.getOffsets()[type];
ProcessedParticle processedParticle = new ProcessedParticle(v, startValue, type);
return processedParticle;
});
Map<String, Integer> fofFieldIndexes = getIndexes(fofData.first().schema());
Broadcast<Map<String, Integer>> fofIndexes = jsc.broadcast(fofFieldIndexes);
if (!fofData.isEmpty()) {
fofData = fofData.persist(StorageLevel.MEMORY_AND_DISK());
haloData = haloData.persist(StorageLevel.MEMORY_AND_DISK());
DataFrame processedParticlesFrame = sqlContext.createDataFrame(processedParticles, ProcessedParticle.class);
saveDataFrame(processedParticlesFrame, PARTITIONED_PARTICLES_TEMP_DIR, new String[] { "type" }, SaveMode.Overwrite);
Map<String, Integer> fofFieldIndexes = getIndexes(fofData.first().schema());
Broadcast<Map<String, Integer>> fofIndexes = jsc.broadcast(fofFieldIndexes);
long[] fofCounts = fofData.aggregate(new long[PARTICLE_TYPES_COUNT], (count, fof) -> {
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
count[i] = count[i] + fof.getLong(fofIndexes.getValue().get("npart_type_" + i));
}
return count;
}, (a, b) -> {
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
a[i] = a[i] + b[i];
}
return a;
});
long[] fofCounts = fofData.aggregate(new long[PARTICLE_TYPES_COUNT], (count, fof) -> {
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
count[i] = count[i] + fof.getLong(fofIndexes.getValue().get("npart_type_" + i));
}
return count;
}, (a, b) -> {
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
a[i] = a[i] + b[i];
if (fofCounts[i] > 0) {
processParticlesWithFofs(fofData, haloData, i);
} else {
processParticlesWithoutFofs(i);
}
}
return a;
});
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
if (fofCounts[i] > 0) {
processParticlesWithFofs(fofData, haloData, i);
} else {
processParticlesWithoutFofs(i);
}
haloData.unpersist();
fofData.unpersist();
} else {
processParticlesWithoutFofs(processedParticles);
}
final String processedParticlesTempDir = outputDirectory.toUri().getPath() + "/" + PARTITIONED_PARTICLES_TEMP_DIR;
haloData.unpersist();
fofData.unpersist();
shell.run(new String[] { "-rm", "-r", processedParticlesTempDir });
} else {
processParticlesWithoutFofs(processedParticles);
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment