diff --git a/Makefile b/Makefile index 5ce24fb..091ef53 100644 --- a/Makefile +++ b/Makefile @@ -25,6 +25,10 @@ test: proto-health: protoc proto/health/*.proto --go_out=paths=source_relative:. --proto_path=. +.PHONY: proto-trace +proto-trace: + protoc proto/trace/*.proto --go_out=paths=source_relative:. --proto_path=. + .PHONY: proto-sidecar proto-sidecar: protoc proto/sidecar/*.proto --go_out=paths=source_relative:. --proto_path=. diff --git a/go.mod b/go.mod index 481bc6c..62ae0e7 100644 --- a/go.mod +++ b/go.mod @@ -19,6 +19,9 @@ require ( github.com/patrickmn/go-cache v2.1.0+incompatible github.com/stretchr/testify v1.9.0 github.com/w-h-a/crd v0.1.0 + go.opentelemetry.io/otel v1.31.0 + go.opentelemetry.io/otel/sdk v1.31.0 + go.opentelemetry.io/otel/trace v1.31.0 golang.org/x/crypto v0.26.0 golang.org/x/text v0.17.0 google.golang.org/grpc v1.65.0 @@ -44,6 +47,7 @@ require ( github.com/felixge/httpsnoop v1.0.4 // indirect github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/go-logr/logr v1.4.2 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-openapi/jsonpointer v0.19.6 // indirect github.com/go-openapi/jsonreference v0.20.2 // indirect github.com/go-openapi/swag v0.22.4 // indirect @@ -64,9 +68,10 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/x448/float16 v0.8.4 // indirect + go.opentelemetry.io/otel/metric v1.31.0 // indirect golang.org/x/net v0.28.0 // indirect golang.org/x/oauth2 v0.21.0 // indirect - golang.org/x/sys v0.24.0 // indirect + golang.org/x/sys v0.26.0 // indirect golang.org/x/term v0.23.0 // indirect golang.org/x/time v0.3.0 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240822170219-fc7c04adadcd // indirect diff --git a/go.sum b/go.sum index 35d36db..5239e55 100644 --- a/go.sum +++ b/go.sum @@ -41,8 +41,11 @@ github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2 github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fxamacker/cbor/v2 v2.7.0 h1:iM5WgngdRBanHcxugY4JySA0nk1wZorNOpTgCMedv5E= github.com/fxamacker/cbor/v2 v2.7.0/go.mod h1:pxXPTn3joSm21Gbwsv0w9OSA2y1HFR9qXEeXQVeNoDQ= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-openapi/jsonpointer v0.19.6 h1:eCs3fxoIi3Wh6vtgmLTOjdhSpiqphQ+DaPn38N2ZdrE= github.com/go-openapi/jsonpointer v0.19.6/go.mod h1:osyAmYz/mB/C3I+WsTTSgw1ONzaLJoLCyoi6/zppojs= github.com/go-openapi/jsonreference v0.20.2 h1:3sVjiK66+uXK/6oQ8xgcRKcFgQ5KXa2KvnJRumpMGbE= @@ -132,6 +135,14 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM= github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= +go.opentelemetry.io/otel v1.31.0 h1:NsJcKPIW0D0H3NgzPDHmo0WW6SptzPdqg/L1zsIm2hY= +go.opentelemetry.io/otel v1.31.0/go.mod h1:O0C14Yl9FgkjqcCZAsE053C13OaddMYr/hz6clDkEJE= +go.opentelemetry.io/otel/metric v1.31.0 h1:FSErL0ATQAmYHUIzSezZibnyVlft1ybhy4ozRPcF2fE= +go.opentelemetry.io/otel/metric v1.31.0/go.mod h1:C3dEloVbLuYoX41KpmAhOqNriGbA+qqH6PQ5E5mUfnY= +go.opentelemetry.io/otel/sdk v1.31.0 h1:xLY3abVHYZ5HSfOg3l2E5LUj2Cwva5Y7yGxnSW9H5Gk= +go.opentelemetry.io/otel/sdk v1.31.0/go.mod h1:TfRbMdhvxIIr/B2N2LQW2S5v9m3gOQ/08KsbbO5BPT0= +go.opentelemetry.io/otel/trace v1.31.0 h1:ffjsj1aRouKewfr85U2aGagJ46+MvodynlQ1HYdmJys= +go.opentelemetry.io/otel/trace v1.31.0/go.mod h1:TXZkRk7SM2ZQLtR6eoAWQFIHPvzQ06FJAsO1tJg480A= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= @@ -153,8 +164,8 @@ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.24.0 h1:Twjiwq9dn6R1fQcyiK+wQyHWfaz/BJB+YIpzU/Cv3Xg= -golang.org/x/sys v0.24.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= +golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.23.0 h1:F6D4vR+EHoL9/sWAWgAR1H2DcHr4PareCbAaCo1RpuU= golang.org/x/term v0.23.0/go.mod h1:DgV24QBUrK6jhZXl+20l6UWznPlwAHm1Q1mGHtydmSk= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= diff --git a/proto/sidecar/sidecar.pb.go b/proto/sidecar/sidecar.pb.go index 29f510a..ca47559 100644 --- a/proto/sidecar/sidecar.pb.go +++ b/proto/sidecar/sidecar.pb.go @@ -27,8 +27,8 @@ type Event struct { sizeCache protoimpl.SizeCache unknownFields protoimpl.UnknownFields - EventName string `protobuf:"bytes,1,opt,name=eventName,proto3" json:"eventName,omitempty"` - Data *anypb.Any `protobuf:"bytes,2,opt,name=data,proto3" json:"data,omitempty"` + EventName string `protobuf:"bytes,1,opt,name=eventName,proto3" json:"eventName,omitempty"` + Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` } func (x *Event) Reset() { @@ -70,9 +70,9 @@ func (x *Event) GetEventName() string { return "" } -func (x *Event) GetData() *anypb.Any { +func (x *Event) GetPayload() []byte { if x != nil { - return x.Data + return x.Payload } return nil } @@ -761,12 +761,11 @@ var file_proto_sidecar_sidecar_proto_rawDesc = []byte{ 0x73, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x07, 0x73, 0x69, 0x64, 0x65, 0x63, 0x61, 0x72, 0x1a, 0x19, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2f, 0x61, 0x6e, 0x79, 0x2e, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x22, 0x4f, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x65, 0x76, + 0x6f, 0x22, 0x3f, 0x0a, 0x05, 0x45, 0x76, 0x65, 0x6e, 0x74, 0x12, 0x1c, 0x0a, 0x09, 0x65, 0x76, 0x65, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x65, - 0x76, 0x65, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x28, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, - 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, 0x41, 0x6e, 0x79, 0x52, 0x04, 0x64, 0x61, - 0x74, 0x61, 0x22, 0x46, 0x0a, 0x06, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x12, 0x10, 0x0a, 0x03, + 0x76, 0x65, 0x6e, 0x74, 0x4e, 0x61, 0x6d, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x70, 0x61, 0x79, 0x6c, + 0x6f, 0x61, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x07, 0x70, 0x61, 0x79, 0x6c, 0x6f, + 0x61, 0x64, 0x22, 0x46, 0x0a, 0x06, 0x4b, 0x65, 0x79, 0x56, 0x61, 0x6c, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x2a, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x14, 0x2e, 0x67, 0x6f, 0x6f, 0x67, 0x6c, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x75, 0x66, 0x2e, @@ -856,19 +855,18 @@ var file_proto_sidecar_sidecar_proto_goTypes = []interface{}{ (*anypb.Any)(nil), // 16: google.protobuf.Any } var file_proto_sidecar_sidecar_proto_depIdxs = []int32{ - 16, // 0: sidecar.Event.data:type_name -> google.protobuf.Any - 16, // 1: sidecar.KeyVal.value:type_name -> google.protobuf.Any - 15, // 2: sidecar.Secret.data:type_name -> sidecar.Secret.DataEntry - 1, // 3: sidecar.PostStateRequest.records:type_name -> sidecar.KeyVal - 1, // 4: sidecar.ListStateResponse.records:type_name -> sidecar.KeyVal - 1, // 5: sidecar.GetStateResponse.records:type_name -> sidecar.KeyVal - 0, // 6: sidecar.PublishRequest.event:type_name -> sidecar.Event - 2, // 7: sidecar.GetSecretResponse.secret:type_name -> sidecar.Secret - 8, // [8:8] is the sub-list for method output_type - 8, // [8:8] is the sub-list for method input_type - 8, // [8:8] is the sub-list for extension type_name - 8, // [8:8] is the sub-list for extension extendee - 0, // [0:8] is the sub-list for field type_name + 16, // 0: sidecar.KeyVal.value:type_name -> google.protobuf.Any + 15, // 1: sidecar.Secret.data:type_name -> sidecar.Secret.DataEntry + 1, // 2: sidecar.PostStateRequest.records:type_name -> sidecar.KeyVal + 1, // 3: sidecar.ListStateResponse.records:type_name -> sidecar.KeyVal + 1, // 4: sidecar.GetStateResponse.records:type_name -> sidecar.KeyVal + 0, // 5: sidecar.PublishRequest.event:type_name -> sidecar.Event + 2, // 6: sidecar.GetSecretResponse.secret:type_name -> sidecar.Secret + 7, // [7:7] is the sub-list for method output_type + 7, // [7:7] is the sub-list for method input_type + 7, // [7:7] is the sub-list for extension type_name + 7, // [7:7] is the sub-list for extension extendee + 0, // [0:7] is the sub-list for field type_name } func init() { file_proto_sidecar_sidecar_proto_init() } diff --git a/proto/sidecar/sidecar.proto b/proto/sidecar/sidecar.proto index 1fa9861..bf5e709 100644 --- a/proto/sidecar/sidecar.proto +++ b/proto/sidecar/sidecar.proto @@ -9,7 +9,7 @@ import "google/protobuf/any.proto"; // domain message Event { string eventName = 1; - google.protobuf.Any data = 2; + bytes payload = 2; } message KeyVal { diff --git a/proto/trace/trace.pb.go b/proto/trace/trace.pb.go new file mode 100644 index 0000000..7ac17c4 --- /dev/null +++ b/proto/trace/trace.pb.go @@ -0,0 +1,336 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.28.1 +// protoc v3.21.9 +// source: proto/trace/trace.proto + +package trace + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +// domain +type Span struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"` + Id string `protobuf:"bytes,2,opt,name=id,proto3" json:"id,omitempty"` + Parent string `protobuf:"bytes,3,opt,name=parent,proto3" json:"parent,omitempty"` + Trace string `protobuf:"bytes,4,opt,name=trace,proto3" json:"trace,omitempty"` + Started uint64 `protobuf:"varint,5,opt,name=started,proto3" json:"started,omitempty"` + Ended uint64 `protobuf:"varint,6,opt,name=ended,proto3" json:"ended,omitempty"` + Metadata map[string]string `protobuf:"bytes,7,rep,name=metadata,proto3" json:"metadata,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *Span) Reset() { + *x = Span{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_trace_trace_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Span) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Span) ProtoMessage() {} + +func (x *Span) ProtoReflect() protoreflect.Message { + mi := &file_proto_trace_trace_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Span.ProtoReflect.Descriptor instead. +func (*Span) Descriptor() ([]byte, []int) { + return file_proto_trace_trace_proto_rawDescGZIP(), []int{0} +} + +func (x *Span) GetName() string { + if x != nil { + return x.Name + } + return "" +} + +func (x *Span) GetId() string { + if x != nil { + return x.Id + } + return "" +} + +func (x *Span) GetParent() string { + if x != nil { + return x.Parent + } + return "" +} + +func (x *Span) GetTrace() string { + if x != nil { + return x.Trace + } + return "" +} + +func (x *Span) GetStarted() uint64 { + if x != nil { + return x.Started + } + return 0 +} + +func (x *Span) GetEnded() uint64 { + if x != nil { + return x.Ended + } + return 0 +} + +func (x *Span) GetMetadata() map[string]string { + if x != nil { + return x.Metadata + } + return nil +} + +// trace request/response +type TraceRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Count uint64 `protobuf:"varint,1,opt,name=count,proto3" json:"count,omitempty"` +} + +func (x *TraceRequest) Reset() { + *x = TraceRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_trace_trace_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TraceRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TraceRequest) ProtoMessage() {} + +func (x *TraceRequest) ProtoReflect() protoreflect.Message { + mi := &file_proto_trace_trace_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TraceRequest.ProtoReflect.Descriptor instead. +func (*TraceRequest) Descriptor() ([]byte, []int) { + return file_proto_trace_trace_proto_rawDescGZIP(), []int{1} +} + +func (x *TraceRequest) GetCount() uint64 { + if x != nil { + return x.Count + } + return 0 +} + +type TraceResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Spans []*Span `protobuf:"bytes,1,rep,name=spans,proto3" json:"spans,omitempty"` +} + +func (x *TraceResponse) Reset() { + *x = TraceResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proto_trace_trace_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *TraceResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*TraceResponse) ProtoMessage() {} + +func (x *TraceResponse) ProtoReflect() protoreflect.Message { + mi := &file_proto_trace_trace_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use TraceResponse.ProtoReflect.Descriptor instead. +func (*TraceResponse) Descriptor() ([]byte, []int) { + return file_proto_trace_trace_proto_rawDescGZIP(), []int{2} +} + +func (x *TraceResponse) GetSpans() []*Span { + if x != nil { + return x.Spans + } + return nil +} + +var File_proto_trace_trace_proto protoreflect.FileDescriptor + +var file_proto_trace_trace_proto_rawDesc = []byte{ + 0x0a, 0x17, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x2f, 0x74, 0x72, + 0x61, 0x63, 0x65, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x05, 0x74, 0x72, 0x61, 0x63, 0x65, + 0x22, 0xfc, 0x01, 0x0a, 0x04, 0x53, 0x70, 0x61, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x6e, 0x61, 0x6d, + 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x6e, 0x61, 0x6d, 0x65, 0x12, 0x0e, 0x0a, + 0x02, 0x69, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x69, 0x64, 0x12, 0x16, 0x0a, + 0x06, 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x06, 0x70, + 0x61, 0x72, 0x65, 0x6e, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x74, 0x72, 0x61, 0x63, 0x65, 0x18, 0x04, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x74, 0x72, 0x61, 0x63, 0x65, 0x12, 0x18, 0x0a, 0x07, 0x73, + 0x74, 0x61, 0x72, 0x74, 0x65, 0x64, 0x18, 0x05, 0x20, 0x01, 0x28, 0x04, 0x52, 0x07, 0x73, 0x74, + 0x61, 0x72, 0x74, 0x65, 0x64, 0x12, 0x14, 0x0a, 0x05, 0x65, 0x6e, 0x64, 0x65, 0x64, 0x18, 0x06, + 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, 0x65, 0x6e, 0x64, 0x65, 0x64, 0x12, 0x35, 0x0a, 0x08, 0x6d, + 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x18, 0x07, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x19, 0x2e, + 0x74, 0x72, 0x61, 0x63, 0x65, 0x2e, 0x53, 0x70, 0x61, 0x6e, 0x2e, 0x4d, 0x65, 0x74, 0x61, 0x64, + 0x61, 0x74, 0x61, 0x45, 0x6e, 0x74, 0x72, 0x79, 0x52, 0x08, 0x6d, 0x65, 0x74, 0x61, 0x64, 0x61, + 0x74, 0x61, 0x1a, 0x3b, 0x0a, 0x0d, 0x4d, 0x65, 0x74, 0x61, 0x64, 0x61, 0x74, 0x61, 0x45, 0x6e, + 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, + 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, 0x22, + 0x24, 0x0a, 0x0c, 0x54, 0x72, 0x61, 0x63, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, + 0x14, 0x0a, 0x05, 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x18, 0x01, 0x20, 0x01, 0x28, 0x04, 0x52, 0x05, + 0x63, 0x6f, 0x75, 0x6e, 0x74, 0x22, 0x32, 0x0a, 0x0d, 0x54, 0x72, 0x61, 0x63, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x21, 0x0a, 0x05, 0x73, 0x70, 0x61, 0x6e, 0x73, 0x18, + 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x0b, 0x2e, 0x74, 0x72, 0x61, 0x63, 0x65, 0x2e, 0x53, 0x70, + 0x61, 0x6e, 0x52, 0x05, 0x73, 0x70, 0x61, 0x6e, 0x73, 0x42, 0x22, 0x5a, 0x20, 0x67, 0x69, 0x74, + 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x77, 0x2d, 0x68, 0x2d, 0x61, 0x2f, 0x70, 0x6b, + 0x67, 0x2f, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2f, 0x74, 0x72, 0x61, 0x63, 0x65, 0x62, 0x06, 0x70, + 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_proto_trace_trace_proto_rawDescOnce sync.Once + file_proto_trace_trace_proto_rawDescData = file_proto_trace_trace_proto_rawDesc +) + +func file_proto_trace_trace_proto_rawDescGZIP() []byte { + file_proto_trace_trace_proto_rawDescOnce.Do(func() { + file_proto_trace_trace_proto_rawDescData = protoimpl.X.CompressGZIP(file_proto_trace_trace_proto_rawDescData) + }) + return file_proto_trace_trace_proto_rawDescData +} + +var file_proto_trace_trace_proto_msgTypes = make([]protoimpl.MessageInfo, 4) +var file_proto_trace_trace_proto_goTypes = []interface{}{ + (*Span)(nil), // 0: trace.Span + (*TraceRequest)(nil), // 1: trace.TraceRequest + (*TraceResponse)(nil), // 2: trace.TraceResponse + nil, // 3: trace.Span.MetadataEntry +} +var file_proto_trace_trace_proto_depIdxs = []int32{ + 3, // 0: trace.Span.metadata:type_name -> trace.Span.MetadataEntry + 0, // 1: trace.TraceResponse.spans:type_name -> trace.Span + 2, // [2:2] is the sub-list for method output_type + 2, // [2:2] is the sub-list for method input_type + 2, // [2:2] is the sub-list for extension type_name + 2, // [2:2] is the sub-list for extension extendee + 0, // [0:2] is the sub-list for field type_name +} + +func init() { file_proto_trace_trace_proto_init() } +func file_proto_trace_trace_proto_init() { + if File_proto_trace_trace_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_proto_trace_trace_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Span); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_trace_trace_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TraceRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proto_trace_trace_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*TraceResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_proto_trace_trace_proto_rawDesc, + NumEnums: 0, + NumMessages: 4, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_proto_trace_trace_proto_goTypes, + DependencyIndexes: file_proto_trace_trace_proto_depIdxs, + MessageInfos: file_proto_trace_trace_proto_msgTypes, + }.Build() + File_proto_trace_trace_proto = out.File + file_proto_trace_trace_proto_rawDesc = nil + file_proto_trace_trace_proto_goTypes = nil + file_proto_trace_trace_proto_depIdxs = nil +} diff --git a/proto/trace/trace.proto b/proto/trace/trace.proto new file mode 100644 index 0000000..011c3f5 --- /dev/null +++ b/proto/trace/trace.proto @@ -0,0 +1,25 @@ +syntax = "proto3"; + +package trace; + +option go_package = "github.com/w-h-a/pkg/proto/trace"; + +// domain +message Span { + string name = 1; + string id = 2; + string parent = 3; + string trace = 4; + uint64 started = 5; + uint64 ended = 6; + map metadata = 7; +} + +// trace request/response +message TraceRequest { + uint64 count = 1; +} + +message TraceResponse { + repeated Span spans = 1; +} \ No newline at end of file diff --git a/security/authn/utils.go b/security/authn/utils.go index 6fd84c8..102e667 100644 --- a/security/authn/utils.go +++ b/security/authn/utils.go @@ -8,7 +8,7 @@ import ( ) const ( - accountKey = "auth-account" + AccountKey = "auth-account" ) func ContextWithAccount(ctx context.Context, account *Account) (context.Context, error) { @@ -17,11 +17,11 @@ func ContextWithAccount(ctx context.Context, account *Account) (context.Context, return ctx, err } - return metadatautils.SetContext(ctx, accountKey, string(bytes)), nil + return metadatautils.SetContext(ctx, AccountKey, string(bytes)), nil } func AccountFromContext(ctx context.Context) (*Account, error) { - str, ok := metadatautils.GetContext(ctx, accountKey) + str, ok := metadatautils.GetContext(ctx, AccountKey) if !ok { return nil, nil } diff --git a/serverv2/grpc/server.go b/serverv2/grpc/server.go index 2425669..b1a97b0 100644 --- a/serverv2/grpc/server.go +++ b/serverv2/grpc/server.go @@ -84,8 +84,6 @@ func (s *server) Run() error { } s.mtx.RUnlock() - // TODO: init exporters here? - // TODO: tls listener, err := net.Listen("tcp", s.options.Address) if err != nil { diff --git a/serverv2/http/server.go b/serverv2/http/server.go index e8799c6..844f4b7 100644 --- a/serverv2/http/server.go +++ b/serverv2/http/server.go @@ -68,8 +68,6 @@ func (s *server) Run() error { } s.mtx.RUnlock() - // TODO: init exporters here? - // TODO: tls listener, err := net.Listen("tcp", s.options.Address) if err != nil { diff --git a/serverv2/http/server_test.go b/serverv2/http/server_test.go index 0d11c3c..b5a693a 100644 --- a/serverv2/http/server_test.go +++ b/serverv2/http/server_test.go @@ -10,10 +10,11 @@ import ( "github.com/w-h-a/pkg/serverv2" "github.com/w-h-a/pkg/telemetry/log" "github.com/w-h-a/pkg/telemetry/log/memory" + "github.com/w-h-a/pkg/utils/memoryutils" ) func TestHttpServer(t *testing.T) { - logger := memory.NewLog() + logger := memory.NewLog(memory.LogWithBuffer(memoryutils.NewBuffer())) log.SetLogger(logger) diff --git a/serverv2/http/utils.go b/serverv2/http/utils.go deleted file mode 100644 index 71fa12a..0000000 --- a/serverv2/http/utils.go +++ /dev/null @@ -1,24 +0,0 @@ -package http - -import ( - "encoding/json" - "net/http" - - "github.com/w-h-a/pkg/utils/errorutils" -) - -func ErrResponse(w http.ResponseWriter, err error) { - internal := err.(*errorutils.Error) - Response(w, int(internal.Code), []byte(internal.Error())) -} - -func OkResponse(w http.ResponseWriter, payload interface{}) { - bs, _ := json.Marshal(payload) - Response(w, 200, bs) -} - -func Response(w http.ResponseWriter, code int, bs []byte) { - w.Header().Set("content-type", "application/json") - w.WriteHeader(code) - w.Write(bs) -} diff --git a/sidecar/custom/sidecar.go b/sidecar/custom/sidecar.go index 4e4a291..882576d 100644 --- a/sidecar/custom/sidecar.go +++ b/sidecar/custom/sidecar.go @@ -2,6 +2,7 @@ package custom import ( "context" + "encoding/hex" "encoding/json" "fmt" "strconv" @@ -13,6 +14,7 @@ import ( "github.com/w-h-a/pkg/sidecar" "github.com/w-h-a/pkg/store" "github.com/w-h-a/pkg/telemetry/log" + "github.com/w-h-a/pkg/telemetry/tracev2" "github.com/w-h-a/pkg/utils/datautils" "golang.org/x/text/cases" "golang.org/x/text/language" @@ -28,14 +30,26 @@ func (s *customSidecar) Options() sidecar.SidecarOptions { return s.options } -func (s *customSidecar) SaveStateToStore(state *sidecar.State) error { +func (s *customSidecar) SaveStateToStore(ctx context.Context, state *sidecar.State) error { + _, spanId := s.options.Tracer.Start(ctx, "customSidecar.SaveStateToStore") + defer s.options.Tracer.Finish(spanId) + + records, _ := json.Marshal(state.Records) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "storeId": state.StoreId, + "records": string(records), + }) + if len(state.Records) == 0 { + s.options.Tracer.UpdateStatus(spanId, 2, "success") return nil } st, ok := s.options.Stores[state.StoreId] if !ok { log.Warnf("store %s was not found", state.StoreId) + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("store %s was not found", state.StoreId)) return sidecar.ErrComponentNotFound } @@ -48,82 +62,149 @@ func (s *customSidecar) SaveStateToStore(state *sidecar.State) error { bs, err := datautils.Stringify(data) if err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, err.Error()) return err } storeRecord.Value = bs if err := st.Write(storeRecord); err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, err.Error()) return err } } + s.options.Tracer.UpdateStatus(spanId, 2, "success") + return nil } -func (c *customSidecar) ListStateFromStore(storeId string) ([]*store.Record, error) { - st, ok := c.options.Stores[storeId] +func (s *customSidecar) ListStateFromStore(ctx context.Context, storeId string) ([]*store.Record, error) { + _, spanId := s.options.Tracer.Start(ctx, "customSidecar.ListStateFromStore") + defer s.options.Tracer.Finish(spanId) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "storeId": storeId, + }) + + st, ok := s.options.Stores[storeId] if !ok { log.Warnf("store %s was not found", storeId) + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("store %s was not found", storeId)) return nil, sidecar.ErrComponentNotFound } // TODO: limit + offset recs, err := st.Read("", store.ReadWithPrefix()) if err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, err.Error()) return nil, err } + s.options.Tracer.UpdateStatus(spanId, 2, "success") + return recs, nil } -func (s *customSidecar) SingleStateFromStore(storeId, key string) ([]*store.Record, error) { +func (s *customSidecar) SingleStateFromStore(ctx context.Context, storeId, key string) ([]*store.Record, error) { + _, spanId := s.options.Tracer.Start(ctx, "customSidecar.SingleStateFromStore") + defer s.options.Tracer.Finish(spanId) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "storeId": storeId, + "key": key, + }) + st, ok := s.options.Stores[storeId] if !ok { log.Warnf("store %s was not found", storeId) + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("store %s was not found", storeId)) return nil, sidecar.ErrComponentNotFound } recs, err := st.Read(key) if err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, err.Error()) return nil, err } + s.options.Tracer.UpdateStatus(spanId, 2, "success") + return recs, nil } -func (s *customSidecar) RemoveStateFromStore(storeId, key string) error { +func (s *customSidecar) RemoveStateFromStore(ctx context.Context, storeId, key string) error { + _, spanId := s.options.Tracer.Start(ctx, "customSidecar.RemoveStateFromStore") + defer s.options.Tracer.Finish(spanId) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "storeId": storeId, + "key": key, + }) + st, ok := s.options.Stores[storeId] if !ok { log.Warnf("store %s was not found", storeId) + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("store %s was not found", storeId)) return sidecar.ErrComponentNotFound } if err := st.Delete(key); err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, err.Error()) return err } + s.options.Tracer.UpdateStatus(spanId, 2, "success") + return nil } -func (s *customSidecar) WriteEventToBroker(event *sidecar.Event) error { +func (s *customSidecar) WriteEventToBroker(ctx context.Context, event *sidecar.Event) error { + newCtx, spanId := s.options.Tracer.Start(ctx, "customSidecar.WriteEventToBroker") + defer s.options.Tracer.Finish(spanId) + + if traceparent, found := tracev2.TraceParentFromContext(newCtx); found { + if _, ok := event.Payload[tracev2.TraceParentKey].(string); !ok { + event.Payload[tracev2.TraceParentKey] = hex.EncodeToString(traceparent[:]) + } + } + + payload, _ := json.Marshal(event.Payload) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "eventName": event.EventName, + "payload": string(payload), + }) + bk, ok := s.options.Brokers[event.EventName] if !ok { log.Warnf("broker %s was not found", event.EventName) + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("broker %s was not found", event.EventName)) return sidecar.ErrComponentNotFound } - if err := bk.Publish(event.Data, *bk.Options().PublishOptions); err != nil { + if err := bk.Publish(event.Payload, *bk.Options().PublishOptions); err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, err.Error()) return err } + s.options.Tracer.UpdateStatus(spanId, 2, "success") + return nil } -func (s *customSidecar) ReadEventsFromBroker(brokerId string) { +func (s *customSidecar) ReadEventsFromBroker(ctx context.Context, brokerId string) { + _, spanId := s.options.Tracer.Start(ctx, "customSidecar.ReadEventsFromBroker") + defer s.options.Tracer.Finish(spanId) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "brokerId": brokerId, + }) + bk, ok := s.options.Brokers[brokerId] if !ok { log.Warnf("broker %s was not found", brokerId) + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("broker %s was not found", brokerId)) return } @@ -133,22 +214,47 @@ func (s *customSidecar) ReadEventsFromBroker(brokerId string) { if ok { log.Warnf("a subscriber for broker %s was already found", brokerId) s.mtx.RUnlock() + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("a subscriber for broker %s was already found", brokerId)) return } s.mtx.RUnlock() sub := bk.Subscribe(func(b []byte) error { - var body interface{} - if err := json.Unmarshal(b, &body); err != nil { + var payload map[string]interface{} + + if err := json.Unmarshal(b, &payload); err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, err.Error()) return err } + var traceparent [16]byte + + var spanId string + + if encoded, ok := payload[tracev2.TraceParentKey].(string); ok { + decoded, _ := hex.DecodeString(encoded) + copy(traceparent[:], decoded) + ctx, _ := tracev2.ContextWithTraceParent(context.Background(), traceparent) + _, spanId = s.options.Tracer.Start(ctx, fmt.Sprintf("%s.Handler", brokerId)) + } else { + _, spanId = s.options.Tracer.Start(context.Background(), fmt.Sprintf("%s.Handler", brokerId)) + } + + defer s.options.Tracer.Finish(spanId) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "brokerId": brokerId, + "payload": string(b), + }) + event := &sidecar.Event{ EventName: brokerId, - Data: body, + Payload: payload, } + s.options.Tracer.UpdateStatus(spanId, 2, "success") + return s.sendEventToService(event) }, *bk.Options().SubscribeOptions) @@ -156,20 +262,31 @@ func (s *customSidecar) ReadEventsFromBroker(brokerId string) { defer s.mtx.Unlock() s.subscribers[brokerId] = sub + + s.options.Tracer.UpdateStatus(spanId, 2, "success") } -func (s *customSidecar) UnsubscribeFromBroker(brokerId string) error { +func (s *customSidecar) UnsubscribeFromBroker(ctx context.Context, brokerId string) error { + _, spanId := s.options.Tracer.Start(ctx, "customSidecar.UnsubscribeFromBroker") + defer s.options.Tracer.Finish(spanId) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "brokerId": brokerId, + }) + s.mtx.RLock() sub, ok := s.subscribers[brokerId] if !ok { s.mtx.RUnlock() + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("broker %s was not found", brokerId)) return nil } s.mtx.RUnlock() if err := sub.Unsubscribe(); err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, err.Error()) return err } @@ -178,21 +295,35 @@ func (s *customSidecar) UnsubscribeFromBroker(brokerId string) error { delete(s.subscribers, brokerId) + s.options.Tracer.UpdateStatus(spanId, 2, "success") + return nil } -func (s *customSidecar) ReadFromSecretStore(secretStore string, name string) (*sidecar.Secret, error) { +func (s *customSidecar) ReadFromSecretStore(ctx context.Context, secretStore string, name string) (*sidecar.Secret, error) { + _, spanId := s.options.Tracer.Start(ctx, "customSidecar.ReadFromSecretStore") + defer s.options.Tracer.Finish(spanId) + + s.options.Tracer.AddMetadata(spanId, map[string]string{ + "secretStore": secretStore, + "name": name, + }) + sc, ok := s.options.Secrets[secretStore] if !ok { log.Warnf("secret store %s was not found", secretStore) + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("secret store %s was not found", secretStore)) return nil, sidecar.ErrComponentNotFound } mp, err := sc.GetSecret(name) if err != nil { + s.options.Tracer.UpdateStatus(spanId, 1, fmt.Sprintf("failed to get secret: %v", err)) return nil, err } + s.options.Tracer.UpdateStatus(spanId, 2, "success") + return &sidecar.Secret{ Data: mp, }, nil diff --git a/sidecar/domain.go b/sidecar/domain.go index a947d0c..bf402a1 100644 --- a/sidecar/domain.go +++ b/sidecar/domain.go @@ -1,8 +1,8 @@ package sidecar type Event struct { - EventName string `json:"eventName,omitempty"` - Data interface{} `json:"data,omitempty"` + EventName string `json:"eventName,omitempty"` + Payload map[string]interface{} `json:"payload,omitempty"` } type State struct { diff --git a/sidecar/options.go b/sidecar/options.go index e7460e3..4f600cd 100644 --- a/sidecar/options.go +++ b/sidecar/options.go @@ -7,6 +7,7 @@ import ( "github.com/w-h-a/pkg/client" "github.com/w-h-a/pkg/security/secret" "github.com/w-h-a/pkg/store" + "github.com/w-h-a/pkg/telemetry/tracev2" ) type SidecarOption func(o *SidecarOptions) @@ -20,6 +21,7 @@ type SidecarOptions struct { Stores map[string]store.Store Brokers map[string]broker.Broker Secrets map[string]secret.Secret + Tracer tracev2.Trace Context context.Context } @@ -76,6 +78,12 @@ func SidecarWithSecrets(s map[string]secret.Secret) SidecarOption { } } +func SidecarWithTracer(tr tracev2.Trace) SidecarOption { + return func(o *SidecarOptions) { + o.Tracer = tr + } +} + func NewSidecarOptions(opts ...SidecarOption) SidecarOptions { options := SidecarOptions{ Stores: map[string]store.Store{}, diff --git a/sidecar/sidecar.go b/sidecar/sidecar.go index fe6dea9..5213af8 100644 --- a/sidecar/sidecar.go +++ b/sidecar/sidecar.go @@ -1,6 +1,7 @@ package sidecar import ( + "context" "errors" "github.com/w-h-a/pkg/store" @@ -13,13 +14,13 @@ var ( type Sidecar interface { Options() SidecarOptions - SaveStateToStore(state *State) error - ListStateFromStore(store string) ([]*store.Record, error) - SingleStateFromStore(store, key string) ([]*store.Record, error) - RemoveStateFromStore(store, key string) error - WriteEventToBroker(event *Event) error - ReadEventsFromBroker(broker string) - UnsubscribeFromBroker(broker string) error - ReadFromSecretStore(secretStore string, name string) (*Secret, error) + SaveStateToStore(ctx context.Context, state *State) error + ListStateFromStore(ctx context.Context, store string) ([]*store.Record, error) + SingleStateFromStore(ctx context.Context, store, key string) ([]*store.Record, error) + RemoveStateFromStore(ctx context.Context, store, key string) error + WriteEventToBroker(ctx context.Context, event *Event) error + ReadEventsFromBroker(ctx context.Context, broker string) + UnsubscribeFromBroker(ctx context.Context, broker string) error + ReadFromSecretStore(ctx context.Context, secretStore string, name string) (*Secret, error) String() string } diff --git a/sidecar/utils.go b/sidecar/utils.go index da43f63..4ea5a4d 100644 --- a/sidecar/utils.go +++ b/sidecar/utils.go @@ -4,19 +4,13 @@ import ( "encoding/json" pb "github.com/w-h-a/pkg/proto/sidecar" - "google.golang.org/protobuf/types/known/anypb" ) func SerializeEvent(event *Event) (*pb.Event, error) { - bs, err := json.Marshal(event.Data) - if err != nil { - return nil, err - } + bs, _ := json.Marshal(event.Payload) return &pb.Event{ EventName: event.EventName, - Data: &anypb.Any{ - Value: bs, - }, + Payload: bs, }, nil } diff --git a/telemetry/buffer/buffer.go b/telemetry/buffer/buffer.go deleted file mode 100644 index 2a7834f..0000000 --- a/telemetry/buffer/buffer.go +++ /dev/null @@ -1,12 +0,0 @@ -package buffer - -var ( - defaultSize = 1024 -) - -type Buffer interface { - Options() BufferOptions - Put(v interface{}) - Get(n int) []*Entry - String() string -} diff --git a/telemetry/buffer/domain.go b/telemetry/buffer/domain.go deleted file mode 100644 index 78ca85c..0000000 --- a/telemetry/buffer/domain.go +++ /dev/null @@ -1,8 +0,0 @@ -package buffer - -import "time" - -type Entry struct { - Value interface{} - Timestamp time.Time -} diff --git a/telemetry/log/memory/log.go b/telemetry/log/memory/log.go index 8a2eec1..08a63d5 100644 --- a/telemetry/log/memory/log.go +++ b/telemetry/log/memory/log.go @@ -3,14 +3,13 @@ package memory import ( golog "log" - "github.com/w-h-a/pkg/telemetry/buffer" - "github.com/w-h-a/pkg/telemetry/buffer/memory" "github.com/w-h-a/pkg/telemetry/log" + "github.com/w-h-a/pkg/utils/memoryutils" ) type memoryLog struct { options log.LogOptions - buffer buffer.Buffer + buffer *memoryutils.Buffer } func (l *memoryLog) Options() log.LogOptions { @@ -30,10 +29,12 @@ func (l *memoryLog) Write(rec log.Record) error { func (l *memoryLog) Read(opts ...log.ReadOption) ([]log.Record, error) { options := log.NewReadOptions(opts...) - entries := []*buffer.Entry{} + var entries []*memoryutils.Entry if options.Count > 0 { entries = l.buffer.Get(options.Count) + } else { + entries = l.buffer.Get(l.buffer.Options().Size) } records := []log.Record{} @@ -61,10 +62,10 @@ func NewLog(opts ...log.LogOption) log.Log { options: options, } - if s, ok := GetSizeFromContext(options.Context); ok && s > 0 { - l.buffer = memory.NewBuffer(buffer.BufferWithSize(s)) + if b, ok := GetBufferFromContext(options.Context); ok && b != nil { + l.buffer = b } else { - l.buffer = memory.NewBuffer() + log.Fatalf("no buffer was given") } return l diff --git a/telemetry/log/memory/log_test.go b/telemetry/log/memory/log_test.go index 299243e..4f3a5bb 100644 --- a/telemetry/log/memory/log_test.go +++ b/telemetry/log/memory/log_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/require" "github.com/w-h-a/pkg/telemetry/log" + "github.com/w-h-a/pkg/utils/memoryutils" ) func TestLogger(t *testing.T) { @@ -13,7 +14,7 @@ func TestLogger(t *testing.T) { service := "service.namespace" - logger := NewLog(log.LogWithPrefix(service), log.LogWithLevel(log.LevelDebug), LogWithSize(size)) + logger := NewLog(log.LogWithPrefix(service), log.LogWithLevel(log.LevelDebug), LogWithBuffer(memoryutils.NewBuffer(memoryutils.BufferWithSize(size)))) require.Equal(t, size, logger.(*memoryLog).buffer.Options().Size) log.SetLogger(logger) diff --git a/telemetry/log/memory/options.go b/telemetry/log/memory/options.go index 378f36d..97665aa 100644 --- a/telemetry/log/memory/options.go +++ b/telemetry/log/memory/options.go @@ -4,17 +4,18 @@ import ( "context" "github.com/w-h-a/pkg/telemetry/log" + "github.com/w-h-a/pkg/utils/memoryutils" ) -type sizeKey struct{} +type bufferKey struct{} -func LogWithSize(s int) log.LogOption { +func LogWithBuffer(b *memoryutils.Buffer) log.LogOption { return func(o *log.LogOptions) { - o.Context = context.WithValue(o.Context, sizeKey{}, s) + o.Context = context.WithValue(o.Context, bufferKey{}, b) } } -func GetSizeFromContext(ctx context.Context) (int, bool) { - s, ok := ctx.Value(sizeKey{}).(int) - return s, ok +func GetBufferFromContext(ctx context.Context) (*memoryutils.Buffer, bool) { + b, ok := ctx.Value(bufferKey{}).(*memoryutils.Buffer) + return b, ok } diff --git a/telemetry/trace/domain.go b/telemetry/trace/domain.go new file mode 100644 index 0000000..58a0895 --- /dev/null +++ b/telemetry/trace/domain.go @@ -0,0 +1,15 @@ +package trace + +import ( + "time" +) + +type Span struct { + Name string `json:"name"` + Id string `json:"id"` + Parent string `json:"parent"` + Trace string `json:"trace"` + Started time.Time `json:"started"` + Duration time.Duration `json:"duration"` + Metadata map[string]string `json:"metadata"` +} diff --git a/telemetry/trace/memory/options.go b/telemetry/trace/memory/options.go new file mode 100644 index 0000000..d9c4102 --- /dev/null +++ b/telemetry/trace/memory/options.go @@ -0,0 +1,20 @@ +package memory + +import ( + "context" + + "github.com/w-h-a/pkg/telemetry/trace" +) + +type sizeKey struct{} + +func TraceWithSize(s int) trace.TraceOption { + return func(o *trace.TraceOptions) { + o.Context = context.WithValue(o.Context, sizeKey{}, s) + } +} + +func GetSizeFromContext(ctx context.Context) (int, bool) { + s, ok := ctx.Value(sizeKey{}).(int) + return s, ok +} diff --git a/telemetry/trace/memory/trace.go b/telemetry/trace/memory/trace.go new file mode 100644 index 0000000..d853a7d --- /dev/null +++ b/telemetry/trace/memory/trace.go @@ -0,0 +1,104 @@ +package memory + +import ( + "context" + "time" + + "github.com/google/uuid" + "github.com/w-h-a/pkg/telemetry/trace" + "github.com/w-h-a/pkg/utils/memoryutils" +) + +type memoryTrace struct { + options trace.TraceOptions + buffer *memoryutils.Buffer +} + +func (t *memoryTrace) Options() trace.TraceOptions { + return t.options +} + +func (t *memoryTrace) Start(ctx context.Context, name string, md map[string]string) (context.Context, *trace.Span, error) { + span := &trace.Span{ + Name: name, + Id: uuid.New().String(), + Trace: uuid.New().String(), + Started: time.Now(), + Metadata: md, + } + + if ctx == nil { + newCtx, err := trace.ContextWithIds(context.Background(), span.Trace, span.Id) + return newCtx, span, err + } + + traceId, traceFound, parentId, spanFound := trace.IdsFromContext(ctx) + + if !traceFound { + newCtx, err := trace.ContextWithIds(ctx, span.Trace, span.Id) + return newCtx, span, err + } else { + span.Trace = traceId + } + + if spanFound { + span.Parent = parentId + } + + newCtx, err := trace.ContextWithIds(ctx, span.Trace, span.Id) + return newCtx, span, err +} + +func (t *memoryTrace) Finish(span *trace.Span) error { + span.Duration = time.Since(span.Started) + + t.buffer.Put(span) + + return nil +} + +func (t *memoryTrace) Read(opts ...trace.ReadOption) ([]*trace.Span, error) { + options := trace.NewReadOptions(opts...) + + var entries []*memoryutils.Entry + + if options.Count > 0 { + entries = t.buffer.Get(options.Count) + } else { + entries = t.buffer.Get(t.buffer.Options().Size) + } + + spans := []*trace.Span{} + + for _, entry := range entries { + span := entry.Value.(*trace.Span) + + if len(options.Trace) > 0 && options.Trace != span.Trace { + continue + } + + spans = append(spans, span) + } + + return spans, nil +} + +func (t *memoryTrace) String() string { + return "memory" +} + +func NewTrace(opts ...trace.TraceOption) trace.Trace { + options := trace.NewTraceOptions(opts...) + + t := &memoryTrace{ + options: options, + } + + if s, ok := GetSizeFromContext(options.Context); ok && s > 0 { + t.buffer = memoryutils.NewBuffer(memoryutils.BufferWithSize(s)) + } else { + t.buffer = memoryutils.NewBuffer() + } + + return t +} diff --git a/telemetry/trace/options.go b/telemetry/trace/options.go new file mode 100644 index 0000000..c788eec --- /dev/null +++ b/telemetry/trace/options.go @@ -0,0 +1,53 @@ +package trace + +import "context" + +type TraceOption func(o *TraceOptions) + +type TraceOptions struct { + Context context.Context +} + +func NewTraceOptions(opts ...TraceOption) TraceOptions { + options := TraceOptions{ + Context: context.Background(), + } + + for _, fn := range opts { + fn(&options) + } + + return options +} + +type ReadOption func(o *ReadOptions) + +type ReadOptions struct { + Trace string + Count int + Context context.Context +} + +func ReadWithTrace(t string) ReadOption { + return func(o *ReadOptions) { + o.Trace = t + } +} + +func ReadWithCount(c int) ReadOption { + return func(o *ReadOptions) { + o.Count = c + } +} + +func NewReadOptions(opts ...ReadOption) ReadOptions { + options := ReadOptions{ + Context: context.Background(), + } + + for _, fn := range opts { + fn(&options) + } + + return options +} diff --git a/telemetry/trace/trace.go b/telemetry/trace/trace.go new file mode 100644 index 0000000..b5d417a --- /dev/null +++ b/telemetry/trace/trace.go @@ -0,0 +1,28 @@ +package trace + +import ( + "context" + "errors" +) + +var ( + ErrStart = errors.New("failed to start span") +) + +var tracer Trace + +type Trace interface { + Options() TraceOptions + Start(ctx context.Context, name string, md map[string]string) (context.Context, *Span, error) + Finish(span *Span) error + Read(opts ...ReadOption) ([]*Span, error) + String() string +} + +func SetTracer(t Trace) { + tracer = t +} + +func GetTracer() Trace { + return tracer +} diff --git a/telemetry/trace/utils.go b/telemetry/trace/utils.go new file mode 100644 index 0000000..b438682 --- /dev/null +++ b/telemetry/trace/utils.go @@ -0,0 +1,30 @@ +package trace + +import ( + "context" + + "github.com/w-h-a/pkg/utils/metadatautils" +) + +const ( + traceIdKey = "trace-id" + spanIdKey = "span-id" +) + +func ContextWithIds(ctx context.Context, traceId, parentId string) (context.Context, error) { + return metadatautils.MergeContext(ctx, map[string]string{ + traceIdKey: traceId, + spanIdKey: parentId, + }, true), nil +} + +func IdsFromContext(ctx context.Context) (traceId string, foundTrace bool, parentId string, foundParent bool) { + traceId, traceOk := metadatautils.GetContext(ctx, traceIdKey) + if !traceOk { + return + } + + parentId, spanOk := metadatautils.GetContext(ctx, spanIdKey) + + return traceId, traceOk, parentId, spanOk +} diff --git a/telemetry/tracev2/domain.go b/telemetry/tracev2/domain.go new file mode 100644 index 0000000..23e202b --- /dev/null +++ b/telemetry/tracev2/domain.go @@ -0,0 +1,21 @@ +package tracev2 + +import ( + "time" +) + +type SpanData struct { + Name string `json:"name"` + Id string `json:"id"` + Parent string `json:"parent"` + Trace string `json:"trace"` + Started time.Time `json:"started"` + Ended time.Time `json:"ended"` + Metadata map[string]string `json:"metadata"` + Status Status `json:"status"` +} + +type Status struct { + Code uint32 `json:"code"` + Description string `json:"description"` +} diff --git a/telemetry/tracev2/memory/exporter.go b/telemetry/tracev2/memory/exporter.go new file mode 100644 index 0000000..f7d6d4c --- /dev/null +++ b/telemetry/tracev2/memory/exporter.go @@ -0,0 +1,69 @@ +package memory + +import ( + "context" + + "github.com/w-h-a/pkg/telemetry/tracev2" + "github.com/w-h-a/pkg/utils/memoryutils" + "go.opentelemetry.io/otel/attribute" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + "go.opentelemetry.io/otel/trace" +) + +type memoryExporter struct { + buffer *memoryutils.Buffer +} + +func (e *memoryExporter) ExportSpans(ctx context.Context, spans []sdktrace.ReadOnlySpan) error { + spanData := []*tracev2.SpanData{} + + for _, s := range spans { + var parentSpanId trace.SpanID + + if s.Parent().IsValid() { + parentSpanId = s.Parent().SpanID() + } + + metadata := map[string]string{} + + for _, attr := range s.Attributes() { + if attr.Value.Type() != attribute.STRING { + continue + } + metadata[string(attr.Key)] = attr.Value.AsString() + } + + status := tracev2.Status{ + Code: uint32(s.Status().Code), + Description: s.Status().Description, + } + + data := &tracev2.SpanData{ + Name: s.Name(), + Id: s.SpanContext().SpanID().String(), + Parent: parentSpanId.String(), + Trace: s.SpanContext().TraceID().String(), + Started: s.StartTime(), + Ended: s.EndTime(), + Metadata: metadata, + Status: status, + } + + spanData = append(spanData, data) + } + + for _, d := range spanData { + e.buffer.Put(d) + } + + return nil +} + +// TODO? +func (e *memoryExporter) Shutdown(ctx context.Context) error { + return nil +} + +func NewExporter(buffer *memoryutils.Buffer) sdktrace.SpanExporter { + return &memoryExporter{buffer} +} diff --git a/telemetry/tracev2/memory/options.go b/telemetry/tracev2/memory/options.go new file mode 100644 index 0000000..bef14ca --- /dev/null +++ b/telemetry/tracev2/memory/options.go @@ -0,0 +1,21 @@ +package memory + +import ( + "context" + + "github.com/w-h-a/pkg/telemetry/tracev2" + "github.com/w-h-a/pkg/utils/memoryutils" +) + +type bufferKey struct{} + +func TraceWithBuffer(b *memoryutils.Buffer) tracev2.TraceOption { + return func(o *tracev2.TraceOptions) { + o.Context = context.WithValue(o.Context, bufferKey{}, b) + } +} + +func GetBufferFromContext(ctx context.Context) (*memoryutils.Buffer, bool) { + b, ok := ctx.Value(bufferKey{}).(*memoryutils.Buffer) + return b, ok +} diff --git a/telemetry/tracev2/memory/trace.go b/telemetry/tracev2/memory/trace.go new file mode 100644 index 0000000..633c3ff --- /dev/null +++ b/telemetry/tracev2/memory/trace.go @@ -0,0 +1,153 @@ +package memory + +import ( + "context" + "sync" + + "github.com/w-h-a/pkg/telemetry/log" + "github.com/w-h-a/pkg/telemetry/tracev2" + "github.com/w-h-a/pkg/utils/memoryutils" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +type memoryTrace struct { + options tracev2.TraceOptions + tracer trace.Tracer + spans map[string]trace.Span + buffer *memoryutils.Buffer + mtx sync.RWMutex +} + +func (t *memoryTrace) Options() tracev2.TraceOptions { + return t.options +} + +func (t *memoryTrace) Start(ctx context.Context, name string) (context.Context, string) { + t.mtx.Lock() + defer t.mtx.Unlock() + + parentCtxCfg := trace.SpanContextConfig{} + + if spanparent, ok := tracev2.SpanParentFromContext(ctx); ok { + parentCtxCfg.SpanID = spanparent + } + + if traceparent, ok := tracev2.TraceParentFromContext(ctx); ok { + parentCtxCfg.TraceID = traceparent + } + + ctx, span := t.start(ctx, name, parentCtxCfg) + + key := span.SpanContext().SpanID().String() + + t.spans[key] = span + + intermediateCtx, _ := tracev2.ContextWithSpanParent(ctx, span.SpanContext().SpanID()) + + newCtx, _ := tracev2.ContextWithTraceParent(intermediateCtx, span.SpanContext().TraceID()) + + return newCtx, key +} + +func (t *memoryTrace) AddMetadata(span string, md map[string]string) { + t.mtx.Lock() + defer t.mtx.Unlock() + + if t.spans[span] == nil { + return + } + + if len(md) == 0 { + return + } + + attrs := []attribute.KeyValue{} + + for k, v := range md { + attrs = append(attrs, attribute.String(k, v)) + } + + if len(attrs) == 0 { + return + } + + t.spans[span].SetAttributes(attrs...) +} + +func (t *memoryTrace) UpdateStatus(span string, code uint32, description string) { + t.mtx.Lock() + defer t.mtx.Unlock() + + switch code { + case 2: + t.spans[span].SetStatus(codes.Ok, description) + case 1: + t.spans[span].SetStatus(codes.Error, description) + default: + } +} + +func (t *memoryTrace) Finish(span string) { + t.mtx.Lock() + defer t.mtx.Unlock() + + t.spans[span].End() + + delete(t.spans, span) +} + +func (t *memoryTrace) Read(opts ...tracev2.ReadOption) ([]*tracev2.SpanData, error) { + options := tracev2.NewReadOptions(opts...) + + var entries []*memoryutils.Entry + + if options.Count > 0 { + entries = t.buffer.Get(options.Count) + } else { + entries = t.buffer.Get(t.buffer.Options().Size) + } + + spans := []*tracev2.SpanData{} + + for _, entry := range entries { + span := entry.Value.(*tracev2.SpanData) + + spans = append(spans, span) + } + + return spans, nil +} + +func (t *memoryTrace) String() string { + return "memory" +} + +func (t *memoryTrace) start(ctx context.Context, name string, parentCtxCfg trace.SpanContextConfig) (context.Context, trace.Span) { + parentSpanCtx := trace.NewSpanContext(parentCtxCfg) + + ctx = trace.ContextWithRemoteSpanContext(ctx, parentSpanCtx) + + return t.tracer.Start(ctx, name) +} + +func NewTrace(opts ...tracev2.TraceOption) tracev2.Trace { + options := tracev2.NewTraceOptions(opts...) + + t := &memoryTrace{ + options: options, + tracer: otel.Tracer(options.Name), + spans: map[string]trace.Span{}, + mtx: sync.RWMutex{}, + } + + if b, ok := GetBufferFromContext(options.Context); ok && b != nil { + t.buffer = b + } else { + log.Fatalf("no buffer was given") + } + + return t +} diff --git a/telemetry/tracev2/options.go b/telemetry/tracev2/options.go new file mode 100644 index 0000000..871de5f --- /dev/null +++ b/telemetry/tracev2/options.go @@ -0,0 +1,53 @@ +package tracev2 + +import "context" + +type TraceOption func(o *TraceOptions) + +type TraceOptions struct { + Name string + Context context.Context +} + +func TraceWithName(name string) TraceOption { + return func(o *TraceOptions) { + o.Name = name + } +} + +func NewTraceOptions(opts ...TraceOption) TraceOptions { + options := TraceOptions{ + Context: context.Background(), + } + + for _, fn := range opts { + fn(&options) + } + + return options +} + +type ReadOption func(o *ReadOptions) + +type ReadOptions struct { + Count int + Context context.Context +} + +func ReadWithCount(c int) ReadOption { + return func(o *ReadOptions) { + o.Count = c + } +} + +func NewReadOptions(opts ...ReadOption) ReadOptions { + options := ReadOptions{ + Context: context.Background(), + } + + for _, fn := range opts { + fn(&options) + } + + return options +} diff --git a/telemetry/tracev2/trace.go b/telemetry/tracev2/trace.go new file mode 100644 index 0000000..1c9d843 --- /dev/null +++ b/telemetry/tracev2/trace.go @@ -0,0 +1,15 @@ +package tracev2 + +import ( + "context" +) + +type Trace interface { + Options() TraceOptions + Start(ctx context.Context, name string) (context.Context, string) + AddMetadata(span string, md map[string]string) + UpdateStatus(span string, code uint32, description string) + Finish(span string) + Read(opts ...ReadOption) ([]*SpanData, error) + String() string +} diff --git a/telemetry/tracev2/utils.go b/telemetry/tracev2/utils.go new file mode 100644 index 0000000..356f530 --- /dev/null +++ b/telemetry/tracev2/utils.go @@ -0,0 +1,57 @@ +package tracev2 + +import ( + "context" + "encoding/hex" + + "github.com/w-h-a/pkg/utils/metadatautils" +) + +const ( + TraceParentKey = "traceparent" + SpanParentKey = "spanparent" +) + +func ContextWithTraceParent(ctx context.Context, traceparent [16]byte) (context.Context, error) { + return metadatautils.MergeContext(ctx, map[string]string{ + TraceParentKey: string(traceparent[:]), + }, true), nil +} + +func TraceParentFromContext(ctx context.Context) (traceparent [16]byte, found bool) { + traceId, found := metadatautils.GetContext(ctx, TraceParentKey) + if !found { + return + } + + decoded, err := hex.DecodeString(traceId) + if err == nil { + copy(traceparent[:], decoded) + } else { + copy(traceparent[:], traceId) + } + + return +} + +func ContextWithSpanParent(ctx context.Context, spanparent [8]byte) (context.Context, error) { + return metadatautils.MergeContext(ctx, map[string]string{ + SpanParentKey: string(spanparent[:]), + }, true), nil +} + +func SpanParentFromContext(ctx context.Context) (spanparent [8]byte, found bool) { + spanId, found := metadatautils.GetContext(ctx, SpanParentKey) + if !found { + return + } + + decoded, err := hex.DecodeString(spanId) + if err == nil { + copy(spanparent[:], decoded) + } else { + copy(spanparent[:], spanId) + } + + return +} diff --git a/utils/httputils/http_utils.go b/utils/httputils/http_utils.go index 5dc047e..f5839bc 100644 --- a/utils/httputils/http_utils.go +++ b/utils/httputils/http_utils.go @@ -2,11 +2,14 @@ package httputils import ( "bytes" + "encoding/json" "fmt" "io" "net/http" "strings" "time" + + "github.com/w-h-a/pkg/utils/errorutils" ) func HttpGetNTimes(url string, n int) ([]byte, error) { @@ -63,3 +66,19 @@ func ExtractBody(r io.ReadCloser) ([]byte, error) { return body, nil } + +func ErrResponse(w http.ResponseWriter, err error) { + internal := err.(*errorutils.Error) + Response(w, int(internal.Code), []byte(internal.Error())) +} + +func OkResponse(w http.ResponseWriter, payload interface{}) { + bs, _ := json.Marshal(payload) + Response(w, 200, bs) +} + +func Response(w http.ResponseWriter, code int, bs []byte) { + w.Header().Set("content-type", "application/json") + w.WriteHeader(code) + w.Write(bs) +} diff --git a/telemetry/buffer/memory/memory.go b/utils/memoryutils/memory_utils.go similarity index 56% rename from telemetry/buffer/memory/memory.go rename to utils/memoryutils/memory_utils.go index 90b4ef3..8bc9662 100644 --- a/telemetry/buffer/memory/memory.go +++ b/utils/memoryutils/memory_utils.go @@ -1,28 +1,35 @@ -package memory +package memoryutils import ( "sync" "time" +) - "github.com/w-h-a/pkg/telemetry/buffer" +var ( + defaultSize = 1024 ) -type memoryBuffer struct { - options buffer.BufferOptions +type Buffer struct { + options BufferOptions mtx sync.RWMutex - entries []*buffer.Entry + entries []*Entry +} + +type Entry struct { + Value interface{} + Timestamp time.Time } -func (m *memoryBuffer) Options() buffer.BufferOptions { +func (m *Buffer) Options() BufferOptions { return m.options } -func (m *memoryBuffer) Put(v interface{}) { +func (m *Buffer) Put(v interface{}) { m.mtx.Lock() defer m.mtx.Unlock() // make the entry - entry := &buffer.Entry{ + entry := &Entry{ Value: v, Timestamp: time.Now(), } @@ -37,7 +44,7 @@ func (m *memoryBuffer) Put(v interface{}) { } } -func (m *memoryBuffer) Get(n int) []*buffer.Entry { +func (m *Buffer) Get(n int) []*Entry { m.mtx.RLock() defer m.mtx.RUnlock() @@ -52,17 +59,13 @@ func (m *memoryBuffer) Get(n int) []*buffer.Entry { return m.entries[delta:] } -func (m *memoryBuffer) String() string { - return "memory" -} - -func NewBuffer(opts ...buffer.BufferOption) buffer.Buffer { - options := buffer.NewBufferOptions(opts...) +func NewBuffer(opts ...BufferOption) *Buffer { + options := NewBufferOptions(opts...) - b := &memoryBuffer{ + b := &Buffer{ options: options, mtx: sync.RWMutex{}, - entries: []*buffer.Entry{}, + entries: []*Entry{}, } return b diff --git a/telemetry/buffer/memory/memory_test.go b/utils/memoryutils/memory_utils.test.go similarity index 78% rename from telemetry/buffer/memory/memory_test.go rename to utils/memoryutils/memory_utils.test.go index 9d6c3c0..cf684c4 100644 --- a/telemetry/buffer/memory/memory_test.go +++ b/utils/memoryutils/memory_utils.test.go @@ -1,14 +1,13 @@ -package memory +package memoryutils import ( "testing" "github.com/stretchr/testify/require" - "github.com/w-h-a/pkg/telemetry/buffer" ) func TestMemoryBuffer(t *testing.T) { - b := NewBuffer(buffer.BufferWithSize(10)) + b := NewBuffer(BufferWithSize(10)) b.Put("foo") @@ -17,7 +16,7 @@ func TestMemoryBuffer(t *testing.T) { val := entries[0].Value.(string) require.Equal(t, "foo", val) - b = NewBuffer(buffer.BufferWithSize(10)) + b = NewBuffer(BufferWithSize(10)) for i := 0; i < 10; i++ { b.Put(i) diff --git a/telemetry/buffer/options.go b/utils/memoryutils/options.go similarity index 59% rename from telemetry/buffer/options.go rename to utils/memoryutils/options.go index 74c2cb5..2ab3eee 100644 --- a/telemetry/buffer/options.go +++ b/utils/memoryutils/options.go @@ -1,24 +1,20 @@ -package buffer - -import "context" +package memoryutils type BufferOption func(o *BufferOptions) type BufferOptions struct { - Size int - Context context.Context + Size int } -func BufferWithSize(s int) BufferOption { +func BufferWithSize(size int) BufferOption { return func(o *BufferOptions) { - o.Size = s + o.Size = size } } func NewBufferOptions(opts ...BufferOption) BufferOptions { options := BufferOptions{ - Context: context.Background(), - Size: defaultSize, + Size: defaultSize, } for _, fn := range opts {