Commit f2cf1146 authored by Lukasz Waskiewicz

refs #1376 correction of particle processing

parent 2d9d80c7
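
What changed (as visible in the diff below): the FOF and SUBHALO table registrations now request the six per-type particle-count columns, npart_type_0 through npart_type_5, instead of the single npart column, and the per-type processing loop looks the count up with "npart_type_" + processedType. A minimal, self-contained sketch of that column handling follows; the class and method names (NpartColumnsSketch, fofColumns, countForType) and the Map-based row are illustrative assumptions, not code from this commit.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: shows how the per-type count columns introduced by this commit
// can be built and looked up by particle type index.
public class NpartColumnsSketch {

    // Column list in the spirit of the new FOF registration: the key columns
    // followed by one count column per particle type (0..5).
    static List<String> fofColumns() {
        List<String> columns = new ArrayList<>(Arrays.asList("snapshot_id", "fof_id"));
        for (int type = 0; type < 6; type++) {
            columns.add("npart_type_" + type);
        }
        return columns;
    }

    // Reads the particle count for one type from a row represented as a
    // column-name -> value map, mirroring the "npart_type_" + processedType
    // lookup visible in the diff.
    static long countForType(Map<String, Long> row, int processedType) {
        return row.get("npart_type_" + processedType);
    }

    public static void main(String[] args) {
        System.out.println(fofColumns());
        // [snapshot_id, fof_id, npart_type_0, npart_type_1, ..., npart_type_5]

        Map<String, Long> fofRow = new HashMap<>();
        for (int type = 0; type < 6; type++) {
            fofRow.put("npart_type_" + type, (long) (type * 10));
        }
        System.out.println(countForType(fofRow, 3)); // 30
    }
}
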
@@ -90,9 +90,11 @@ public class ProcessParticles extends ProcessBase {
@SuppressWarnings("unchecked")
private void processParticlesSnapshot(long snapshotId) throws Exception {
JavaRDD<GenericRowWithSchema> fofData = tablesRegistrator.registerTable(outputDirectory, ImpalaTable.FOF.getName(),
Arrays.asList("snapshot_id", "fof_id", "npart"), "snapshot_id=" + snapshotId);
JavaRDD<GenericRowWithSchema> haloData = tablesRegistrator.registerTable(outputDirectory, ImpalaTable.SUBHALO.getName(),
Arrays.asList("snapshot_id", "subhalo_id", "fof_id", "npart"), "snapshot_id=" + snapshotId);
Arrays.asList("snapshot_id", "fof_id", "npart_type_0", "npart_type_1", "npart_type_2", "npart_type_3", "npart_type_4", "npart_type_5"),
"snapshot_id=" + snapshotId);
JavaRDD<GenericRowWithSchema> haloData = tablesRegistrator.registerTable(outputDirectory, ImpalaTable.SUBHALO.getName(), Arrays.asList("snapshot_id",
"subhalo_id", "fof_id", "npart_type_0", "npart_type_1", "npart_type_2", "npart_type_3", "npart_type_4", "npart_type_5"),
"snapshot_id=" + snapshotId);
Broadcast<Long> snapshotIdValue = jsc.broadcast(snapshotId);
JavaRDD<GadgetSnapshotFileMetadata> snapshots = (JavaRDD<GadgetSnapshotFileMetadata>) rdds.get(SparkTable.SNAPSHOT_FILEMETADATA);
@@ -131,7 +133,7 @@ public class ProcessParticles extends ProcessBase {
fileOrd -= nPartType;
type++;
}
ProcessedParticle processedParticle = new ProcessedParticle(v, startValue, type);
return processedParticle;
});
@@ -150,7 +152,7 @@ public class ProcessParticles extends ProcessBase {
Broadcast<Double> boxSize = jsc.broadcast(this.boxSize);
Broadcast<Integer> partitionsCount = jsc.broadcast(this.partitionsCount);
for (int type = 0; type < 6; type++) {
final int processedType = type;
final int processedType = type;
JavaRDD<ProcessedParticle> filteredParticles = filter(processedParticles, p -> p.getType().equals(processedType));
JavaPairRDD<Long, ProcessedParticle> particlesByOrdinal = keyBy(filteredParticles, p -> p.getOrdinal(), "id");
Map<String, Integer> fofFieldIndexes = getIndexes(fofData.first().schema());
@@ -158,7 +160,7 @@ public class ProcessParticles extends ProcessBase {
JavaPairRDD<Long, Long> fofParticleCounts = mapToPair(fofData, row -> {
final GenericRowWithSchema fof = (GenericRowWithSchema) row;
Long id = fof.getLong(fofIndexes.getValue().get("fof_id"));
Long particlesCount = fof.getLong(fofIndexes.getValue().get("npart_type_"+processedType));
Long particlesCount = fof.getLong(fofIndexes.getValue().get("npart_type_" + processedType));
return new Tuple2<>(id, particlesCount);
});
JavaPairRDD<Long, Iterable<Tuple2<Long, Long>>> groupedFofs = fofParticleCounts.groupBy(t -> (long) (t._1 % 1e12 / 5e3));
@@ -195,7 +197,7 @@ public class ProcessParticles extends ProcessBase {
final GenericRowWithSchema halo = (GenericRowWithSchema) row;
Long id = halo.getLong(haloIndexes.getValue().get("subhalo_id"));
Long fofId = halo.getLong(haloIndexes.getValue().get("fof_id"));
Long particlesCount = halo.getLong(haloIndexes.getValue().get("npart_type_"+processedType));
Long particlesCount = halo.getLong(haloIndexes.getValue().get("npart_type_" + processedType));
return new Tuple2<>(fofId, new Tuple2<>(id, particlesCount));
});
@@ -223,7 +225,7 @@ public class ProcessParticles extends ProcessBase {
}
return result;
});
JavaPairRDD<Long, Tuple2<Long, Long>> ordinalFofs = flatMapToPair(fofs, tuple -> {
List<Tuple2<Long, Tuple2<Long, Long>>> particlesToFofs = new ArrayList<>();
for (long i = tuple._3(); i < tuple._4(); i++) {
@@ -233,7 +235,7 @@ public class ProcessParticles extends ProcessBase {
});
JavaPairRDD<Long, Tuple2<ProcessedParticle, Optional<Tuple2<Long, Long>>>> particlesWithHalos = leftOuterJoin(particlesByOrdinal, ordinalFofs,
particleParallelism);
JavaRDD<pl.edu.icm.cocos.spark.job.model.output.Particle> result = map(particlesWithHalos, data -> {
ProcessedParticle particle = data._2._1;
Long fofId = data._2._2.transform(x -> x._1).orNull();
@@ -299,24 +301,24 @@ public class ProcessParticles extends ProcessBase {
for (int i = 0; i < fields.length; i++) {
StructField field = fields[i];
switch (field.name()) {
case "box_size":
data.setBox_size(row.getDouble(i));
continue;
case "hubble_param":
data.setHubble_param(row.getDouble(i));
continue;
case "omega_0":
data.setOmega_0(row.getDouble(i));
continue;
case "omega_lambda":
data.setOmega_lambda(row.getDouble(i));
continue;
case "n_fof":
data.setN_fof(row.getLong(i));
continue;
case "n_sub":
data.setN_sub(row.getLong(i));
continue;
case "box_size":
data.setBox_size(row.getDouble(i));
continue;
case "hubble_param":
data.setHubble_param(row.getDouble(i));
continue;
case "omega_0":
data.setOmega_0(row.getDouble(i));
continue;
case "omega_lambda":
data.setOmega_lambda(row.getDouble(i));
continue;
case "n_fof":
data.setN_fof(row.getLong(i));
continue;
case "n_sub":
data.setN_sub(row.getLong(i));
continue;
}
}
return data;
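
A note on the @@ -131,7 +133,7 @@ hunk above: the loop there walks the per-type particle counts of a snapshot file, subtracting each from a file-local ordinal until the particle's type index is found, and that type is then stored on the ProcessedParticle. A self-contained sketch of that decomposition, with assumed method and variable names (typeForOrdinal, npartPerType) that are not taken from the repository:

// Sketch only: mirrors the fileOrd -= nPartType; type++ loop from the diff.
public class ParticleTypeSketch {

    // npartPerType[t] holds the number of particles of type t in one file;
    // fileOrd is the particle's ordinal within that file.
    static int typeForOrdinal(long fileOrd, long[] npartPerType) {
        int type = 0;
        while (type < npartPerType.length && fileOrd >= npartPerType[type]) {
            fileOrd -= npartPerType[type];
            type++;
        }
        return type;
    }

    public static void main(String[] args) {
        long[] npartPerType = {100, 50, 0, 0, 25, 10};
        System.out.println(typeForOrdinal(0, npartPerType));   // 0
        System.out.println(typeForOrdinal(120, npartPerType)); // 1
        System.out.println(typeForOrdinal(155, npartPerType)); // 4
    }
}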