Skip to content

Commit

Permalink
fix redundant thread calls and a bunch of bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
rusq committed Feb 15, 2023
1 parent 36e5ade commit a3db0e7
Show file tree
Hide file tree
Showing 15 changed files with 330 additions and 37 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
# IDE
.idea
.vscode
.gonvim

#OS junk
.DS_Store
Expand Down
3 changes: 3 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,6 @@ callvis:
goreleaser:
goreleaser check
goreleaser release --snapshot --rm-dist

tags:
gotags -R *.go > $@
6 changes: 3 additions & 3 deletions cmd/slackdump/internal/diag/obfuscate.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package diag
import (
"context"
"io"
"math/rand"
"os"
"time"

Expand All @@ -26,14 +25,15 @@ var CmdObfuscate = &base.Command{
var obfuscateParams struct {
inputFile string
outputFile string
seed int64
}

func init() {
rand.Seed(time.Now().UnixNano())
CmdObfuscate.Run = runObfuscate

CmdObfuscate.Flag.StringVar(&obfuscateParams.inputFile, "i", "", "input file, if not specified, stdin is used")
CmdObfuscate.Flag.StringVar(&obfuscateParams.outputFile, "o", "", "output file, if not specified, stdout is used")
CmdObfuscate.Flag.Int64Var(&obfuscateParams.seed, "seed", time.Now().UnixNano(), "seed for the random number generator")
}

func runObfuscate(ctx context.Context, cmd *base.Command, args []string) error {
Expand Down Expand Up @@ -65,5 +65,5 @@ func runObfuscate(ctx context.Context, cmd *base.Command, args []string) error {
}
defer out.Close()
}
return obfuscate.Do(ctx, out, in)
return obfuscate.Do(ctx, out, in, obfuscate.WithSeed(obfuscateParams.seed))
}
69 changes: 69 additions & 0 deletions contrib/record_stats.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
#!/usr/bin/env python
import io
import sys
import json

RECORD_FILE = "record.jsonl"
INDEX_FILE = "index.json"

def index_stats(f: io.TextIOWrapper):
s = f.read().strip()
index = json.loads(s)

entries = 0
channels = 0
threads = 0
files = 0
for k, v in index.items():
print("Key: %s, value: %s" % (k, v))
entries += len(v)
if k.startswith("t"):
threads += 1
elif k.startswith("f"):
files += 1
else:
channels += 1

print("Total number of index entries: %d" % len(index))
print("Total number of data offsets: %d" % entries)
print("Total number of channels: %d" % channels)
print("Total number of threads: %d" % threads)
print("Total number of files: %d" % files)



def record_stats(f: io.TextIOWrapper):
lines = list(f)

print("Total number of API requests: {}".format(len(lines)))
messages = 0
msg_requests = 0
threads = 0
thread_requests = 0
files = 0
file_requests = 0
for line in lines:
data = json.loads(line)
if data["type"] == 0:
msg_requests += 1
messages += data["size"]
elif data["type"] == 1:
thread_requests += 1
threads += data["size"]
elif data["type"] == 2:
file_requests += 1
files += data["size"]
print("Total number of message requests: {}, messages {}".format(
msg_requests, messages))
print("Total number of thread requests: {}, thread messages: {}".format(
thread_requests, threads))
print("Total number of file requests: {}, files: {}".format(file_requests, files))


if __name__ == "__main__":
file = sys.argv[1] if len(sys.argv) > 1 else INDEX_FILE
try:
with open(file) as f:
index_stats(f)
except FileNotFoundError:
print("File not found: {}".format(file))
Binary file modified internal/fixtures/assets/events.jsonl.gz
Binary file not shown.
4 changes: 2 additions & 2 deletions internal/processors/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ const (
)

type Event struct {
Type EventType `json:"type,omitempty"`
TS int64 `json:"event_ts,omitempty"`
Type EventType `json:"type"`
Timestamp int64 `json:"event_ts,omitempty"`
ChannelID string `json:"channel_id,omitempty"`
IsThreadMessage bool `json:"is_thread_message,omitempty"`
Size int `json:"size,omitempty"` // number of messages or files
Expand Down
3 changes: 3 additions & 0 deletions internal/processors/obfuscate/obfuscate.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ func (o obfuscator) OneMessage(m *slack.Message) {
if len(m.Attachments) > 0 {
m.Attachments = nil // too much hassle to obfuscate
}
if m.ParentUserId != "" {
m.ParentUserId = o.ID("U", m.ParentUserId)
}
for i := range m.Files {
o.OneFile(&m.Files[i])
}
Expand Down
27 changes: 22 additions & 5 deletions internal/processors/player.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"encoding/json"
"errors"
"io"
"os"
"sync/atomic"

"github.com/slack-go/slack"
)
Expand All @@ -21,7 +23,8 @@ type Player struct {

pointer state // current event pointers

idx index
idx index
lastOffset atomic.Int64
}

// index holds the index of each event type within the file. key is the event
Expand All @@ -37,6 +40,9 @@ func NewPlayer(rs io.ReadSeeker) (*Player, error) {
if err != nil {
return nil, err
}
if _, err := rs.Seek(0, io.SeekStart); err != nil { // reset offset
return nil, err
}
return &Player{
rs: rs,
idx: idx,
Expand All @@ -45,7 +51,7 @@ func NewPlayer(rs io.ReadSeeker) (*Player, error) {
}

// indexRecords indexes the records in the reader and returns an index.
func indexRecords(rs io.ReadSeeker) (index, error) {
func indexRecords(rs io.Reader) (index, error) {
var idx = make(index)

dec := json.NewDecoder(rs)
Expand All @@ -62,12 +68,22 @@ func indexRecords(rs io.ReadSeeker) (index, error) {
}
idx[event.ID()] = append(idx[event.ID()], offset)
}
if _, err := rs.Seek(0, io.SeekStart); err != nil { // reset offset
f, err := os.Create("index.json")
if err != nil {
return nil, err
}
defer f.Close()
if err := json.NewEncoder(f).Encode(idx); err != nil {
return nil, err
}
return idx, nil
}

// Offset returns the last read offset of the record ReadSeeker.
func (p *Player) Offset() int64 {
return p.lastOffset.Load()
}

func (p *Player) tryGetEvent(id string) (*Event, error) {
offsets, ok := p.idx[id]
if !ok {
Expand All @@ -82,6 +98,7 @@ func (p *Player) tryGetEvent(id string) (*Event, error) {
return nil, io.EOF
}

p.lastOffset.Store(offsets[ptr])
_, err := p.rs.Seek(offsets[ptr], io.SeekStart) // seek to the offset
if err != nil {
return nil, err
Expand All @@ -97,12 +114,12 @@ func (p *Player) tryGetEvent(id string) (*Event, error) {
func (p *Player) hasMore(id string) bool {
offsets, ok := p.idx[id]
if !ok {
return false
return false // no such id
}
// getting current offset index for the requested id.
ptr, ok := p.pointer[id]
if !ok {
p.pointer[id] = 0 // initialize, if we see it the first time.
return true //hasn't been accessed yet
}
return ptr < len(offsets)
}
Expand Down
Loading

0 comments on commit a3db0e7

Please sign in to comment.