-
Notifications
You must be signed in to change notification settings - Fork 135
/
Copy pathplan.rs
1010 lines (943 loc) · 47.8 KB
/
plan.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#![deny(
clippy::unwrap_used,
clippy::expect_used,
clippy::indexing_slicing,
clippy::panic,
clippy::unimplemented,
clippy::unreachable
)]
use std::cmp::Ordering;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::convert::TryFrom;
use dataflow::payload::{ReplayPathSegment, SourceSelection, TriggerEndpoint};
use dataflow::prelude::*;
use dataflow::DomainRequest;
use readyset_errors::ReadySetError;
use tracing::{debug, instrument, trace};
use vec1::Vec1;
use crate::controller::keys::{self, IndexRef, RawReplayPath};
use crate::controller::migrate::DomainMigrationPlan;
use crate::controller::state::graphviz;
/// A struct representing all the information required to construct and maintain the
/// materializations for a single node within a dataflow graph.
///
/// A [`Plan`] is [constructed] for a single node with references to a dataflow [`Graph`] and the
/// [`Materializations`] in that graph, and over the course of its existence mutates that set of
/// materializations and also adds new messages to a [`DomainMigrationPlan`] to inform domains
/// throughout the graph of information they need to know about maintaining the materializations of
/// that node, including but not limited to:
///
/// - Information about replay paths that originate at the node (both for the domain containing the
/// node, and the domain containing nodes along those replay paths)
/// - Information about [columns that are generated][generated-cols] by the node
/// - Informing [egress nodes][] about tags of their targets
/// - Informing domains about indices they need to create for materializations
///
/// [constructed]: Plan::new
/// [generated-cols]: ColumnSource::GeneratedFromColumns
/// [egress nodes]: dataflow::node::special::Egress
pub(super) struct Plan<'a> {
/// A reference to the materializations in the graph. Only mutated to generate new [`Tag`]s for
/// replay paths
m: &'a mut super::Materializations,
graph: &'a Graph,
node: NodeIndex,
dmp: &'a mut DomainMigrationPlan,
partial: bool,
/// Map from indexes we're adding to this node, to the list of tags which identify those
/// indexes
indexes: HashMap<Index, Vec<Tag>>,
/// Indexes in *parent* nodes for extended replay paths we've already added
///
/// Used to prevent adding the same replay path for an extended replay path twice, for example
/// if two different downstream sets of columns remap to the same set of columns in the parent
parent_indexes: HashMap<NodeIndex, HashSet<Index>>,
/// New paths added in this run of the planner.
paths: HashMap<Tag, Vec<NodeIndex>>,
/// Paths that already exist for this node.
old_paths: HashMap<Tag, Vec<NodeIndex>>,
pending: Vec<PendingReplay>,
}
#[derive(Debug)]
pub(super) struct PendingReplay {
pub(super) tag: Tag,
pub(super) source: LocalNodeIndex,
pub(super) source_domain: DomainIndex,
}
impl<'a> Plan<'a> {
pub(super) fn new(
m: &'a mut super::Materializations,
graph: &'a Graph,
node: NodeIndex,
dmp: &'a mut DomainMigrationPlan,
) -> Plan<'a> {
let partial = m.partial.contains(&node);
let old_paths = m
.paths
.entry(node)
.or_insert_with(|| HashMap::new())
.clone();
Plan {
m,
graph,
node,
dmp,
partial,
indexes: Default::default(),
parent_indexes: Default::default(),
paths: Default::default(),
old_paths,
pending: Vec::new(),
}
}
/// Compute the set of replay paths required to construct and maintain the given `index` in our
/// node.
///
/// Note that if passed an index for a set of generated columns, this may return paths targeting
/// a different index than the passed `index`.
fn paths(&self, index: &Index) -> Result<Vec<RawReplayPath>, ReadySetError> {
let graph = self.graph;
let ni = self.node;
let mut paths = keys::replay_paths_for_opt(
graph,
IndexRef {
node: ni,
index: if self.partial {
Some(index.clone())
} else {
None
},
},
|stop_ni| {
stop_ni != ni
&& self
.m
.have
.get(&stop_ni)
.map(|x| !x.is_empty())
.unwrap_or(false)
},
)?
.into_iter()
.collect::<Vec<_>>();
// don't include paths that don't end at this node.
// TODO(grfn): is this necessary anymore? I don't think so
paths.retain(|x| x.last_segment().node == ni);
// since we cut off part of each path, we *may* now have multiple paths that are the same
// (i.e., if there was a union above the nearest materialization). this would be bad, as it
// would cause a domain to request replays *twice* for a key from one view!
//
// As for this `sort_by`, story time!
// Imagine you had two unions in a diamond shape:
// a -> b -> u_1 -> d -> u_2 -> f
// -> c -> -> e ->
//
// There are going to be four different paths:
// a -> b -> u_1 -> d -> u_2 -> f
// a -> b -> u_1 -> e -> u_2 -> f
// a -> c -> u_1 -> d -> u_2 -> f
// a -> c -> u_1 -> e -> u_2 -> f
//
// For the sake of making this easier to understand, let's represent those paths based only
// on the node they choose at each bifurcation:
// 1. (b, d)
// 2. (b, e)
// 3. (c, d)
// 4. (c, e)
//
// Now, before this `sort_by` was introduced, the paths were sorted like that.
// The problem is that unions, when receiving replay messages, wait until they receive all
// of them to release the full replay.
// That waiting (buffering) mechanism relies solely on how many messages they get.
// Once they get as many messages as ancestors they have, they release the full replay.
//
// Now, following the example, this is what happens:
// 1. u_1 receives message from b, and buffers it
// 2. u_1 receives message from b (which will be the same as before), now it has two
// messages, so it releases it. WRONG.
//
// So, the paths must be sorted based on the node indexes, but backwards
// 1. (b, d)
// 2. (c, d)
// 3. (b, e)
// 4. (c, e)
//
// That's what we are doing here.
// TODO(fran): Unless there's a better fix that I can't picture right now, this is still
// less than ideal imho (very prone to errors).
// A better fix would require a redesign of how we handle replay paths in the presence of
// unions, since even with this fix, we still generate paths that will be traversed more
// than once (in the example, we go through b and c twice).
paths.sort_by(|a, b| {
if a.len() != b.len() {
a.cmp(b)
} else {
a.segments()
.iter()
.rev()
.zip(b.segments().iter().rev())
.fold(Ordering::Equal, |acc, (item_a, item_b)| {
acc.then(item_a.node.cmp(&item_b.node))
})
}
});
paths.dedup();
// all columns better resolve if we're doing partial
if self.partial
&& !paths
.iter()
.all(|p| p.segments().iter().all(|cr| cr.index.is_some()))
{
internal!("tried to be partial over replay paths that require full materialization: paths = {:?}", paths);
}
Ok(paths)
}
/// Finds the appropriate replay paths for the given index, and inform all domains on those
/// paths about them. It also notes if any data backfills will need to be run, which is
/// eventually reported back by `finalize`.
#[allow(clippy::cognitive_complexity, clippy::unreachable)]
#[instrument(level = "debug", "index", skip(self), fields(node = ?self.node))]
pub(super) fn add(&mut self, index_on: Index) -> Result<(), ReadySetError> {
// if we are recovering, we must build the paths again. Otherwise
// if we're full and we already have some paths added... (either this run, or from previous
// runs)
if !self.m.pending_recovery
&& !self.partial
&& (!self.paths.is_empty() || !self.old_paths.is_empty())
{
// ...don't add any more replay paths, because...
// non-partial views should not have one replay path per index. that would cause us to
// replay several times, even though one full replay should always be sufficient.
// we do need to keep track of the fact that there should be an index here though.
self.indexes.entry(index_on).or_default();
return Ok(());
}
let mut paths = self.paths(&index_on)?;
// Discard paths for indices we already have.
//
// We do this both because we generally want to be as idempotent as possible, and (perhaps
// more importantly) in case we get passed two subsequent indexes for generated columns that
// happen to remap to the same set of columns upstream
paths.retain(|p| {
if p.has_extension() {
p.target().index.iter().all(|idx| {
self.parent_indexes
.get(&p.target().node)
.map_or(true, |idxs| !idxs.contains(idx))
})
} else {
p.target()
.index
.iter()
.all(|idx| !self.indexes.contains_key(idx))
}
});
if paths.is_empty() {
// If we aren't making any replay paths for this index, we *do* still need to make sure
// the node actually has the index. This gets hit if the node has generated columns,
// since in that case we make an index for the target of the downstream replay path, and
// an index for the source of the upstream path. If the second one gets `add`ed first,
// it won't create the index, since the actual target index of the replay is different
// than the one that the downstream replay path wants to do a lookup into.
self.indexes.entry(index_on).or_default();
return Ok(());
}
invariant!(
paths.iter().skip(1).all(|p| {
#[allow(clippy::indexing_slicing)] // just checked paths isn't empty
{
p.last_segment().index == paths[0].last_segment().index
}
}),
"All paths should have the same index"
);
#[allow(clippy::unwrap_used)] // paths can't be empty
let target_index = if let Some(idx) = paths.first().unwrap().target().index.clone() {
// This might be different than `index_on` if this replay path is for a generated set of
// columns
idx
} else {
#[allow(clippy::redundant_clone)] // clippy bug!
index_on.clone()
};
// all right, story time!
//
// image you have this graph:
//
// a b
// +--+--+
// |
// u_1
// |
// +--+--+
// c d
// +--+--+
// |
// u_2
// |
// +--+--+
// e f
// +--+--+
// |
// u_3
// |
// v
//
// where c-f are all stateless. you will end up with 8 paths for replays to v.
// a and b will both appear as the root of 4 paths, and will be upqueried that many times.
// while inefficient (TODO), that is not in and of itself a problem. the issue arises at
// the unions, which need to do union buffering (that is, they need to forward _one_
// upquery response for each set of upquery responses they get). specifically, u_1 should
// forward 4 responses, even though it receives 8. u_2 should forward 2 responses, even
// though it gets 4, etc. we may later optimize that (in theory u_1 should be able to only
// forward _one_ response to multiple children, and a and b should only be upqueried
// _once_), but for now we need to deal with the correctness issue that arises if the
// unions do not buffer correctly.
//
// the issue, ultimately, is what the unions "group" upquery responses by. they can't group
// by tag (like shard mergers do), since there are 8 tags here, so there'd be 8 groups each
// with one response. here are the replay paths for u_1:
//
// 1. a -> c -> e
// 2. a -> c -> f
// 3. a -> d -> e
// 4. a -> d -> f
// 5. b -> c -> e
// 6. b -> c -> f
// 7. b -> d -> e
// 8. b -> d -> f
//
// we want to merge 1 with 5 since they're "going the same way". similarly, we want to
// merge 2 and 6, 3 and 7, and 4 and 8. the "grouping" here then is really the suffix of
// the replay's path beyond the union we're looking at. for u_2:
//
// 1/5. a/b -> c -> e
// 2/6. a/b -> c -> f
// 3/7. a/b -> d -> e
// 4/8. a/b -> d -> f
//
// we want to merge 1/5 and 3/7, again since they are going the same way _from here_.
// and similarly, we want to merge 2/6 and 4/8.
//
// so, how do we communicate this grouping to each of the unions?
// well, most of the infrastructure is actually already there in the domains.
// for each tag, each domain keeps some per-node state (`ReplayPathSegment`).
// we can inject the information there!
//
// we're actually going to play an additional trick here, as it allows us to simplify the
// implementation a fair amount. since we know that tags 1 and 5 are identical beyond u_1
// (that's what we're grouping by after all!), why don't we just rewrite all 1 tags to 5s?
// and all 2s to 6s, and so on. that way, at u_2, there will be no replays with tag 1 or 3,
// only 5 and 7. then we can pull the same trick there -- rewrite all 5s to 7s, so that at
// u_3 we only need to deal with 7s (and 8s). this simplifies the implementation since
// unions can now _always_ just group by tags, and it'll just magically work.
//
// this approach also gives us the property that we have a deterministic subset of the tags
// (and of strictly decreasing cardinality!) tags downstream of unions. this may (?)
// improve cache locality, but could perhaps also allow further optimizations later (?).
// find all paths through each union with the same suffix
let assigned_tags: Vec<_> = paths.iter().map(|_| self.m.next_tag()).collect();
let union_suffixes = paths
.iter()
.enumerate()
.flat_map(|(pi, path)| {
let graph = &self.graph;
path.segments().iter().enumerate().filter_map(
move |(at, &IndexRef { node, .. })| {
#[allow(clippy::indexing_slicing)] // replay paths contain valid indices
let n = &graph[node];
if n.is_union() && !n.is_shard_merger() {
let suffix = match path.segments().get((at + 1)..) {
Some(x) => x,
None => {
// FIXME(eta): would like to return a proper internal!() here
return None;
}
};
Some(((node, suffix), pi))
} else {
None
}
},
)
})
.fold(BTreeMap::new(), |mut map, (key, pi)| {
map.entry(key).or_insert_with(Vec::new).push(pi);
map
});
// map each suffix-sharing group of paths at each union to one tag at that union
let path_grouping: HashMap<_, _> = union_suffixes
.into_iter()
.flat_map(|((union, _suffix), paths)| {
// at this union, all the given paths share a suffix
// make all of the paths use a single identifier from that point on
// paths contains a set of `pi` from above, which are generated from
// `paths.iter().enumerate()` we have one assigned_tag for each `pi`
// by construction.
#[allow(clippy::indexing_slicing)]
let tag_all_as = assigned_tags[paths[0]];
paths.into_iter().map(move |pi| ((union, pi), tag_all_as))
})
.collect();
// inform domains about replay paths
for (pi, path) in paths.into_iter().enumerate() {
// paths contains a set of `pi` from above, which are generated from
// `paths.iter().enumerate()` we have one assigned_tag for each `pi` by
// construction.
#[allow(clippy::indexing_slicing)]
let tag = assigned_tags[pi];
// TODO(eta): figure out a way to check partial replay path idempotency
self.paths.insert(
tag,
path.segments()
.iter()
.map(|&IndexRef { node, .. }| node)
.collect(),
);
if path.has_extension() {
if let Some(index) = path.target().index.clone() {
self.parent_indexes
.entry(path.target().node)
.or_default()
.insert(index);
}
self.indexes.entry(index_on.clone()).or_default().push(tag);
} else {
self.indexes
.entry(target_index.clone())
.or_default()
.push(tag);
}
// what index are we using for partial materialization (if any)?
let mut partial: Option<Index> = None;
#[allow(clippy::unwrap_used)] // paths for partial indices must always be partial
if self.partial {
partial = Some(path.source().index.clone().unwrap());
}
// if this is a partial replay path, and the target node is sharded, then we need to
// make sure that the last sharder on the path knows to only send the replay response
// to the requesting shard, as opposed to all shards. in order to do that, that sharder
// needs to know who it is!
let mut partial_unicast_sharder = None;
#[allow(clippy::indexing_slicing)] // paths contain valid node indices
if partial.is_some() && !self.graph[path.target().node].sharded_by().is_none() {
partial_unicast_sharder = path
.segments()
.iter()
.rev()
.map(|&IndexRef { node, .. }| node)
.find(|&ni| self.graph[ni].is_sharder());
}
// first, find out which domains we are crossing
let mut segments: Vec<(
DomainIndex,
Vec<(NodeIndex, Option<Index>, /* is_target: */ bool)>,
)> = Vec::new();
let mut last_domain = None;
for (i, IndexRef { node, index }) in
path.segments_with_extension().iter().cloned().enumerate()
{
#[allow(clippy::indexing_slicing)] // paths contain valid node indices
let domain = self.graph[node].domain();
#[allow(clippy::unwrap_used)]
if last_domain.is_none() || domain != last_domain.unwrap() {
segments.push((domain, Vec::new()));
last_domain = Some(domain);
}
invariant!(!segments.is_empty());
#[allow(clippy::unwrap_used)] // checked by invariant!()
segments.last_mut().unwrap().1.push((
node,
index,
/* is_target = */ i == path.target_index(),
));
}
invariant!(!segments.is_empty());
debug!(%tag, "domain replay path is {:?}", segments);
// tell all the domains about their segment of this replay path
let mut pending = None;
let mut seen = HashSet::new();
for (i, &(domain, ref nodes)) in segments.iter().enumerate() {
invariant!(!nodes.is_empty());
// TODO:
// a domain may appear multiple times in this list if a path crosses into the same
// domain more than once. currently, that will cause a deadlock.
if seen.contains(&domain) {
trace!("{}", graphviz(self.graph, true, None, self.m, None));
internal!("detected A-B-A domain replay path");
}
seen.insert(domain);
if i == 0 {
// check to see if the column index is generated; if so, inform the domain
#[allow(clippy::unwrap_used)] // checked by invariant!()
let first = nodes.first().unwrap();
if let Some(ref index) = first.1 {
#[allow(clippy::indexing_slicing)] // replay paths contain valid nodes
if self.graph[first.0].is_internal() {
if let ColumnSource::GeneratedFromColumns(generated_from) =
self.graph[first.0].column_source(&index.columns)
{
debug!(
domain = %domain.index(),
?tag,
on_node = %first.0.index(),
generated_columns = ?index.columns,
?generated_from,
"telling domain about generated columns",
);
#[allow(clippy::indexing_slicing)]
// replay paths contain valid nodes
self.dmp.add_message(
domain,
DomainRequest::GeneratedColumns {
node: self.graph[first.0].local_addr(),
index: index.clone(),
tag,
},
)?;
}
}
}
}
// we're not replaying through the starter node
let skip_first = usize::from(i == 0);
// use the local index for each node
#[allow(clippy::indexing_slicing)] // replay paths contain valid nodes
let locals: Vec<_> = nodes
.iter()
.skip(skip_first)
.map(|&(ni, ref key, is_target)| ReplayPathSegment {
node: self.graph[ni].local_addr(),
partial_index: key.clone(),
force_tag_to: path_grouping.get(&(ni, pi)).copied(),
is_target,
})
.collect();
// the first domain in the chain may *only* have the source node
// in which case it doesn't need to know about the path
let locals = if let Ok(locals) = Vec1::try_from(locals) {
locals
} else {
invariant_eq!(i, 0);
continue;
};
#[allow(clippy::expect_used)]
let our_replicas = self
.dmp
.num_replicas(domain)
.expect("Domain should exist at this point");
let source_replicas = segments
.get(0)
.and_then(|(source_domain, _)| self.dmp.num_replicas(*source_domain).ok());
let replica_fanout = match source_replicas {
Some(source_replicas) if source_replicas == our_replicas => {
// Same number of replicas, no fanount
false
}
Some(source_replicas) => {
invariant_eq!(
source_replicas,
1,
"Can't currently go from replicated n-ways to replicated m-ways"
);
// Replicating 1-way -> replicating n-ways
true
}
None => {
// Since we iterate in topological order, this shouldn't happen (we should
// have set the number of replicas for the domain by
// now)
internal!("Could not find replicas at source of replay path")
}
};
// build the message we send to this domain to tell it about this replay path.
let mut setup = DomainRequest::SetupReplayPath {
tag,
source: None,
source_index: path.source().index.clone(),
path: locals,
notify_done: false,
partial_unicast_sharder,
trigger: TriggerEndpoint::None,
replica_fanout,
};
// the first domain also gets to know source node
if i == 0 {
// replay paths contain valid nodes & nodes.len() > 0 checked by invariant!()
#[allow(clippy::indexing_slicing)]
if let DomainRequest::SetupReplayPath { ref mut source, .. } = setup {
*source = Some(self.graph[nodes[0].0].local_addr());
}
}
if let Some(ref key) = partial {
// for partial materializations, nodes need to know how to trigger replays
if let DomainRequest::SetupReplayPath {
ref mut trigger, ..
} = setup
{
if segments.len() == 1 {
// replay is entirely contained within one domain
*trigger = TriggerEndpoint::Local(key.clone());
} else if i == 0 {
// first domain needs to be told about partial replay trigger
*trigger = TriggerEndpoint::Start(key.clone());
} else if i == segments.len() - 1 {
// if the source is sharded, we need to know whether we should ask all
// the shards, or just one. if the replay key is the same as the
// sharding key, we just ask one, and all is good. if the replay key
// and the sharding key differs, we generally have to query *all* the
// shards.
//
// there is, however, an exception to this: if we have two domains that
// have the same sharding, but a replay path between them on some other
// key than the sharding key, the right thing to do is to *only* query
// the same shard as ourselves. this is because any answers from other
// shards would necessarily just be with records that do not match our
// sharding key anyway, and that we should thus never see.
// Note : segments[0] looks something like this :
// ( DomainIndex(0),
// [
// (NodeIndex(1), Some([1])),
// (NodeIndex(3), Some([1])),
// (NodeIndex(4), Some([0])),
// (NodeIndex(11), Some([0]))
// ]
// )
//
// NOTE(eta): I have mixed feelings about how the writer of the above
// note thought to be helpful and put the structure thing
// there, but did not think to be helpful enough to refactor
// it to use a more sane data structure...
//
// NOTE(eta): this code is mild copypasta; another copy exists a few
// lines down
#[allow(clippy::indexing_slicing)] // checked by above invariant!()
let first_domain_segments = &segments[0].1;
invariant!(!first_domain_segments.is_empty());
#[allow(clippy::indexing_slicing)] // checked by above invariant!()
let first_domain_node_key = &first_domain_segments[0];
#[allow(clippy::indexing_slicing)] // replay path node indices are valid
let src_sharding = self.graph[first_domain_node_key.0].sharded_by();
let shards = src_sharding.shards().unwrap_or(1);
let lookup_key_index_to_shard = match src_sharding {
Sharding::Random(..) => None,
Sharding::ByColumn(c, _) => {
// we want to check the source of the key column. If the source
// node is sharded by that
// column, we are able to ONLY look at a single
// shard on the source. Otherwise, we need to check all of the
// shards.
let key_column_source = &first_domain_node_key.1;
match key_column_source {
Some(source_index) => {
if source_index.len() == 1 {
#[allow(clippy::indexing_slicing)] // len == 1
if source_index[0] == c {
// the source node is sharded by the key column.
Some(0)
} else {
// the node is sharded by a different column
// than
// the key column, so we need to go ahead and
// query all
// shards. BUMMER.
None
}
} else {
// we're using a compound key to look up into a node
// that's
// sharded by a single column. if the sharding key
// is one
// of the lookup keys, then we indeed only need to
// look at
// one shard, otherwise we need to ask all
source_index
.columns
.iter()
.position(|source_column| source_column == &c)
}
}
// This means the key column has no provenance in
// the source. This could be because it comes from multiple
// columns.
// Or because the node is fully materialized so replays are
// not necessary.
None => None,
}
}
s if s.is_none() => None,
s => internal!("unhandled new sharding pattern {:?}", s),
};
let selection = if let Some(i) = lookup_key_index_to_shard {
// if we are not sharded, all is okay.
//
// if we are sharded:
//
// - if there is a shuffle above us, a shard merger + sharder above
// us will ensure that we hear the replay response.
//
// - if there is not, we are sharded by the same column as the
// source. this also means that the replay key in the destination
// is the sharding key of the destination. to see why, consider
// the case where the destination is sharded by x. the source
// must also be sharded by x for us to be in this case. we also
// know that the replay lookup key on the source must be x since
// lookup_on_shard_key == true. since no shuffle was introduced,
// src.x must resolve to dst.x assuming x is not aliased in dst.
// because of this, it should be the case that KeyShard ==
// SameShard; if that were not the case, the value in dst.x
// should never have reached dst in the first place.
SourceSelection::KeyShard {
key_i_to_shard: i,
nshards: shards,
}
} else {
// replay key != sharding key
// normally, we'd have to query all shards, but if we are sharded
// the same as the source (i.e., there are no shuffles between the
// source and us), then we should *only* query the same shard of
// the source (since it necessarily holds all records we could
// possibly see).
//
// note that the no-sharding case is the same as "ask all shards"
// except there is only one (shards == 1).
#[allow(clippy::indexing_slicing)] // replay path node indices valid
if src_sharding.is_none()
|| segments
.iter()
.flat_map(|s| s.1.iter())
.any(|&(n, _, _)| self.graph[n].is_shard_merger())
{
SourceSelection::AllShards(shards)
} else {
SourceSelection::SameShard
}
};
debug!(policy = ?selection, %tag, "picked source selection policy");
#[allow(clippy::indexing_slicing)] // checked by invariant!() earlier
{
*trigger = TriggerEndpoint::End(selection, segments[0].0);
}
}
}
} else {
// for full materializations, the last domain should report when it's done
if i == segments.len() - 1 {
if let DomainRequest::SetupReplayPath {
ref mut notify_done,
..
} = setup
{
*notify_done = true;
invariant!(pending.is_none());
// NOTE(eta): this code is mild copypasta; another copy exists a few
// lines above
#[allow(clippy::indexing_slicing)]
// checked by segments.len() > 0 invariant!()
let first_domain_segments_data = &segments[0];
invariant!(!first_domain_segments_data.1.is_empty());
#[allow(clippy::indexing_slicing)] // checked by above invariant!()
let first_domain_node_key = &first_domain_segments_data.1[0];
#[allow(clippy::indexing_slicing)] // replay path node indices valid
{
pending = Some(PendingReplay {
tag,
source: self.graph[first_domain_node_key.0].local_addr(),
source_domain: first_domain_segments_data.0,
});
}
}
}
}
if i != segments.len() - 1 {
// since there is a later domain, the last node of any non-final domain
// must either be an egress or a Sharder. If it's an egress, we need
// to tell it about this replay path so that it knows
// what path to forward replay packets on.
#[allow(clippy::unwrap_used)] // nodes>0 checked by invariant
// node indices from replay paths valid
#[allow(clippy::indexing_slicing)]
let n = &self.graph[nodes.last().unwrap().0];
if n.is_egress() {
#[allow(clippy::indexing_slicing)] // checked by enclosing if blocks
let later_domain_segments = &segments[i + 1].1;
invariant!(!later_domain_segments.is_empty());
#[allow(clippy::indexing_slicing)] // checked by invariant
let (ingress_node, _, _) = later_domain_segments[0];
self.dmp.add_message(
domain,
DomainRequest::AddEgressTag {
egress_node: n.local_addr(),
tag,
ingress_node,
},
)?;
} else {
invariant!(n.is_sharder());
}
}
trace!(domain = %domain.index(), "telling domain about replay path");
self.dmp.add_message(domain, setup)?;
}
#[allow(clippy::indexing_slicing)] // we know our node is in the graph
if !self.partial && !self.graph[self.node].is_base() {
// this path requires doing a replay and then waiting for the replay to finish
if let Some(pending) = pending {
self.pending.push(pending);
} else {
internal!(
"no replay planned for fully materialized non-base node {}!",
self.node.index()
);
}
}
}
Ok(())
}
/// Instructs the target node to set up appropriate state for any new indices that have been
/// added. For new indices added to full materializations, this may take some time (a
/// re-indexing has to happen), whereas for new indices to partial views it should be nearly
/// instantaneous.
///
/// Returns a list of backfill replays that need to happen before the migration is complete,
/// and a set of replay paths for this node indexed by tag.
pub(super) fn finalize(
mut self,
) -> ReadySetResult<(Vec<PendingReplay>, HashMap<Tag, Vec<NodeIndex>>)> {
use dataflow::payload::PrepareStateKind;
#[allow(clippy::indexing_slicing)] // the node index we were created with is in graph...
let our_node = &self.graph[self.node];
// NOTE: we cannot use the impl of DerefMut here, since it (reasonably) disallows getting
// mutable references to taken state.
let s = if let Some(r) = our_node.as_reader() {
let index = r
.index()
.ok_or_else(|| internal_err!("Reader has no key"))?
.clone();
if self.partial {
invariant!(r.is_materialized());
#[allow(clippy::indexing_slicing)]
// the node index we were created with is in graph...
let last_domain = self.graph[self.node].domain();
let num_shards = self.dmp.num_shards(last_domain)?;
// since we're partially materializing a reader node,
// we need to give it a way to trigger replays.
#[allow(clippy::indexing_slicing)]
// the node index we were created with is in graph...
PrepareStateKind::PartialReader {
node_index: self.node,
num_columns: self.graph[self.node].columns().len(),
index,
trigger_domain: last_domain,
num_shards,
}
} else {
#[allow(clippy::indexing_slicing)]
// the node index we were created with is in graph...
PrepareStateKind::FullReader {
num_columns: self.graph[self.node].columns().len(),
index,
node_index: self.node,
}
}
} else {
// not a reader
let weak_indices = self.m.added_weak.remove(&self.node).unwrap_or_default();
if self.partial {
let strict_indices = self.indexes.drain().collect();
PrepareStateKind::Partial {
strict_indices,
weak_indices,
}
} else {
let strict_indices = self.indexes.drain().map(|(k, _)| k).collect();
PrepareStateKind::Full {
strict_indices,
weak_indices,
}
}
};
self.dmp.add_message(
our_node.domain(),
DomainRequest::PrepareState {
node: our_node.local_addr(),
state: s,
},
)?;
if self.m.config.packet_filters_enabled {
self.setup_packet_filter()?;
}
if !self.partial {
// if we're constructing a new view, there is no reason to replay any given path more
// than once. we do need to be careful here though: the fact that the source and
// destination of a path are the same does *not* mean that the path is the same (b/c of
// unions), and we do not want to eliminate different paths!
let mut distinct_paths = HashSet::new();
let paths = &self.paths;
self.pending.retain(|p| {
// keep if this path is different
if let Some(path) = paths.get(&p.tag) {
distinct_paths.insert(path)
} else {
// FIXME(eta): proper error handling here! (would be nice to have try_retain)
false
}
});
} else {
invariant!(self.pending.is_empty());
}
Ok((self.pending, self.paths))
}
fn setup_packet_filter(&mut self) -> ReadySetResult<()> {
// If the node is partial and also a reader, then traverse the
// graph upwards to notify the egress node that it should filter
// any packet that was not requested.
#[allow(clippy::indexing_slicing)] // the node index we were created with is in graph...
let our_node = &self.graph[self.node];
return if self.partial && our_node.is_reader() {
// Since reader nodes belong to their own domains, their
// domains should consist only of them + an ingress node.
// It's fair to assume that the reader node has an ingress node as an ancestor.
#[allow(clippy::indexing_slicing)] // neighbors_directed returns valid indices
let ingress_opt = self
.graph
.neighbors_directed(self.node, petgraph::Incoming)
.find(|&n| self.graph[n].is_ingress());
let ingress = match ingress_opt {
None => internal!("The current node is a reader, it MUST belong in its own domain, and therefore must be an ingress node ancestor."),
Some(i) => i
};
// Now we look for the egress node, which should be an ancestor of the ingress node.
#[allow(clippy::indexing_slicing)] // neighbors_directed returns valid indices
let egress_opt = self
.graph
.neighbors_directed(ingress, petgraph::Incoming)
.find(|&n| self.graph[n].is_egress());
let egress = match egress_opt {
// If an ingress node does not have an incoming egress node, that means this reader
// domain is behind a shard merger.
// We skip the packet filter setup for now.
// TODO(fran): Implement packet filtering for shard mergers (https://readysettech.atlassian.net/browse/ENG-183).
None => return Ok(()),
Some(e) => e,
};
// Get the egress node's local address within that domain.
#[allow(clippy::indexing_slicing)] // neighbors_directed returns valid indices
let egress_local_addr = self.graph[egress].local_addr();
// Communicate the egress node's domain that any packet sent
// to the reader's domain ingress node should filter any
// packets not previously requested.
#[allow(clippy::indexing_slicing)] // neighbors_directed returns valid indices
self.dmp.add_message(
self.graph[egress].domain(),
DomainRequest::AddEgressFilter {