Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: update dataReader.go for improved pagination and limit hand… #1928

Merged
merged 1 commit into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions internal/storage/memory/dataReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,18 +83,29 @@ func (r *DataReader) QueryRelationships(_ context.Context, tenantID string, filt
})

var tuples []*base.Tuple
count := uint32(0)
limit := pagination.Limit()

for _, t := range tup {
// Skip tuples below the lower bound
switch pagination.Sort() {
case "entity_id":
if t.EntityID >= lowerBound {
tuples = append(tuples, t.ToTuple())
if t.EntityID < lowerBound {
continue
}
case "subject_id":
if t.SubjectID >= lowerBound {
tuples = append(tuples, t.ToTuple())
if t.SubjectID < lowerBound {
continue
}
default:
tuples = append(tuples, t.ToTuple())
}

// Add tuple to result set
tuples = append(tuples, t.ToTuple())

// Enforce the limit if it's set
count++
if limit > 0 && count >= limit {
break
}
}

Expand Down Expand Up @@ -222,7 +233,7 @@ func (r *DataReader) QueryAttributes(_ context.Context, tenantID string, filter
attr = append(attr, t)
}

// Sort tuples based on the provided order field
// Sort attributes based on the provided order field
sort.Slice(attr, func(i, j int) bool {
switch pagination.Sort() {
case "entity_id":
Expand All @@ -233,14 +244,25 @@ func (r *DataReader) QueryAttributes(_ context.Context, tenantID string, filter
})

var attrs []*base.Attribute
count := uint32(0)
limit := pagination.Limit()

for _, t := range attr {
// Skip attributes below the lower bound
switch pagination.Sort() {
case "entity_id":
if t.EntityID >= lowerBound {
attrs = append(attrs, t.ToAttribute())
if t.EntityID < lowerBound {
continue
}
default:
attrs = append(attrs, t.ToAttribute())
}

// Add attribute to result set
attrs = append(attrs, t.ToAttribute())

// Enforce the limit if it's set
count++
if limit > 0 && count >= limit {
break
}
}

Expand Down
18 changes: 15 additions & 3 deletions internal/storage/postgres/dataReader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (r *DataReader) QueryRelationships(ctx context.Context, tenantID string, fi
builder = builder.OrderBy(pagination.Sort())
}

// Apply limit if specified in pagination
limit := pagination.Limit()
if limit > 0 {
builder = builder.Limit(uint64(limit))
}

// Generate the SQL query and arguments.
var query string
query, args, err = builder.ToSql()
Expand Down Expand Up @@ -273,7 +279,7 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte
return nil, utils.HandleError(ctx, span, err, base.ErrorCode_ERROR_CODE_INTERNAL)
}

// Build the relationships query based on the provided filter and snapshot value.
// Build the attributes query based on the provided filter and snapshot value.
var args []interface{}
builder := r.database.Builder.Select("entity_type, entity_id, attribute, value").From(AttributesTable).Where(squirrel.Eq{"tenant_id": tenantID})
builder = utils.AttributesFilterQueryForSelectBuilder(builder, filter)
Expand All @@ -292,6 +298,12 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte
builder = builder.OrderBy(pagination.Sort())
}

// Apply limit if specified in pagination
limit := pagination.Limit()
if limit > 0 {
builder = builder.Limit(uint64(limit))
}

// Generate the SQL query and arguments.
var query string
query, args, err = builder.ToSql()
Expand All @@ -309,7 +321,7 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte
}
defer rows.Close()

// Process the result rows and store the relationships in a TupleCollection.
// Process the result rows and store the attributes in an AttributeCollection.
collection := database.NewAttributeCollection()
for rows.Next() {
rt := storage.Attribute{}
Expand Down Expand Up @@ -339,7 +351,7 @@ func (r *DataReader) QueryAttributes(ctx context.Context, tenantID string, filte

slog.DebugContext(ctx, "successfully retrieved attributes tuples from the database")

// Return a TupleIterator created from the TupleCollection.
// Return an AttributeIterator created from the AttributeCollection.
return collection.CreateAttributeIterator(), nil
}

Expand Down
13 changes: 13 additions & 0 deletions pkg/database/pagination.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type CursorPaginationOption func(*CursorPagination)
type CursorPagination struct {
cursor string
sort string
limit uint32
}

// NewCursorPagination -
Expand Down Expand Up @@ -80,6 +81,13 @@ func Sort(sort string) CursorPaginationOption {
}
}

// Limit -
func Limit(limit uint32) CursorPaginationOption {
return func(c *CursorPagination) {
c.limit = limit
}
}

// Cursor -
func (p CursorPagination) Cursor() string {
return p.cursor
Expand All @@ -89,3 +97,8 @@ func (p CursorPagination) Cursor() string {
func (p CursorPagination) Sort() string {
return p.sort
}

// Limit -
func (p CursorPagination) Limit() uint32 {
return p.limit
}
Loading