Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Longw/metadata containerlogv2 kubernetes #1139

Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9833794
add metadata feilds including labels, annotations, uid and image
wanlonghenry Nov 17, 2023
c7abdde
rename to podUid and add nil check
wanlonghenry Nov 17, 2023
9dd9595
rename to podUid and add nil check
wanlonghenry Nov 17, 2023
a1d659c
set the include_fields if customer set it otherwise go with default i…
wanlonghenry Nov 27, 2023
822cdc3
add end and puts log
wanlonghenry Nov 28, 2023
29f256b
Merge branch 'develop/fluent-bit-kubernetes-plugin' of https://github…
wanlonghenry Nov 28, 2023
8b5aff8
add workload to testing clusters
wanlonghenry Nov 28, 2023
9702ae3
Merge branch 'develop/fluent-bit-kubernetes-plugin' of https://github…
wanlonghenry Nov 29, 2023
03074d5
containerlogv2 metadata backend change with feature flag on
wanlonghenry Dec 4, 2023
dec7dd3
include list change and data filter
wanlonghenry Dec 7, 2023
ffc4cf1
update script and fix tsl
wanlonghenry Dec 12, 2023
92e13ea
adjust the marshal and log the output
wanlonghenry Dec 14, 2023
376a94e
update trivy ignore
wanlonghenry Dec 14, 2023
a098502
update debug
wanlonghenry Dec 15, 2023
a09288c
add more debug info
wanlonghenry Dec 15, 2023
03d8f60
update ignore list
wanlonghenry Dec 15, 2023
3e2627c
address the list split and logs
wanlonghenry Dec 15, 2023
dd4b716
logs
wanlonghenry Dec 16, 2023
db4fd77
refactor code
wanlonghenry Dec 18, 2023
1fd45a9
update include list format
wanlonghenry Dec 18, 2023
a5a3edc
clean up logs
wanlonghenry Dec 18, 2023
4d1040e
add convert bytes to string
wanlonghenry Dec 18, 2023
cc0216d
clean up comment
wanlonghenry Dec 18, 2023
d8cdee4
address comments add check and scenario for empty list
wanlonghenry Dec 18, 2023
e02ac9b
remove ADX support
wanlonghenry Dec 18, 2023
15402d8
rename to KubernetesMetadataEnabled
wanlonghenry Dec 19, 2023
f09d7e5
Merge branch 'develop/fluent-bit-kubernetes-plugin' into longw/metada…
wanlonghenry Dec 19, 2023
9c604a9
go fmt for better format
wanlonghenry Dec 19, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion .trivyignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,15 @@ CVE-2023-39325 #same as CVE-2023-44487
CVE-2023-3978
CVE-2023-44487 #false positive according to Mariner team
GHSA-jq35-85cj-fj4p
CVE-2019-3826

#telegraf HIGH
GHSA-m425-mq94-257g
CVE-2023-46129
CVE-2023-47090

# ruby HIGH
#ruby HIGH
CVE-2017-10784

#openssl
CVE-2023-5678
5 changes: 4 additions & 1 deletion build/common/installer/scripts/tomlparser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,10 @@ def populateSettingValuesFromConfigMap(parsedConfig)
@logEnableKubernetesMetadata = parsedConfig[:log_collection_settings][:metadata_collection][:enabled]
if !parsedConfig[:log_collection_settings][:metadata_collection][:include_fields].nil?
puts "config::Using config map setting for kubernetes metadata include fields"
@logKubernetesMetadataiIncludeFields = parsedConfig[:log_collection_settings][:metadata_collection][:include_fields]
include_fields = parsedConfig[:log_collection_settings][:metadata_collection][:include_fields]
if include_fields.kind_of?(Array)
@logKubernetesMetadataiIncludeFields = include_fields.join(",")
end
end
end
end
Expand Down
1 change: 1 addition & 0 deletions build/linux/installer/conf/fluent-bit.conf
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,6 @@
#${KubernetesFilterEnabled} Use_Kubelet true
#${KubernetesFilterEnabled} Kubelet_Port 10250
#${KubernetesFilterEnabled} Kubelet_Host ${NODE_IP}
#${KubernetesFilterEnabled} tls.verify Off
#${KubernetesFilterEnabled} K8S-Logging.Exclude On
#${KubernetesFilterEnabled} Kube_Tag_Prefix oms.container.log.la.var.log.containers.
111 changes: 110 additions & 1 deletion source/plugins/go/src/oms.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,10 @@ var (
ContainerLogSchemaV2 bool
// container log schema version from config map
ContainerLogV2ConfigMap bool
// Kubernetes Metadata enabled through configmap flag
KubernetesMetadataConfigMap bool
wanlonghenry marked this conversation as resolved.
Show resolved Hide resolved
// Kubernetes Metadata include list (fields to emit when metadata is enabled)
KubernetesMetadataIncludeList []string
//ADX Cluster URI
AdxClusterUri string
// ADX clientID
Expand Down Expand Up @@ -272,6 +276,8 @@ type DataItemLAv2 struct {
PodNamespace string `json:"PodNamespace"`
LogMessage string `json:"LogMessage"`
LogSource string `json:"LogSource"`
KubernetesMetadata string `json:"KubernetesMetadata"`
wanlonghenry marked this conversation as resolved.
Show resolved Hide resolved
//LogLevel string `json:"LogLevel"`
//PodLabels string `json:"PodLabels"`
}

Expand Down Expand Up @@ -1105,6 +1111,62 @@ func UpdateNumTelegrafMetricsSentTelemetry(numMetricsSent int, numSendErrors int
ContainerLogTelemetryMutex.Unlock()
}

// processIncludes returns the subset of Kubernetes metadata selected by
// includesList, renamed to the output field names.
//
// kubernetesMetadataMap is the string-keyed metadata map; a nil map is treated
// as empty. Each entry of includesList names one field group to copy:
//
//	"podUid"          copies "pod_id"          to "podUid"
//	"podLabels"       copies "labels"          to "podLabels"
//	"podAnnotations"  copies "annotations"     to "podAnnotations"
//	"image"           copies "container_hash"  to "image_hash" and
//	                  "container_image" to "image"
//
// Entries are matched after trimming surrounding whitespace, so a list that
// was produced by splitting "podUid, image" on commas still selects both
// fields. Unknown entries and fields that are absent from the source map are
// skipped. The returned map is never nil.
func processIncludes(kubernetesMetadataMap map[string]interface{}, includesList []string) map[string]interface{} {
	includedMetadata := make(map[string]interface{}, len(includesList))
	for _, include := range includesList {
		// Tolerate " name" / "name " coming from a comma-separated setting.
		switch strings.TrimSpace(include) {
		case "podUid":
			if val, ok := kubernetesMetadataMap["pod_id"]; ok {
				includedMetadata["podUid"] = val
			}
		case "podLabels":
			if val, ok := kubernetesMetadataMap["labels"]; ok {
				includedMetadata["podLabels"] = val
			}
		case "podAnnotations":
			if val, ok := kubernetesMetadataMap["annotations"]; ok {
				includedMetadata["podAnnotations"] = val
			}
		case "image":
			// "image" expands to two output fields: the hash and the image name.
			if hash, ok := kubernetesMetadataMap["container_hash"]; ok {
				includedMetadata["image_hash"] = hash
			}
			if image, ok := kubernetesMetadataMap["container_image"]; ok {
				includedMetadata["image"] = image
			}
		}
	}
	return includedMetadata
}

// convertKubernetesMetadata converts a decoded metadata record of type
// map[interface{}]interface{} into a map[string]interface{} that
// encoding/json can marshal.
//
// The conversion is recursive: nested maps are converted the same way,
// []interface{} values are converted element by element, and []byte values
// become strings (so they are emitted as text rather than base64). All other
// values are kept unchanged. Entries whose key is not a string are dropped.
//
// It returns an error when kubernetesMetadataJson is not a
// map[interface{}]interface{}; in that case the returned map is nil.
func convertKubernetesMetadata(kubernetesMetadataJson interface{}) (map[string]interface{}, error) {
	m, ok := kubernetesMetadataJson.(map[interface{}]interface{})
	if !ok {
		return nil, fmt.Errorf("type assertion to map[interface{}]interface{} failed")
	}

	strMap := make(map[string]interface{}, len(m))
	for k, v := range m {
		strKey, ok := k.(string)
		if !ok {
			// Only string keys are carried over to the JSON object.
			continue
		}
		converted, err := convertKubernetesMetadataValue(v)
		if err != nil {
			return nil, err
		}
		strMap[strKey] = converted
	}
	return strMap, nil
}

// convertKubernetesMetadataValue converts a single metadata value into a
// JSON-marshalable form, recursing into nested maps and slices.
func convertKubernetesMetadataValue(v interface{}) (interface{}, error) {
	switch val := v.(type) {
	case map[interface{}]interface{}:
		convertedMap, err := convertKubernetesMetadata(val)
		if err != nil {
			return nil, err
		}
		return convertedMap, nil
	case []interface{}:
		// Without this, a map[interface{}]interface{} inside a slice would
		// make json.Marshal fail with an unsupported-type error.
		convertedSlice := make([]interface{}, len(val))
		for i, elem := range val {
			convertedElem, err := convertKubernetesMetadataValue(elem)
			if err != nil {
				return nil, err
			}
			convertedSlice[i] = convertedElem
		}
		return convertedSlice, nil
	case []byte:
		return string(val), nil
	default:
		return val, nil
	}
}

// PostDataHelper sends data to the ODS endpoint or oneagent or ADX
func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
start := time.Now()
Expand Down Expand Up @@ -1135,6 +1197,27 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
for _, record := range tailPluginRecords {
containerID, k8sNamespace, k8sPodName, containerName := GetContainerIDK8sNamespacePodNameFromFileName(ToString(record["filepath"]))
logEntrySource := ToString(record["stream"])
kubernetesMetadata := ""
if KubernetesMetadataConfigMap {
if kubernetesMetadataJson, exists := record["kubernetes"]; exists {
kubernetesMetadataMap, err := convertKubernetesMetadata(kubernetesMetadataJson)
if err != nil {
Log(fmt.Sprintf("Error convertKubernetesMetadata: %v", err))
}
includedMetadata := processIncludes(kubernetesMetadataMap, KubernetesMetadataIncludeList)
kubernetesMetadataBytes, err := json.Marshal(includedMetadata)
if err != nil {
message := fmt.Sprintf("Error while Marshalling kubernetesMetadataBytes to json bytes: %s", err.Error())
Log(message)
SendException(message)
}
kubernetesMetadata = string(kubernetesMetadataBytes)
} else {
message := fmt.Sprintf("Error while fetching kubernetesMetadataJson")
Log(message)
continue
}
}

if strings.EqualFold(logEntrySource, "stdout") {
if containerID == "" || containsKey(StdoutIgnoreNsSet, k8sNamespace) {
Expand All @@ -1160,6 +1243,8 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
}

logEntry := ToString(record["log"])
//filter loglevel and define here
//logLevel := ToString(record["LogLevel"])
logEntryTimeStamp := ToString(record["time"])

if !ContainerLogV2ConfigMap && IsAADMSIAuthMode == true && !IsGenevaLogsIntegrationEnabled {
Expand All @@ -1179,7 +1264,18 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
}

//ADX Schema & LAv2 schema are almost the same (except resourceId)
if ContainerLogSchemaV2 == true || ContainerLogsRouteADX == true {
if ContainerLogSchemaV2 == true {
stringMap["Computer"] = Computer
stringMap["ContainerId"] = containerID
stringMap["ContainerName"] = containerName
stringMap["PodName"] = k8sPodName
stringMap["PodNamespace"] = k8sNamespace
stringMap["LogMessage"] = logEntry
stringMap["LogSource"] = logEntrySource
stringMap["TimeGenerated"] = logEntryTimeStamp
stringMap["KubernetesMetadata"] = kubernetesMetadata
//stringMap["LogLevel"] = logLevel
} else if ContainerLogsRouteADX == true {
stringMap["Computer"] = Computer
stringMap["ContainerId"] = containerID
stringMap["ContainerName"] = containerName
Expand Down Expand Up @@ -1251,6 +1347,8 @@ func PostDataHelper(tailPluginRecords []map[interface{}]interface{}) int {
PodNamespace: stringMap["PodNamespace"],
LogMessage: stringMap["LogMessage"],
LogSource: stringMap["LogSource"],
KubernetesMetadata: stringMap["KubernetesMetadata"],
wanlonghenry marked this conversation as resolved.
Show resolved Hide resolved
//LogLevel: stringMap["LogLevel"],
}
//ODS-v2 schema
dataItemsLAv2 = append(dataItemsLAv2, dataItemLAv2)
Expand Down Expand Up @@ -1870,6 +1968,17 @@ func InitializePlugin(pluginConfPath string, agentVersion string) {
ContainerLogSchemaV2 = false //default is v1 schema
ContainerLogV2ConfigMap = (strings.Compare(ContainerLogSchemaVersion, ContainerLogV2SchemaVersion) == 0)

KubernetesMetadataConfigMap = false
KubernetesMetadataConfigMap = (strings.Compare(strings.ToLower(os.Getenv("AZMON_KUBERNETES_METADATA_ENABLED")), "true") == 0)
metadataIncludeList := os.Getenv("AZMON_KUBERNETES_METADATA_INCLUDES_FIELDS")
Log(fmt.Sprintf("KubernetesMetadataIncludeList from configmap: %+v\n", metadataIncludeList))
KubernetesMetadataIncludeList = []string{"podLabels", "podAnnotations", "podUid", "image"}
if KubernetesMetadataConfigMap && len(metadataIncludeList) > 0 {
KubernetesMetadataIncludeList = strings.Split(metadataIncludeList, ",")
} else if KubernetesMetadataConfigMap {
pfrcks marked this conversation as resolved.
Show resolved Hide resolved
KubernetesMetadataIncludeList = []string{}
}

if ContainerLogV2ConfigMap && ContainerLogsRouteADX != true {
ContainerLogSchemaV2 = true
Log("Container logs schema=%s", ContainerLogV2SchemaVersion)
Expand Down
29 changes: 29 additions & 0 deletions test/scenario/containerlogv2_linux-resource-app.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: linux-resource-intensive-app
namespace: kube-system
spec:
replicas: 10
selector:
matchLabels:
app: linux-resource-intensive-app
template:
metadata:
labels:
app: linux-resource-intensive-app
spec:
nodeSelector:
kubernetes.io/os: linux
containers:
- name: linux-resource-intensive-app
image: ubuntu
command: ["/bin/bash", "-c"]
args:
- |
while true; do
timestamp=$(date "+%Y/%m/%d %H:%M:%S.%3N")
logEntry=$(printf 'Test-%.0s' {1..200})
echo "$timestamp $logEntry"
sleep 0.001 # Sleep for 1 millisecond
done
37 changes: 37 additions & 0 deletions test/scenario/containerlogv2_windows-resource-app.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: windows-resource-intensive-app
namespace: kube-system
spec:
replicas: 10 # 50
selector:
matchLabels:
app: windows-resource-intensive-app
template:
metadata:
labels:
app: windows-resource-intensive-app
# more labels
spec:
nodeSelector:
kubernetes.io/os: windows
containers:
- name: windows-resource-intensive-app
image: mcr.microsoft.com/windows/servercore:ltsc2022
command: ["powershell", "-c"]
args:
- |
while ($true) {
$timestamp = Get-Date -Format "yyyy/MM/dd HH:mm:ss.fff"
$logEntry = 'Test-' * 200 # Adjusted to create a log entry of about 1000 bytes
Write-Output "$timestamp $logEntry"
Start-Sleep -Milliseconds 1 # Generates approximately 1000 logs per second
}

## thousand logs per second
## 10 pods in certain nodes combining logs each generate some logs scale performance podlevel
## multiple 100 pods

## nodes level testing both windows linux if api then probably node level kubelet
## create a branch for current code branch for testing current stage