Skip to content
This repository has been archived by the owner on Oct 7, 2023. It is now read-only.

preserve ephemeral owner and permit directory deletion after losing ephemeral children #89

Merged
merged 3 commits into from
Apr 19, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,54 @@ func TestGetRoot(t *testing.T) {
})
}

func TestEphemeral(t *testing.T) {
zkclus := newZKCluster(t)
defer zkclus.Close(t)

c, _, err := zk.Connect([]string{zkclus.Addr()}, time.Second)
if err != nil {
t.Fatal(err)
}
// Use closure to capture c because it's overwritten with a new connection later.
defer func() { c.Close() }()

if _, err := c.Create("/abc", []byte(""), 0, acl); err != nil {
t.Fatal(err)
}
if _, err := c.Create("/abc/def", []byte(""), zk.FlagEphemeral, acl); err != nil {
t.Fatal(err)
}
// Confirm there's an ephemeral owner after updating an ephemeral node.
if _, err := c.Set("/abc/def", []byte("123"), -1); err != nil {
t.Fatal(err)
}
_, s, err := c.Get("/abc/def")
if err != nil {
t.Fatal(err)
}
if s.EphemeralOwner == 0 {
t.Fatal("expected ephemeral owner")
}

// Confirm parent directory can be deleted after session expiration.
c.Close()
time.Sleep(3 * time.Second)
c, _, err = zk.Connect([]string{zkclus.Addr()}, time.Second)
if err != nil {
t.Fatal(err)
}
keys, _, cerr := c.Children("/abc")
if cerr != nil {
t.Fatal(err)
}
if len(keys) != 0 {
t.Fatalf("expected no keys in /abc, got %v", keys)
}
if err := c.Delete("/abc", -1); err != nil {
t.Fatal(err)
}
}

func TestDirStat(t *testing.T) {
runTest(t, func(t *testing.T, c *zk.Conn) {
if _, err := c.Create("/abc", []byte(""), 0, acl); err != nil {
Expand Down
12 changes: 7 additions & 5 deletions zketcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,15 @@ func (z *zkEtcd) mkDeleteTxnOp(op *DeleteRequest) opBundle {
}
}

crev := s.Rev(mkPathCVer(p))
// Force CVer into read-set to catch any conflicting update
// which would invalidate emptiness check.
s.Rev(mkPathCVer(p))
// Check if directory has any children.
gresp, gerr := z.c.Get(z.c.Ctx(), getListPfx(p),
// TODO: monotonic revisions from serializable
// etcd.WithSerializable(),
etcd.WithPrefix(),
etcd.WithCountOnly(),
etcd.WithRev(crev),
etcd.WithLimit(1))
if gerr != nil {
return gerr
Expand Down Expand Up @@ -360,9 +362,9 @@ func (z *zkEtcd) mkSetDataTxnOp(op *SetDataRequest) opBundle {
return ErrBadVersion

}
s.Put(mkPathKey(p), string(op.Data))
s.Put(mkPathVer(p), string(encodeInt64(int64(currentVersion+1))))
s.Put(mkPathMTime(p), encodeTime())
s.Put(mkPathKey(p), string(op.Data), etcd.WithIgnoreLease())
s.Put(mkPathVer(p), string(encodeInt64(int64(currentVersion+1))), etcd.WithIgnoreLease())
s.Put(mkPathMTime(p), encodeTime(), etcd.WithIgnoreLease())
return nil
}

Expand Down