Ingest data is captured or transferred into the Hadoop cluster, where it is transformed and loaded into the solution data stores.
- oni-setup
- kafka-python
- oni-nfdump
- tshark
- watchdog
- spark-streaming-kafka-0-8-assembly_2.11
- Ingest user with sudo privileges (e.g. oni). This user executes all the processes in the Ingest Framework and needs access to the HDFS solution path (e.g. /user/oni/).
Adding Kafka Service:
The ingest framework needs Kafka for real-time streaming. Add the Kafka service using Cloudera Manager. If you are using a Cloudera Manager version < 5.4.1, you will need to add the Kafka parcel manually.
The ingest module uses a default message size of 999999 bytes. If you change this size in the ingest configuration file, you will also need to change the following configuration properties in Kafka:
- message.max.bytes
- replica.fetch.max.bytes
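As a rough illustration, the sketch below reads `message_size` from a configuration shaped like ingest_conf.json and lists the Kafka broker properties that would need to change with it. The helper name `kafka_size_overrides` is hypothetical and not part of oni-ingest, and the sketch assumes both properties are simply set equal to the configured message size.

```python
import json


def kafka_size_overrides(conf):
    """Return the Kafka broker properties to adjust for a given ingest config.

    Hypothetical helper: assumes both properties are set equal to message_size.
    """
    size = int(conf["kafka"]["message_size"])
    return {
        "message.max.bytes": size,
        "replica.fetch.max.bytes": size,
    }


# Inline sample standing in for the contents of ingest_conf.json.
sample = json.loads('{"kafka": {"message_size": 2000000}}')
for name, value in sorted(kafka_size_overrides(sample).items()):
    print("%s=%d" % (name, value))
```

The printed values still have to be applied to the Kafka service by hand, for example through Cloudera Manager.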
Download the following jar file: spark-streaming-kafka-0-8-assembly_2.11. This jar adds Spark Streaming + Kafka support and must be placed in the following path, keeping the same file name: oni-ingest/oni
Required Roles
The following roles are required on every node where the Ingest Framework will run.
- HDFS gateway (i.e. Edge Server)
- Kafka Broker
Get the code:
git clone https://github.com/Open-Network-Insight/oni-ingest.git
Ingest Configuration:
The file ingest_conf.json contains all the configuration required to start the ingest module:
- dbname: Name of the Hive database where all the ingested data will be stored in avro-parquet format.
- hdfs_app_path: Application path in HDFS where the pipelines will be stored (e.g. /user/application_user/).
- kafka: Kafka and Zookeeper server information required to create and listen to topics and partitions.
- pipelines: In this section you can add multiple configurations for either the same pipeline or different pipelines. The configuration name must be lowercase without spaces (e.g. flow_internals).
Configuration example:
{
    "dbname" : "database name",
    "hdfs_app_path" : "hdfs application path",
    "kafka":{
        "kafka_server":"kafka ip",
        "kafka_port":"kafka port",
        "zookeper_server":"zk ip",
        "zookeper_port":"zk port",
        "message_size":999999
    },
    "pipelines":{
        "flow_internals":{
            "type":"flow",
            "collector_path":"/path_to_flow_collector",
            "local_staging":"/tmp/",
            "process_opt":""
        },
        "flow_externals":{
            "type":"flow",
            "collector_path":"/path_to_flow_collector",
            "local_staging":"/tmp/",
            "process_opt":""
        },
        "dns_server_1":{
            "type":"dns",
            "collector_path":"/path_to_dns_collector",
            "local_staging":"/tmp/",
            "pkt_num":"650000",
            "pcap_split_staging":"/tmp",
            "process_opt":"-E separator=, -E header=y -E occurrence=f -T fields -e frame.time -e frame.time_epoch -e frame.len -e ip.src -e ip.dst -e dns.resp.name -e dns.resp.type -e dns.resp.class -e dns.flags.rcode -e dns.a 'dns.flags.response == 1'"
        }
    }
}
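Before starting the ingest, it can help to sanity-check the configuration. The following is a minimal sketch, not part of oni-ingest: the function names are hypothetical, and it only checks the four top-level keys described above and the rule that pipeline configuration names are lowercase without spaces.

```python
REQUIRED_KEYS = ("dbname", "hdfs_app_path", "kafka", "pipelines")


def missing_top_level_keys(conf):
    """Return the required top-level keys that are absent from the config."""
    return [key for key in REQUIRED_KEYS if key not in conf]


def invalid_pipeline_names(conf):
    """Return pipeline configuration names that are not lowercase or contain spaces."""
    bad = []
    for name in sorted(conf.get("pipelines", {})):
        has_space = any(ch.isspace() for ch in name)
        if name != name.lower() or has_space:
            bad.append(name)
    return bad


# Inline sample; in practice the dict would come from json.load() on ingest_conf.json.
sample = {
    "dbname": "database name",
    "hdfs_app_path": "hdfs application path",
    "kafka": {"message_size": 999999},
    "pipelines": {"flow_internals": {"type": "flow"}, "Flow Externals": {"type": "flow"}},
}
print("missing keys:", missing_top_level_keys(sample))
print("invalid pipeline names:", invalid_pipeline_names(sample))
```

Here the second pipeline name is deliberately wrong, so it is the only entry reported.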
Starting the Ingest
Running in a Standalone Mode:
bash start_standalone_ingest.sh "pipeline_configuration" "num of workers"
Following the previous configuration example, starting the ingest module in standalone mode looks like this:
bash start_standalone_ingest.sh flow_internals 4
Running in a Cluster Mode:
Running Master: The master needs to run on the same server where the collector path is located.
python master_collector.py -t "pipeline_configuration" -w "number of workers"
Running Workers: Each worker needs to run on a server where the required processing program is installed (e.g. nfdump). Each worker also needs to be identified with a specific id, and ids start at 0.
Example:
- worker_0, id = 0
- worker_1, id = 1
This "id" is required to attach the worker to the Kafka partition.
python worker.py -t "pipeline_configuration" -i "id of the worker (starts with 0)" --topic "my_topic"
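Because worker ids must start at 0 and increase by one, the per-worker command lines can be generated rather than typed by hand. The sketch below is only an illustration: `worker_commands` is a hypothetical helper, it reuses the `-t`, `-i` and `--topic` flags shown above, and it assumes the topic has at least as many partitions as there are workers.

```python
def worker_commands(pipeline, topic, num_workers):
    """Build one worker.py command line per worker, with ids 0..num_workers-1."""
    return [
        'python worker.py -t "%s" -i %d --topic "%s"' % (pipeline, worker_id, topic)
        for worker_id in range(num_workers)
    ]


# Two workers for the flow_internals pipeline from the configuration example.
for command in worker_commands("flow_internals", "my_topic", 2):
    print(command)
```

Each printed command is then run on the server that hosts the corresponding worker.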