Commit c5764646 authored by Lukasz Waskiewicz

refs #1376 processing optimization, fix ordinal calculation

parent 7505d83d
@@ -109,7 +109,7 @@ public class ProcessParticles extends ProcessBase {
long[] offsets = new long[PARTICLE_TYPES_COUNT];
Map<Long, FileNPartInfo> fileStartMap = new HashMap<Long, FileNPartInfo>();
for (Tuple2<Long, Long[]> fileCount : fileCounts.sortByKey().collect()) {
fileStartMap.put(fileCount._1, new FileNPartInfo(offsets, fileCount._2));
fileStartMap.put(fileCount._1, new FileNPartInfo(offsets.clone(), fileCount._2));
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
offsets[i] = offsets[i] + fileCount._2[i];
}
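The one-line change in this hunk is the ordinal fix named in the commit message: the running `offsets` array is updated on every iteration of this loop, so storing it as-is leaves every `FileNPartInfo` holding a reference to the same array, which by the end contains the final totals; `offsets.clone()` snapshots the per-type offsets as they stood when that file was reached. A minimal standalone sketch of the aliasing effect (the `FileInfo` class and the sample counts are illustrative, not taken from the repository):

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetCloneSketch {
    // Stand-in for FileNPartInfo: just keeps the per-type start offsets of one file.
    static class FileInfo {
        final long[] offsets;
        FileInfo(long[] offsets) { this.offsets = offsets; }
    }

    public static void main(String[] args) {
        long[] running = new long[2];                       // running per-type offsets
        long[][] perFileCounts = { { 10, 5 }, { 3, 7 } };   // particle counts per file and type
        Map<Integer, FileInfo> shared = new LinkedHashMap<>();
        Map<Integer, FileInfo> copied = new LinkedHashMap<>();
        for (int file = 0; file < perFileCounts.length; file++) {
            shared.put(file, new FileInfo(running));         // aliases the mutable running array
            copied.put(file, new FileInfo(running.clone())); // snapshot of the offsets for this file
            for (int type = 0; type < running.length; type++) {
                running[type] += perFileCounts[file][type];
            }
        }
        // File 0 should start at offset 0 for type 0:
        System.out.println(shared.get(0).offsets[0]); // 13 -- wrong, the shared array kept mutating
        System.out.println(copied.get(0).offsets[0]); // 0  -- correct snapshot
    }
}
```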
@@ -122,7 +122,7 @@ public class ProcessParticles extends ProcessBase {
if (!particles.isEmpty()) {
JavaRDD<ProcessedParticle> processedParticles = particles.map(v -> {
Map<Long, FileNPartInfo> currentStartMap = fileStartMapBroadcast.getValue();
FileNPartInfo fileNPartInfo = currentStartMap.get(v.getFileId());
Long[] types = fileNPartInfo.getTypes();
Long fileOrd = v.getFileOrdinal();
@@ -134,47 +134,57 @@ public class ProcessParticles extends ProcessBase {
fileOrd -= nPartType;
type++;
}
Long startValue = fileNPartInfo.getOffsets()[type];
ProcessedParticle processedParticle = new ProcessedParticle(v, startValue, type);
return processedParticle;
});
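For context, the map above turns a particle's within-file ordinal into a particle type plus the per-type global start offset of its file by walking through the file's per-type counts. The sketch below reconstructs that walk; the hunk starts mid-loop, so the loop condition and the final combination of `startValue` with the remaining ordinal are assumptions, and the numbers are made up:

```java
public class OrdinalToTypeSketch {
    public static void main(String[] args) {
        Long[] typesInFile = { 10L, 5L, 0L, 20L };  // particles of each type in this file
        long[] fileOffsets = { 100L, 40L, 0L, 7L }; // global start offset of this file, per type
        long fileOrd = 12L;                         // ordinal of the particle within its file

        int type = 0;
        long ord = fileOrd;
        while (ord >= typesInFile[type]) {          // skip the type blocks that precede this particle
            ord -= typesInFile[type];
            type++;
        }
        long startValue = fileOffsets[type];
        // Prints "type=1 globalOrdinal=42" for this example (40 + 2).
        System.out.println("type=" + type + " globalOrdinal=" + (startValue + ord));
    }
}
```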
if (!fofData.isEmpty()) {
fofData = fofData.persist(StorageLevel.MEMORY_AND_DISK());
haloData = haloData.persist(StorageLevel.MEMORY_AND_DISK());
DataFrame processedParticlesFrame = sqlContext.createDataFrame(processedParticles, ProcessedParticle.class);
saveDataFrame(processedParticlesFrame, PARTITIONED_PARTICLES_TEMP_DIR, new String[] { "type" }, SaveMode.Overwrite);
Map<String, Integer> fofFieldIndexes = getIndexes(fofData.first().schema());
Broadcast<Map<String, Integer>> fofIndexes = jsc.broadcast(fofFieldIndexes);
long[] fofCounts = fofData.aggregate(new long[PARTICLE_TYPES_COUNT], (count, fof) -> {
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
count[i] = count[i] + fof.getLong(fofIndexes.getValue().get("npart_type_" + i));
}
return count;
}, (a, b) -> {
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
a[i] = a[i] + b[i];
}
return a;
});
int particleTypesCount = 0;
int particleType = -1;
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
if (fofCounts[i] > 0) {
processParticlesWithFofs(fofData, haloData, i);
} else {
processParticlesWithoutFofs(i);
if (offsets[i] > 0) {
particleTypesCount++;
particleType = i;
}
}
if (particleTypesCount > 1) {
fofData = fofData.persist(StorageLevel.MEMORY_AND_DISK());
haloData = haloData.persist(StorageLevel.MEMORY_AND_DISK());
DataFrame processedParticlesFrame = sqlContext.createDataFrame(processedParticles, ProcessedParticle.class);
saveDataFrame(processedParticlesFrame, PARTITIONED_PARTICLES_TEMP_DIR, new String[] { "type" }, SaveMode.Overwrite);
Map<String, Integer> fofFieldIndexes = getIndexes(fofData.first().schema());
Broadcast<Map<String, Integer>> fofIndexes = jsc.broadcast(fofFieldIndexes);
long[] fofCounts = fofData.aggregate(new long[PARTICLE_TYPES_COUNT], (count, fof) -> {
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
count[i] = count[i] + fof.getLong(fofIndexes.getValue().get("npart_type_" + i));
}
return count;
}, (a, b) -> {
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
a[i] = a[i] + b[i];
}
return a;
});
haloData.unpersist();
fofData.unpersist();
for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
if (fofCounts[i] > 0) {
processParticlesWithFofs(fofData, haloData, i);
} else {
processParticlesWithoutFofs(i);
}
}
haloData.unpersist();
fofData.unpersist();
} else {
processParticlesWithFofs(fofData, haloData, processedParticles, particleType);
}
} else {
processParticlesWithoutFofs(processedParticles);
}
}
}
private void processParticlesWithFofs(JavaRDD<GenericRowWithSchema> fofData, JavaRDD<GenericRowWithSchema> haloData, final Integer processedType)
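The `fofData.aggregate(...)` call that appears in both the old and the new block sums the `npart_type_i` columns across the whole RDD: the first function (seqOp) folds rows into a per-partition accumulator, the second (combOp) merges accumulators from different partitions. Mutating and returning the `long[]` accumulator is safe because each task receives its own copy of the zero value. A self-contained local-mode sketch of the pattern; the data and field layout are illustrative, not the project's real schema:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class PerTypeCountSketch {
    private static final int PARTICLE_TYPES_COUNT = 3;

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("per-type-count").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Each record carries one count per particle type (stand-in for the npart_type_i columns).
        JavaRDD<long[]> fofLike = jsc.parallelize(Arrays.asList(
                new long[] { 1, 0, 2 },
                new long[] { 0, 4, 1 }));
        long[] totals = fofLike.aggregate(
                new long[PARTICLE_TYPES_COUNT],
                (count, row) -> {                   // seqOp: fold one record into the partition accumulator
                    for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
                        count[i] += row[i];
                    }
                    return count;
                },
                (a, b) -> {                         // combOp: merge accumulators from different partitions
                    for (int i = 0; i < PARTICLE_TYPES_COUNT; i++) {
                        a[i] += b[i];
                    }
                    return a;
                });
        System.out.println(Arrays.toString(totals)); // [1, 4, 3]
        jsc.stop();
    }
}
```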
@@ -182,7 +192,11 @@ public class ProcessParticles extends ProcessBase {
JavaRDD<GenericRowWithSchema> particles = tablesRegistrator.registerTable(outputDirectory, PARTITIONED_PARTICLES_TEMP_DIR, null,
"type=" + processedType);
JavaRDD<ProcessedParticle> processedParticles = tablesRegistrator.mapTable(ProcessedParticle.class, particles);
processParticlesWithFofs(fofData, haloData, processedParticles, processedType);
}
private void processParticlesWithFofs(JavaRDD<GenericRowWithSchema> fofData, JavaRDD<GenericRowWithSchema> haloData,
JavaRDD<ProcessedParticle> processedParticles, final Integer processedType) throws Exception {
int particleParallelism = tablesRegistrator.getParallelism(SparkTable.PARTICLE);
Broadcast<Double> boxSize = jsc.broadcast(this.boxSize);
Broadcast<Integer> partitionsCount = jsc.broadcast(this.partitionsCount);
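The `jsc.broadcast(...)` calls at the end of this hunk ship small driver-side values (box size, partition count) to the executors once instead of capturing them in every task closure. A minimal local-mode sketch of that pattern; the wrap-around computation and the numbers are purely illustrative, and it uses `Broadcast.value()` as the accessor:

```java
import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

public class BroadcastSketch {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("broadcast-sketch").setMaster("local[2]");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        Broadcast<Double> boxSize = jsc.broadcast(500.0);    // small value shared with all executors
        JavaRDD<Double> coords = jsc.parallelize(Arrays.asList(120.0, 730.0, -40.0));
        // Wrap coordinates into the periodic box using the broadcast value on the executors.
        JavaRDD<Double> wrapped = coords.map(
                x -> ((x % boxSize.value()) + boxSize.value()) % boxSize.value());
        System.out.println(wrapped.collect());               // [120.0, 230.0, 460.0]
        jsc.stop();
    }
}
```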