Skip to content

Commit

Permalink
Merge pull request #630 from oom-ai/jinghan/implement_export
Browse files Browse the repository at this point in the history
feat(oomd/server): implement rpc Export and ChannelExport
  • Loading branch information
jinghancc authored Nov 26, 2021
2 parents 9f7bb07 + a2c37de commit f5eb420
Showing 1 changed file with 57 additions and 2 deletions.
59 changes: 57 additions & 2 deletions oomd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,11 +193,54 @@ func (s *server) Join(ctx context.Context, req *codegen.JoinRequest) (*codegen.J
}

func (s *server) ChannelExport(req *codegen.ChannelExportRequest, stream codegen.OomD_ChannelExportServer) error {
panic("implement me")
ctx := context.Background()
exportResult, err := s.oomstore.ChannelExport(ctx, types.ChannelExportOpt{
FeatureNames: req.FeatureNames,
RevisionID: int(req.RevisionId),
Limit: req.Limit,
})
if err != nil {
return err
}
for row := range exportResult.Data {
valueRow, err := convertToValueSlice(row)
if err != nil {
return err
}
if err := stream.Send(&codegen.ChannelExportResponse{
Status: buildStatus(code.Code_OK, ""),
Header: exportResult.Header,
Row: valueRow,
}); err != nil {
return err
}
}
exportErr := exportResult.CheckStreamError()
if exportErr != nil {
if err := stream.Send(&codegen.ChannelExportResponse{
Status: buildStatus(code.Code_INTERNAL, exportErr.Error()),
}); err != nil {
return err
}
}
return nil
}

func (s *server) Export(ctx context.Context, req *codegen.ExportRequest) (*codegen.ExportResponse, error) {
panic("implement me")
err := s.oomstore.Export(ctx, types.ExportOpt{
FeatureNames: req.FeatureNames,
RevisionID: int(req.RevisionId),
Limit: req.Limit,
OutputFilePath: req.OutputFilePath,
})
if err != nil {
return &codegen.ExportResponse{
Status: buildStatus(code.Code_INTERNAL, err.Error()),
}, err
}
return &codegen.ExportResponse{
Status: buildStatus(code.Code_OK, ""),
}, nil
}

func convertToValueMap(m map[string]interface{}) (map[string]*codegen.Value, error) {
Expand All @@ -212,6 +255,18 @@ func convertToValueMap(m map[string]interface{}) (map[string]*codegen.Value, err
return valueMap, nil
}

func convertToValueSlice(s []interface{}) ([]*codegen.Value, error) {
valueSlice := make([]*codegen.Value, 0, len(s))
for i := range s {
value, err := convertInterfaceToValue(i)
if err != nil {
return nil, err
}
valueSlice = append(valueSlice, value)
}
return valueSlice, nil
}

func buildStatus(code code.Code, message string) *status.Status {
return &status.Status{
Code: int32(code),
Expand Down

0 comments on commit f5eb420

Please sign in to comment.