Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for JQ queries and JSON output #404

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions awslogs/bin.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,30 @@ def add_date_range_arguments(parser, default_start="5m"):
help="JMESPath query to use in filtering the response data",
)

get_parser.add_argument(
"-j",
"--jq",
action="store",
dest="jq",
help="jq query to use in filtering the response data",
)

get_parser.add_argument(
"--jq-all",
action="store_true",
dest="jq_all",
default=False,
help="Use all results from the jq query (by default only the first result is used)",
)

get_parser.add_argument(
"--json",
action="store_true",
dest="json",
default=False,
help="Output logs in JSON format for processing by other tools",
)

# groups
groups_parser = subparsers.add_parser("groups", description="List groups")
groups_parser.set_defaults(func="list_groups")
Expand All @@ -202,6 +226,10 @@ def add_date_range_arguments(parser, default_start="5m"):
# Parse input
options, _ = parser.parse_known_args(argv)

if getattr(options, "query", None) and getattr(options, "jq", None):
sys.stderr.write("Cannot use both --query and --jq at the same time\n")
return 1

try:
logs = AWSLogs(**vars(options))
if not hasattr(options, "func"):
Expand Down
134 changes: 95 additions & 39 deletions awslogs/core.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
import errno
import os
import re
import sys
import os
import time
import errno
from datetime import datetime, timedelta
from collections import deque
from datetime import datetime, timedelta
from typing import TypeAlias

import boto3
import botocore
from botocore.compat import json, total_seconds

import jmespath

from termcolor import colored
import jq
from botocore.compat import json, total_seconds
from dateutil.parser import parse
from dateutil.tz import tzutc
from termcolor import colored

from . import exceptions

Expand All @@ -25,12 +25,12 @@ def milis2iso(milis):


def boto3_client(
aws_profile,
aws_access_key_id,
aws_secret_access_key,
aws_session_token,
aws_region,
aws_endpoint_url,
aws_profile,
aws_access_key_id,
aws_secret_access_key,
aws_session_token,
aws_region,
aws_endpoint_url,
):
core_session = botocore.session.get_session()
core_session.set_config_variable("profile", aws_profile)
Expand All @@ -52,8 +52,26 @@ def boto3_client(
)


class AWSLogs(object):
# noinspection PyProtectedMember
JQProgram: TypeAlias = jq._Program
# noinspection PyProtectedMember
JQProgramWithInput: TypeAlias = jq._ProgramWithInput


class JQAdapter:
# noinspection PyShadowingBuiltins
def __init__(self, jq_query: JQProgram, all: bool = False):
self.jq = jq_query
self.all = all

def search(self, data):
j: JQProgramWithInput = self.jq.input_value(data)
if self.all:
return j.all()
return j.first()


class AWSLogs(object):
ACTIVE = 1
EXHAUSTED = 2
WATCH_SLEEP = 2
Expand Down Expand Up @@ -82,8 +100,14 @@ def __init__(self, **kwargs):
self.start = self.parse_datetime(kwargs.get("start"))
self.end = self.parse_datetime(kwargs.get("end"))
self.query = kwargs.get("query")
self.query_expression = None
self.jq = kwargs.get("jq")
self.jq_all = kwargs.get("jq_all")
if self.query is not None:
self.query_expression = jmespath.compile(self.query)
elif self.jq is not None:
self.query_expression = JQAdapter(jq.compile(self.jq), all=self.jq_all)
self.json_out = kwargs.get("json")
self.log_group_prefix = kwargs.get("log_group_prefix")
self.client = boto3_client(
self.aws_profile,
Expand Down Expand Up @@ -178,32 +202,64 @@ def consumer():
else:
return

output = []
if self.output_group_enabled:
output.append(
self.color(
self.log_group_name.ljust(group_length, " "), "green"
if self.json_out:
message = event["message"]
if message[0] == "{":
try:
message = json.loads(event["message"])
except json.JSONDecodeError:
pass
if self.query_expression is not None and not isinstance(message, str):
# noinspection PyBroadException
try:
message = self.query_expression.search(message)
except Exception as _:
pass

metadata = {
"log_group": self.log_group_name,
"log_stream": event["logStreamName"],
"timestamp": event["timestamp"],
"ingestion_time": event["ingestionTime"],
}

output = {
"awslogs": metadata,
}
if isinstance(message, dict):
output.update(message)
else:
output["message"] = message

print(json.dumps(output))

else:
output = []
if self.output_group_enabled:
output.append(
self.color(
self.log_group_name.ljust(group_length, " "), "green"
)
)
)
if self.output_stream_enabled:
output.append(
self.color(
event["logStreamName"].ljust(max_stream_length, " "), "cyan"
if self.output_stream_enabled:
output.append(
self.color(
event["logStreamName"].ljust(max_stream_length, " "), "cyan"
)
)
)
if self.output_timestamp_enabled:
output.append(self.color(milis2iso(event["timestamp"]), "yellow"))
if self.output_ingestion_time_enabled:
output.append(self.color(milis2iso(event["ingestionTime"]), "blue"))

message = event["message"]
if self.query is not None and message[0] == "{":
parsed = json.loads(event["message"])
message = self.query_expression.search(parsed)
if not isinstance(message, str):
message = json.dumps(message)
output.append(message.rstrip())
print(" ".join(output))
if self.output_timestamp_enabled:
output.append(self.color(milis2iso(event["timestamp"]), "yellow"))
if self.output_ingestion_time_enabled:
output.append(self.color(milis2iso(event["ingestionTime"]), "blue"))

message = event["message"]
if self.query_expression is not None and message[0] == "{":
parsed = json.loads(event["message"])
message = self.query_expression.search(parsed)
if not isinstance(message, str):
message = json.dumps(message)
output.append(message.rstrip())
print(" ".join(output))

try:
sys.stdout.flush()
Expand Down Expand Up @@ -256,7 +312,7 @@ def get_streams(self, log_group_name=None):
# no firstEventTimestamp.
yield stream["logStreamName"]
elif max(stream["firstEventTimestamp"], window_start) <= min(
stream["lastIngestionTime"], window_end
stream["lastIngestionTime"], window_end
):
yield stream["logStreamName"]

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
"jmespath>=1.0.1",
"termcolor>=2.4.0",
"python-dateutil>=2.9.0",
"jq>=1.8.0",
]


Expand Down
33 changes: 32 additions & 1 deletion tests/test_it.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ class TestAWSLogsDatetimeParse(unittest.TestCase):
@patch("awslogs.core.boto3_client")
@patch("awslogs.core.datetime")
def test_parse_datetime(self, datetime_mock, botoclient):

awslogs = AWSLogs()
datetime_mock.utcnow.return_value = datetime(2015, 1, 1, 3, 0, 0, 0)
datetime_mock.return_value = datetime(1970, 1, 1)
Expand Down Expand Up @@ -380,6 +379,38 @@ def test_main_get_query(self, mock_stdout, botoclient):
self.assertEqual(output, expected)
assert exit_code == 0

@patch("awslogs.core.boto3_client")
@patch("sys.stdout", new_callable=StringIO)
@patch("os.isatty", lambda fd: True)
@patch("sys.stdout.isatty", lambda: True)
def test_main_get_jq(self, mock_stdout, botoclient):
self.set_json_logs(botoclient)
exit_code = main("awslogs get AAA DDD --jq .foo --color=always".split())
output = mock_stdout.getvalue()
expected = (
"\x1b[32mAAA\x1b[0m \x1b[36mDDD\x1b[0m bar\n"
'\x1b[32mAAA\x1b[0m \x1b[36mEEE\x1b[0m {"bar": "baz"}\n'
"\x1b[32mAAA\x1b[0m \x1b[36mDDD\x1b[0m Hello 3 👎\n"
)
self.assertEqual(output, expected)
assert exit_code == 0

@patch("awslogs.core.boto3_client")
@patch("sys.stdout", new_callable=StringIO)
@patch("os.isatty", lambda fd: True)
@patch("sys.stdout.isatty", lambda: True)
def test_main_get_json(self, mock_stdout, botoclient):
self.set_json_logs(botoclient)
exit_code = main("awslogs get AAA DDD --json".split())
output = mock_stdout.getvalue()
expected = (
'{"awslogs": {"log_group": "AAA", "log_stream": "DDD", "timestamp": 0, "ingestion_time": 5000}, "foo": "bar"}\n'
'{"awslogs": {"log_group": "AAA", "log_stream": "EEE", "timestamp": 0, "ingestion_time": 5000}, "foo": {"bar": "baz"}}\n'
'{"awslogs": {"log_group": "AAA", "log_stream": "DDD", "timestamp": 0, "ingestion_time": 5006}, "message": "Hello 3 \\ud83d\\udc4e"}\n'
)
self.assertEqual(output, expected)
assert exit_code == 0

@patch("awslogs.core.boto3_client")
@patch("sys.stdout", new_callable=StringIO)
def test_get_nogroup(self, mock_stdout, botoclient):
Expand Down