Commit d2c092a

Merge branch 'main' into dev-refreshable-http-getters
# Conflicts: # CHANGELOG.md
2 parents 37e9566 + c69a662 commit d2c092a

32 files changed

+2601
-323
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -4,6 +4,7 @@
 
 ### Features
 
+* implement first prototype of ng logprep runner
 * make http getters periodically refresh if configured in file path defined by environment variable `LOGPREP_GETTER_CONFIG`.
 * cache http getter results by utilising the etag header.
 * add per-target (i.e. `localhost:1234/foo`) callbacks to http getters that are called when getters are refreshed with new data.
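
The getter entries above (ETag caching and per-target refresh callbacks) can be illustrated with a minimal sketch. Every name in it (`CachedGetter`, `Fetch`, `fake_fetch`, `refresh`) is hypothetical and not logprep's API; the transport is injected as a plain function so the example runs without a network, and the periodic scheduling is left out.

```python
from dataclasses import dataclass, field
from typing import Callable, Optional

# Hypothetical transport signature: (target, known_etag) -> (status, etag, body)
Fetch = Callable[[str, Optional[str]], tuple[int, Optional[str], bytes]]


@dataclass
class CachedGetter:
    """Illustrative ETag-aware getter; not logprep's actual implementation."""

    target: str
    fetch: Fetch
    etag: Optional[str] = None
    body: bytes = b""
    callbacks: list[Callable[[bytes], None]] = field(default_factory=list)

    def refresh(self) -> bool:
        """Re-fetch the target and return True only if new data arrived."""
        status, etag, body = self.fetch(self.target, self.etag)
        if status == 304:  # Not Modified: keep the cached body
            return False
        self.etag, self.body = etag, body
        for callback in self.callbacks:  # notify the listeners of this target
            callback(body)
        return True


def fake_fetch(target: str, etag: Optional[str]) -> tuple[int, Optional[str], bytes]:
    """Stand-in server that answers 304 when the client's ETag matches."""
    current_etag, current_body = '"v1"', b"rules: []"
    if etag == current_etag:
        return 304, current_etag, b""
    return 200, current_etag, current_body


seen: list[bytes] = []
getter = CachedGetter("localhost:1234/foo", fake_fetch, callbacks=[seen.append])
first = getter.refresh()   # fetches the body and fires the callback
second = getter.refresh()  # ETag matches, cache is kept, no callback
```

In a real setup a scheduler would call `refresh()` at the configured interval; the callback only fires when the server returns a body with a new ETag.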

doc/source/user_manual/execution.rst

Lines changed: 28 additions & 0 deletions
@@ -15,6 +15,34 @@ To get help on the different parameters use:
 
     logprep --help
 
+Running logprep-ng
+==================
+
+Logprep-ng is the next generation of logprep and is still in an experimental state.
+To execute logprep-ng, use the following command:
+
+.. code-block:: bash
+
+    logprep-ng run $CONFIG
+
+Where :code:`$CONFIG` is the path or a url to a configuration file (see :ref:`configuration`).
+This command starts the logprep-ng processing pipeline with the specified configuration and options.
+
+Common usage examples:
+
+.. code-block:: bash
+
+    # Run with default configuration
+    logprep-ng run
+
+    # Run with custom configuration file
+    logprep-ng run /path/to/config.yml
+
+Available options can be viewed using:
+
+.. code-block:: bash
+
+    logprep-ng run --help
 
 Event Generation
 ----------------
Lines changed: 84 additions & 0 deletions
@@ -0,0 +1,84 @@
+version: 2
+process_count: 4
+config_refresh_interval: 5
+profile_pipelines: false
+restart_count: 3
+logger:
+  level: INFO
+  loggers:
+    uvicorn:
+      level: INFO
+    uvicorn.access:
+      level: INFO
+    uvicorn.error:
+      level: INFO
+    KafkaOutput:
+      level: ERROR
+
+metrics:
+  enabled: true
+  port: 8003
+  uvicorn_config:
+    host: 0.0.0.0
+    access_log: true
+    server_header: false
+    date_header: false
+    workers: 1
+    ws: none
+    interface: asgi3
+    backlog: 16384
+    timeout_keep_alive: 65
+input:
+  httpinput:
+    type: ng_http_input
+    message_backlog_size: 1500000
+    collect_meta: true
+    metafield_name: "@metadata"
+    uvicorn_config:
+      host: 0.0.0.0
+      port: 9000
+      workers: 1
+      access_log: true
+      server_header: false
+      date_header: false
+      ws: none
+      interface: asgi3
+      backlog: 16384
+      timeout_keep_alive: 65
+    endpoints:
+      /auth-json: json
+      /json: json
+      /lab/123/(ABC|DEF)/pl.*: plaintext
+      /lab/123/ABC/auditlog: jsonl
+
+output:
+  kafka:
+    type: ng_confluentkafka_output
+    topic: consumer
+    flush_timeout: 300
+    send_timeout: 0
+    kafka_config:
+      bootstrap.servers: 127.0.0.1:9092
+      compression.type: none
+      statistics.interval.ms: "60000"
+      queue.buffering.max.messages: "100000000"
+      queue.buffering.max.kbytes: "1048576"
+      queue.buffering.max.ms: "10000"
+      batch.size: "1000000"
+      request.required.acks: "-1"
+
+error_output:
+  kafka:
+    type: ng_confluentkafka_output
+    topic: errors
+    flush_timeout: 300
+    send_timeout: 0
+    kafka_config:
+      bootstrap.servers: 127.0.0.1:9092
+      compression.type: none
+      statistics.interval.ms: "60000"
+      queue.buffering.max.messages: "100000000"
+      queue.buffering.max.kbytes: "1048576"
+      queue.buffering.max.ms: "10000"
+      batch.size: "1000000"
+      request.required.acks: "-1"
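
The `endpoints` keys in this configuration, such as `/lab/123/(ABC|DEF)/pl.*`, read like regular expressions. The sketch below shows one way such a table could be resolved to an endpoint type. It assumes full-match semantics and first-match-wins in declaration order; the diff confirms neither, and `resolve_endpoint` is a hypothetical helper, not part of logprep.

```python
import re
from typing import Optional

# Endpoint table copied from the example configuration above.
ENDPOINTS = {
    "/auth-json": "json",
    "/json": "json",
    "/lab/123/(ABC|DEF)/pl.*": "plaintext",
    "/lab/123/ABC/auditlog": "jsonl",
}


def resolve_endpoint(path: str) -> Optional[str]:
    """Return the type of the first pattern that fully matches the path.

    Assumption: patterns are tried with re.fullmatch in declaration order.
    """
    for pattern, endpoint_type in ENDPOINTS.items():
        if re.fullmatch(pattern, path):
            return endpoint_type
    return None  # no configured endpoint matches this path
```

Under these assumptions `/lab/123/DEF/plain` resolves to `plaintext`, while `/lab/123/ABC/auditlog` falls through the regex entry (it does not continue with `pl`) and resolves to `jsonl`.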
Lines changed: 143 additions & 0 deletions
@@ -0,0 +1,143 @@
+version: 2
+process_count: 500
+timeout: 5.0
+restart_count: 2
+config_refresh_interval: 300
+error_backlog_size: 1500000
+logger:
+  level: INFO
+  format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
+  datefmt: "%Y-%m-%d %H:%M:%S"
+  loggers:
+    "py.warnings": { "level": "ERROR" }
+    "Runner": { "level": "INFO" }
+    "Processor": { "level": "ERROR" }
+    "Exporter": { "level": "ERROR" }
+    "uvicorn": { "level": "ERROR" }
+    "uvicorn.access": { "level": "ERROR" }
+    "OpenSearchOutput": { "level": "DEBUG" }
+    "KafkaOutput": { "level": "ERROR" }
+metrics:
+  enabled: true
+  port: 8001
+
+pipeline:
+  - labelername:
+      type: ng_labeler
+      schema: examples/exampledata/rules/labeler/schema.json
+      include_parent_labels: true
+      rules:
+        - examples/exampledata/rules/labeler/rules
+
+  - dissector:
+      type: ng_dissector
+      rules:
+        - examples/exampledata/rules/dissector/rules
+
+  - dropper:
+      type: ng_dropper
+      rules:
+        - examples/exampledata/rules/dropper/rules
+        - filter: "test_dropper"
+          dropper:
+            drop:
+              - drop_me
+          description: "..."
+
+  - pre_detector:
+      type: ng_pre_detector
+      rules:
+        - examples/exampledata/rules/pre_detector/rules
+      outputs:
+        - opensearch: sre
+      tree_config: examples/exampledata/rules/pre_detector/tree_config.json
+      alert_ip_list_path: examples/exampledata/rules/pre_detector/alert_ips.yml
+
+  - amides:
+      type: ng_amides
+      rules:
+        - examples/exampledata/rules/amides/rules
+      models_path: examples/exampledata/models/model.zip
+      num_rule_attributions: 10
+      max_cache_entries: 1000000
+      decision_threshold: 0.32
+
+  - pseudonymizer:
+      type: ng_pseudonymizer
+      pubkey_analyst: examples/exampledata/rules/pseudonymizer/example_analyst_pub.pem
+      pubkey_depseudo: examples/exampledata/rules/pseudonymizer/example_depseudo_pub.pem
+      regex_mapping: examples/exampledata/rules/pseudonymizer/regex_mapping.yml
+      hash_salt: a_secret_tasty_ingredient
+      outputs:
+        - opensearch: pseudonyms
+      rules:
+        - examples/exampledata/rules/pseudonymizer/rules/
+      max_cached_pseudonyms: 1000000
+
+  - calculator:
+      type: ng_calculator
+      rules:
+        - filter: "test_label: execute"
+          calculator:
+            target_field: "calculation"
+            calc: "1 + 1"
+
+input:
+  kafka:
+    type: ng_confluentkafka_input
+    topic: consumer
+    kafka_config:
+      bootstrap.servers: 127.0.0.1:9092
+      group.id: cgroup3
+      enable.auto.commit: "true"
+      auto.commit.interval.ms: "10000"
+      enable.auto.offset.store: "false"
+      queued.min.messages: "100000"
+      queued.max.messages.kbytes: "65536"
+      statistics.interval.ms: "60000"
+    preprocessing:
+      version_info_target_field: Logprep_version_info
+      log_arrival_time_target_field: event.ingested
+      hmac:
+        target: <RAW_MSG>
+        key: "thisisasecureandrandomkey"
+        output_field: Full_event
+
+output:
+  opensearch:
+    type: ng_opensearch_output
+    hosts:
+      - 127.0.0.1:9200
+    default_index: processed
+    default_op_type: create
+    message_backlog_size: 7000
+    timeout: 10000
+    flush_timeout: 60
+    user: admin
+    secret: admin
+    desired_cluster_status: ["green", "yellow"]
+    chunk_size: 25
+  kafka:
+    type: ng_confluentkafka_output
+    default: false
+    topic: producer
+    flush_timeout: 300
+    kafka_config:
+      bootstrap.servers: 127.0.0.1:9092
+      statistics.interval.ms: "60000"
+
+error_output:
+  kafka_error_output:
+    type: ng_confluentkafka_output
+    topic: errors
+    flush_timeout: 300
+    send_timeout: 0
+    kafka_config:
+      bootstrap.servers: 127.0.0.1:9092
+      compression.type: none
+      statistics.interval.ms: "60000"
+      queue.buffering.max.messages: "10"
+      queue.buffering.max.kbytes: "1024"
+      queue.buffering.max.ms: "1000"
+      batch.size: "100"
+      request.required.acks: "-1"

examples/exampledata/config/pipeline.yml

Lines changed: 1 addition & 1 deletion
@@ -5,7 +5,7 @@ restart_count: 2
 config_refresh_interval: 5
 error_backlog_size: 1500000
 logger:
-  level: INFO
+  level: DEBUG
   format: "%(asctime)-15s %(hostname)-5s %(name)-10s %(levelname)-8s: %(message)s"
   datefmt: "%Y-%m-%d %H:%M:%S"
   loggers:

logprep/framework/pipeline.py

Lines changed: 0 additions & 1 deletion
@@ -9,7 +9,6 @@
 
 import itertools
 import logging
-import logging.handlers
 import multiprocessing
 
 # pylint: disable=logging-fstring-interpolation

logprep/framework/pipeline_manager.py

Lines changed: 1 addition & 3 deletions
@@ -2,9 +2,7 @@
 
 # pylint: disable=logging-fstring-interpolation
 import logging
-import logging.handlers
 import multiprocessing
-import multiprocessing.managers
 import multiprocessing.queues
 import random
 import sys
@@ -288,7 +286,7 @@ def restart_failed_pipeline(self):
 
         if not failed_pipelines:
             self.restart_count = 0
-            self.restart_timeout_ms: int = random.randint(100, 1000)
+            self.restart_timeout_ms = random.randint(100, 1000)
             return
 
         for index, failed_pipeline in failed_pipelines:

logprep/ng/abc/input.py

Lines changed: 10 additions & 4 deletions
@@ -5,6 +5,7 @@
 import base64
 import hashlib
 import json
+import logging
 import os
 import zlib
 from abc import abstractmethod
@@ -29,6 +30,8 @@
 from logprep.util.time import UTC, TimeParser, TimeParserException
 from logprep.util.validators import dict_structure_validator
 
+logger = logging.getLogger("Input")
+
 
 class InputError(LogprepException):
     """Base class for Input related exceptions."""
@@ -154,8 +157,13 @@ def __next__(self) -> LogEvent | None:
         LogEvent | None
             The next event retrieved from the underlying data source.
         """
-
-        return self.input_connector.get_next(timeout=self.timeout)
+        event = self.input_connector.get_next(timeout=self.timeout)
+        logger.debug(
+            "InputIterator fetching next event with timeout %s, is None: %s",
+            self.timeout,
+            event is None,
+        )
+        return event
 
 
 class Input(Connector):
@@ -423,9 +431,7 @@ def get_next(self, timeout: float) -> LogEvent | None:
         input : LogEvent, None
             Input log data.
         """
-
         self.acknowledge()
-
         event: dict | None = None
         raw_event: bytearray | None = None
         metadata: dict | None = None
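
The `__next__` change in this file wraps the connector call with a debug log line. A self-contained sketch of that pattern follows; `QueueConnector` and `LoggingIterator` are hypothetical stand-ins rather than logprep classes, and only the shape of `__next__` follows the hunk above.

```python
import logging
from typing import Optional

logger = logging.getLogger("Input")


class QueueConnector:
    """Hypothetical stand-in for an input connector backed by a list."""

    def __init__(self, events: list[dict]) -> None:
        self._events = list(events)

    def get_next(self, timeout: float) -> Optional[dict]:
        # Returns None when nothing is available, like an input that timed out.
        return self._events.pop(0) if self._events else None


class LoggingIterator:
    """Iterator that logs every fetch and yields None instead of stopping."""

    def __init__(self, connector: QueueConnector, timeout: float) -> None:
        self.connector = connector
        self.timeout = timeout

    def __iter__(self) -> "LoggingIterator":
        return self

    def __next__(self) -> Optional[dict]:
        event = self.connector.get_next(timeout=self.timeout)
        # %-style arguments are only formatted if DEBUG is enabled.
        logger.debug(
            "fetching next event with timeout %s, is None: %s",
            self.timeout,
            event is None,
        )
        return event


iterator = LoggingIterator(QueueConnector([{"message": "a"}]), timeout=0.1)
first = next(iterator)   # the queued event
second = next(iterator)  # None: the source is empty, no StopIteration
```

Because `__next__` returns `None` rather than raising `StopIteration`, a consumer looping over such an iterator has to handle `None` itself.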

logprep/ng/connector/dummy/input.py

Lines changed: 0 additions & 1 deletion
@@ -52,7 +52,6 @@ def _get_event(self, timeout: float) -> tuple:
             if not self._config.repeat_documents:
                 raise SourceDisconnectedWarning(self, "no documents left")
             del self.__dict__["_documents"]
-
         document = self._documents.pop(0)
 
         if (document.__class__ == type) and issubclass(document, Exception):

logprep/ng/connector/dummy/output.py

Lines changed: 8 additions & 0 deletions
@@ -17,6 +17,7 @@
         type: dummy_output
 """
 
+import logging
 from typing import TYPE_CHECKING, List
 
 from attr import define, field
@@ -29,6 +30,8 @@
 if TYPE_CHECKING:
     from logprep.abc.connector import Connector  # pragma: no cover
 
+logger = logging.getLogger("DummyOutput")
+
 
 class DummyOutput(Output):
     """
@@ -54,6 +57,8 @@ class Config(Output.Config):
         for testing purposes. If an exception is raised, the exception is handled
         by the output decorator.
         """
+        reset_on_flush: bool = field(default=False)
+        """If set to True, the stored events will be cleared when flush() is called."""
 
     events: list[LogEvent]
     failed_events: list[LogEvent]
@@ -106,3 +111,6 @@ def shut_down(self):
 
     def flush(self):
         """Flush not implemented because it has not backlog."""
+        if self._config.reset_on_flush:
+            self.events.clear()
+        logger.debug("DummyOutput flushed %s events", len(self.events))
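
The effect of `reset_on_flush` can be sketched with a small in-memory stand-in. `SketchOutput` is hypothetical and mirrors only the flush behaviour. It deliberately differs from the hunk above in one respect: the event count is captured before the optional clear, since `len(self.events)` read after `clear()` is always 0.

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger("DummyOutput")


@dataclass
class SketchOutput:
    """Hypothetical in-memory output; not logprep's DummyOutput."""

    reset_on_flush: bool = False
    events: list[dict] = field(default_factory=list)

    def store(self, event: dict) -> None:
        self.events.append(event)

    def flush(self) -> int:
        """Return the number of events held at flush time."""
        flushed = len(self.events)  # capture the count before any clearing
        if self.reset_on_flush:
            self.events.clear()
        logger.debug("flushed %s events", flushed)
        return flushed


keeping = SketchOutput()
keeping.store({"id": 1})

resetting = SketchOutput(reset_on_flush=True)
resetting.store({"id": 1})
resetting.store({"id": 2})
```

Calling `flush()` on `keeping` leaves its stored event in place, while `resetting` reports two events and ends up empty.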
