Commit 2d9d80c7 authored by Lukasz Waskiewicz

refs #1376 correction of particle processing

parent 3dbc4f93
@@ -145,101 +145,107 @@ public class ProcessParticles extends ProcessBase {
private void processParticlesWithFofs(JavaRDD<GenericRowWithSchema> fofData, JavaRDD<GenericRowWithSchema> haloData,
JavaRDD<ProcessedParticle> processedParticles) throws Exception {
processedParticles = processedParticles.persist(StorageLevel.MEMORY_AND_DISK());
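// The processed particles are cached here because the RDD is re-read several times
// below (once per particle type in the loop that follows, plus the final join).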
int particleParallelism = tablesRegistrator.getParallelism(SparkTable.PARTICLE);
Broadcast<Double> boxSize = jsc.broadcast(this.boxSize);
Broadcast<Integer> partitionsCount = jsc.broadcast(this.partitionsCount);
JavaPairRDD<Long, ProcessedParticle> particlesByOrdinal = keyBy(processedParticles, p -> p.getOrdinal(), "id");
Map<String, Integer> fofFieldIndexes = getIndexes(fofData.first().schema());
Broadcast<Map<String, Integer>> fofIndexes = jsc.broadcast(fofFieldIndexes);
JavaPairRDD<Long, Long> fofParticleCounts = mapToPair(fofData, row -> {
final GenericRowWithSchema fof = (GenericRowWithSchema) row;
Long id = fof.getLong(fofIndexes.getValue().get("fof_id"));
Long particlesCount = fof.getLong(fofIndexes.getValue().get("npart"));
return new Tuple2<>(id, particlesCount);
});
JavaPairRDD<Long, Iterable<Tuple2<Long, Long>>> groupedFofs = fofParticleCounts.groupBy(t -> (long) (t._1 % 1e12 / 5e3));
groupedFofs = groupedFofs.persist(StorageLevel.MEMORY_AND_DISK());
JavaPairRDD<Long, Long> groups = groupedFofs.mapValues(fofs -> {
long count = 0;
for (Tuple2<Long, Long> fof : fofs) {
count += fof._2;
}
return count;
});
Map<Long, Long> groupStart = new HashMap<Long, Long>();
long sumCount = 0;
for (Tuple2<Long, Long> groupCount : groups.sortByKey().collect()) {
groupStart.put(groupCount._1, sumCount);
sumCount += groupCount._2;
}
Broadcast<Map<Long, Long>> groupStartBroadcast = jsc.broadcast(groupStart);
JavaPairRDD<Long, Tuple2<Long, Long>> fofOffsets = groupedFofs.flatMapToPair(fofGroup -> {
List<Tuple2<Long, Tuple2<Long, Long>>> localFofOffsets = new ArrayList<>();
long fofOffset = groupStartBroadcast.getValue().get(fofGroup._1);
List<Tuple2<Long, Long>> sortedGroups = FluentIterable.from(fofGroup._2).toSortedList((a, b) -> a._1().compareTo(b._1()));
for (Tuple2<Long, Long> fof : sortedGroups) {
localFofOffsets.add(new Tuple2<>(fof._1, new Tuple2<>(fofOffset, fof._2)));
fofOffset += fof._2;
for (int type = 0; type < 6; type++) {
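// Handle each particle type separately (types 0-5, presumably the six particle
// species of the simulation snapshot); the FOF and subhalo tables store per-type
// counts in their npart_type_<N> columns.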
final int processedType = type;
JavaRDD<ProcessedParticle> filteredParticles = filter(processedParticles, p -> p.getType().equals(processedType));
JavaPairRDD<Long, ProcessedParticle> particlesByOrdinal = keyBy(filteredParticles, p -> p.getOrdinal(), "id");
Map<String, Integer> fofFieldIndexes = getIndexes(fofData.first().schema());
Broadcast<Map<String, Integer>> fofIndexes = jsc.broadcast(fofFieldIndexes);
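// Map every FOF row to (fof_id, number of particles of the current type in that FOF).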
JavaPairRDD<Long, Long> fofParticleCounts = mapToPair(fofData, row -> {
final GenericRowWithSchema fof = (GenericRowWithSchema) row;
Long id = fof.getLong(fofIndexes.getValue().get("fof_id"));
Long particlesCount = fof.getLong(fofIndexes.getValue().get("npart_type_"+processedType));
return new Tuple2<>(id, particlesCount);
});
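// Bucket the FOFs by id into coarse groups (the key appears to strip a high-order
// prefix with % 1e12 and pack ~5000 consecutive ids per group) so that per-group
// particle totals stay small enough to collect on the driver.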
JavaPairRDD<Long, Iterable<Tuple2<Long, Long>>> groupedFofs = fofParticleCounts.groupBy(t -> (long) (t._1 % 1e12 / 5e3));
groupedFofs = groupedFofs.persist(StorageLevel.MEMORY_AND_DISK());
JavaPairRDD<Long, Long> groups = groupedFofs.mapValues(fofs -> {
long count = 0;
for (Tuple2<Long, Long> fof : fofs) {
count += fof._2;
}
return count;
});
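// Collect the per-group particle totals and turn them into prefix sums:
// groupStart maps each group to the global ordinal at which its particles begin.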
Map<Long, Long> groupStart = new HashMap<Long, Long>();
long sumCount = 0;
for (Tuple2<Long, Long> groupCount : groups.sortByKey().collect()) {
groupStart.put(groupCount._1, sumCount);
sumCount += groupCount._2;
}
return localFofOffsets;
});
Map<String, Integer> haloFieldIndexes = getIndexes(haloData.first().schema());
Broadcast<Map<String, Integer>> haloIndexes = jsc.broadcast(haloFieldIndexes);
JavaPairRDD<Long, Tuple2<Long, Long>> haloParticleCounts = mapToPair(haloData, row -> {
final GenericRowWithSchema halo = (GenericRowWithSchema) row;
Long id = halo.getLong(haloIndexes.getValue().get("subhalo_id"));
Long fofId = halo.getLong(haloIndexes.getValue().get("fof_id"));
Long particlesCount = halo.getLong(haloIndexes.getValue().get("npart"));
return new Tuple2<>(fofId, new Tuple2<>(id, particlesCount));
});
JavaRDD<Tuple4<Long, Long, Long, Long>> fofs = fofOffsets.leftOuterJoin(haloParticleCounts.groupByKey()).flatMap(tuple -> {
Long groupStartNpart = tuple._2._1._1;
List<Tuple4<Long, Long, Long, Long>> result = new ArrayList<>();
long currentOffset = groupStartNpart;
Optional<Iterable<Tuple2<Long, Long>>> fofHalos = tuple._2._2;
long fofOffset = tuple._2._1._2;
Long nextOffset = currentOffset;
if (fofHalos.isPresent()) {
List<Tuple2<Long, Long>> subhalos = FluentIterable.from(fofHalos.get()).toSortedList((a, b) -> a._1().compareTo(b._1()));
for (Tuple2<Long, Long> halo : subhalos) {
nextOffset += halo._2;
fofOffset -= halo._2;
Tuple4<Long, Long, Long, Long> haloCounts = new Tuple4<>(tuple._1, halo._1, currentOffset, nextOffset);
Broadcast<Map<Long, Long>> groupStartBroadcast = jsc.broadcast(groupStart);
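// Within each group, walk the FOFs in id order and assign each one its starting
// ordinal (fofOffset) together with its particle count for this type.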
JavaPairRDD<Long, Tuple2<Long, Long>> fofOffsets = groupedFofs.flatMapToPair(fofGroup -> {
List<Tuple2<Long, Tuple2<Long, Long>>> localFofOffsets = new ArrayList<>();
long fofOffset = groupStartBroadcast.getValue().get(fofGroup._1);
List<Tuple2<Long, Long>> sortedGroups = FluentIterable.from(fofGroup._2).toSortedList((a, b) -> a._1().compareTo(b._1()));
for (Tuple2<Long, Long> fof : sortedGroups) {
localFofOffsets.add(new Tuple2<>(fof._1, new Tuple2<>(fofOffset, fof._2)));
fofOffset += fof._2;
}
return localFofOffsets;
});
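// Map every subhalo row to (fof_id, (subhalo_id, particles of the current type)).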
Map<String, Integer> haloFieldIndexes = getIndexes(haloData.first().schema());
Broadcast<Map<String, Integer>> haloIndexes = jsc.broadcast(haloFieldIndexes);
JavaPairRDD<Long, Tuple2<Long, Long>> haloParticleCounts = mapToPair(haloData, row -> {
final GenericRowWithSchema halo = (GenericRowWithSchema) row;
Long id = halo.getLong(haloIndexes.getValue().get("subhalo_id"));
Long fofId = halo.getLong(haloIndexes.getValue().get("fof_id"));
Long particlesCount = halo.getLong(haloIndexes.getValue().get("npart_type_"+processedType));
return new Tuple2<>(fofId, new Tuple2<>(id, particlesCount));
});
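// Join the FOF offsets with their subhalos and emit, for each subhalo (plus one
// entry with a null subhalo id for FOF particles not bound to any subhalo), the
// half-open ordinal range [start, end) it occupies: (fofId, subhaloId, start, end).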
JavaRDD<Tuple4<Long, Long, Long, Long>> fofs = fofOffsets.leftOuterJoin(haloParticleCounts.groupByKey()).flatMap(tuple -> {
Long groupStartNpart = tuple._2._1._1;
List<Tuple4<Long, Long, Long, Long>> result = new ArrayList<>();
long currentOffset = groupStartNpart;
Optional<Iterable<Tuple2<Long, Long>>> fofHalos = tuple._2._2;
long fofOffset = tuple._2._1._2;
Long nextOffset = currentOffset;
if (fofHalos.isPresent()) {
List<Tuple2<Long, Long>> subhalos = FluentIterable.from(fofHalos.get()).toSortedList((a, b) -> a._1().compareTo(b._1()));
for (Tuple2<Long, Long> halo : subhalos) {
nextOffset += halo._2;
fofOffset -= halo._2;
Tuple4<Long, Long, Long, Long> haloCounts = new Tuple4<>(tuple._1, halo._1, currentOffset, nextOffset);
result.add(haloCounts);
currentOffset = nextOffset;
}
}
if (fofOffset > 0) {
nextOffset += fofOffset;
Tuple4<Long, Long, Long, Long> haloCounts = new Tuple4<>(tuple._1, null, currentOffset, nextOffset);
result.add(haloCounts);
currentOffset = nextOffset;
}
}
if (fofOffset > 0) {
nextOffset += fofOffset;
Tuple4<Long, Long, Long, Long> haloCounts = new Tuple4<>(tuple._1, null, currentOffset, nextOffset);
result.add(haloCounts);
}
return result;
});
JavaPairRDD<Long, Tuple2<Long, Long>> ordinalFofs = flatMapToPair(fofs, tuple -> {
List<Tuple2<Long, Tuple2<Long, Long>>> particlesToFofs = new ArrayList<>();
for (long i = tuple._3(); i < tuple._4(); i++) {
particlesToFofs.add(new Tuple2<>(i, new Tuple2<>(tuple._1(), tuple._2())));
}
return particlesToFofs;
});
JavaPairRDD<Long, Tuple2<ProcessedParticle, Optional<Tuple2<Long, Long>>>> particlesWithHalos = leftOuterJoin(particlesByOrdinal, ordinalFofs,
particleParallelism);
JavaRDD<pl.edu.icm.cocos.spark.job.model.output.Particle> result = map(particlesWithHalos, data -> {
ProcessedParticle particle = data._2._1;
Long fofId = data._2._2.transform(x -> x._1).orNull();
Long haloId = data._2._2.isPresent() ? data._2._2.get()._2 : null;
int boxIndex = BoxUtils.calculateBoxIndex(particle.getPos_x(), particle.getPos_y(), particle.getPos_z(), boxSize.getValue(),
partitionsCount.getValue());
return new pl.edu.icm.cocos.spark.job.model.output.Particle(particle, fofId, haloId, boxIndex, particle.getType());
});
saveData(result);
groupedFofs.unpersist();
return result;
});
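// Expand every ordinal range into individual (particleOrdinal, (fofId, subhaloId)) pairs.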
JavaPairRDD<Long, Tuple2<Long, Long>> ordinalFofs = flatMapToPair(fofs, tuple -> {
List<Tuple2<Long, Tuple2<Long, Long>>> particlesToFofs = new ArrayList<>();
for (long i = tuple._3(); i < tuple._4(); i++) {
particlesToFofs.add(new Tuple2<>(i, new Tuple2<>(tuple._1(), tuple._2())));
}
return particlesToFofs;
});
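// Left-join the particles, keyed by ordinal, with their FOF/subhalo assignment;
// particles that fall outside every FOF keep an absent value.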
JavaPairRDD<Long, Tuple2<ProcessedParticle, Optional<Tuple2<Long, Long>>>> particlesWithHalos = leftOuterJoin(particlesByOrdinal, ordinalFofs,
particleParallelism);
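// Build the output particles: attach fofId/haloId when present and compute the
// spatial box index from the particle position and the broadcast box geometry.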
JavaRDD<pl.edu.icm.cocos.spark.job.model.output.Particle> result = map(particlesWithHalos, data -> {
ProcessedParticle particle = data._2._1;
Long fofId = data._2._2.transform(x -> x._1).orNull();
Long haloId = data._2._2.isPresent() ? data._2._2.get()._2 : null;
int boxIndex = BoxUtils.calculateBoxIndex(particle.getPos_x(), particle.getPos_y(), particle.getPos_z(), boxSize.getValue(),
partitionsCount.getValue());
return new pl.edu.icm.cocos.spark.job.model.output.Particle(particle, fofId, haloId, boxIndex, particle.getType());
});
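// Save this type's particles and release the cached FOF groups before the next type.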
saveData(result);
groupedFofs.unpersist();
}
processedParticles.unpersist();
}
private void processParticlesWithoutFofs(JavaRDD<ProcessedParticle> processedParticles) throws Exception {
......