Skip to content

Commit

Permalink
#! Drop Map.Entry from Pair
Browse files Browse the repository at this point in the history
  • Loading branch information
Novotnik, Petr authored and David Moravek committed Oct 5, 2018
1 parent f236fa0 commit 3193136
Show file tree
Hide file tree
Showing 12 changed files with 22 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
import java.util.Objects;

/**
* Pair of any types. The pair has to fit in memory and is read-only.
* A pair, i.e. a tuple of two elements.
*
* @param <K> the type of the key - first element of the pair
* @param <V> the type of the value - second element of the pair
* @param <K> the type of the first element of the pair
* @param <V> the type of the second element of the pair
*/
public class Pair<K, V> implements java.util.Map.Entry<K, V> {
public class Pair<K, V> {

private static final Comparator<Pair> CMP_BY_FIRST =
(o1, o2) -> doCompare(o1.getFirst(), o2.getFirst());
Expand Down Expand Up @@ -109,20 +109,6 @@ public String toString() {
return "Pair{first='" + first + "', second='" + second + "'}";
}

// ~ Map.Entry implementation -----------------------------------------------------

@Override
public K getKey() { return first; }

@Override
public V getValue() { return second; }

/** Always throws {@link java.lang.UnsupportedOperationException}. */
@Override
public V setValue(V value) {
throw new UnsupportedOperationException("Read-only entry!");
}

@Override
public boolean equals(Object obj) {
if (obj == this) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public Map<String, String> getAll() {
.stream()
.filter(e -> e.getKey().startsWith(prefix))
.map(e -> Pair.of(e.getKey().substring(prefix.length()), e.getValue()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
.collect(Collectors.toMap(Pair::getFirst, Pair::getSecond));
}

public boolean contains(String key) {
Expand All @@ -108,7 +108,7 @@ public void setString(String key, String value) {

public String getString(String key, String def) {
String skey = skey(requireNonNull(key));
return map.containsKey(skey) ? map.get(skey) : def;
return map.getOrDefault(skey, def);
}

public String getString(String key) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ public void before() throws Exception {
.output();

Dataset<Pair<Object, Long>> output = Join.of(mapped, reduced)
.by(e -> e, Pair::getKey)
.by(e -> e, Pair::getFirst)
.using((Object l, Pair<Object, Long> r, Context<Long> c) -> c.collect(r.getSecond()))
.windowBy(Time.of(Duration.ofSeconds(1)))
.output();
Expand Down Expand Up @@ -160,7 +160,7 @@ public void testMultipleOutputsToSameSink() throws Exception {
.output();

Dataset<Pair<Object, Long>> output = Join.of(mapped, reduced)
.by(e -> e, Pair::getKey)
.by(e -> e, Pair::getFirst)
.using((Object l, Pair<Object, Long> r, Context<Long> c) -> {
c.collect(r.getSecond());
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ private static Flow buildFlow(Settings settings, URI input, URI output, int part
// format output
MapElements.named("FORMAT")
.of(counted)
.using(p -> p.getKey() + "\t" + p.getSecond())
.using(p -> p.getFirst() + "\t" + p.getSecond())
.output()
.persist(output);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ public DataSet translate(FlinkOperator<ReduceByKey> operator,
new PartitionerWrapper<>(origOperator.getPartitioning().getPartitioner()),
Utils.wrapQueryable(
(KeySelector<BatchElement<Window, Pair>, Comparable>)
(BatchElement<Window, Pair> we) -> (Comparable) we.getElement().getKey(),
(BatchElement<Window, Pair> we) -> (Comparable) we.getElement().getFirst(),
Comparable.class))
.setParallelism(operator.getParallelism());
}
Expand All @@ -145,7 +145,7 @@ static class RBKKeySelector
public Tuple2<Comparable, Comparable> getKey(
BatchElement<Window, Pair> value) {

return new Tuple2(value.getWindow(), value.getElement().getKey());
return new Tuple2(value.getWindow(), value.getElement().getFirst());
}
}

Expand All @@ -167,7 +167,7 @@ static class RBKReducer
wid,
Math.max(p1.getTimestamp(), p2.getTimestamp()),
Pair.of(
p1.getElement().getKey(),
p1.getElement().getFirst(),
reducer.apply(Arrays.asList(p1.getElement().getSecond(), p2.getElement().getSecond()))));
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public DataSet translate(FlinkOperator<ReduceStateByKey> operator,
origOperator.getPartitioning().getPartitioner()),
Utils.wrapQueryable(
(KeySelector<BatchElement<?, Pair>, Comparable>)
(BatchElement<?, Pair> we) -> (Comparable) we.getElement().getKey(),
(BatchElement<?, Pair> we) -> (Comparable) we.getElement().getFirst(),
Comparable.class))
.setParallelism(operator.getParallelism());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public DataStream<?> translate(FlinkOperator<ReduceStateByKey> operator,
if (!origOperator.getPartitioning().hasDefaultPartitioner()) {
reduced = reduced.partitionCustom(
new PartitionerWrapper<>(origOperator.getPartitioning().getPartitioner()),
p -> p.getElement().getKey());
p -> p.getElement().getFirst());
}

return reduced;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ public void testAttachedWindowNonCombining() throws Exception {
static <K, V> HashMap<K, V> toMap(Pair<K, V> ... ps) {
HashMap<K, V> m = new HashMap<>();
for (Pair<K, V> p : ps) {
m.put(p.getKey(), p.getValue());
m.put(p.getFirst(), p.getSecond());
}
return m;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public HadoopWriter(RecordWriter<K, V> hadoopWriter,
@Override
public void write(Pair<K, V> record) throws IOException {
try {
hadoopWriter.write(record.getKey(), record.getValue());
hadoopWriter.write(record.getFirst(), record.getSecond());
} catch (InterruptedException e) {
throw new IOException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -353,10 +353,10 @@ public void testWordCountBatch() throws Exception {
ImmutableMap<String, Pair<String, Long>> idx =
Maps.uniqueIndex(f.getOutput(0), Pair::getFirst);
assertEquals(4, idx.size());
assertEquals((long) idx.get("one").getValue(), 4L);
assertEquals((long) idx.get("two").getValue(), 3L);
assertEquals((long) idx.get("three").getValue(), 2L);
assertEquals((long) idx.get("four").getValue(), 1L);
assertEquals((long) idx.get("one").getSecond(), 4L);
assertEquals((long) idx.get("two").getSecond(), 3L);
assertEquals((long) idx.get("three").getSecond(), 2L);
assertEquals((long) idx.get("four").getSecond(), 1L);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ public ProducerWriter(Producer producer, String topicId, Integer partition) {
@Override
public void write(Pair<byte[], byte[]> elem) throws IOException {
final ProducerRecord r =
new ProducerRecord(topicId, partition, elem.getKey(), elem.getValue());
new ProducerRecord(topicId, partition, elem.getFirst(), elem.getSecond());
fs.addLast(producer.send(r));

// ~ try to consume already finished futures ... preventing the pool of futures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ public void validate(Partitions<WPair<Integer, Integer, Integer>> partitions) {
// map (window, key) -> list(data)
Map<Pair<Integer, Integer>, List<WPair<Integer, Integer, Integer>>>
windowKeyMap = first.stream()
.collect(Collectors.groupingBy(p -> Pair.of(p.getWindow(), p.getKey())));
.collect(Collectors.groupingBy(p -> Pair.of(p.getWindow(), p.getFirst())));

// two windows, three keys
assertEquals(6, windowKeyMap.size());
Expand Down

0 comments on commit 3193136

Please sign in to comment.