Skip to content

Commit 117b9d9

Browse files
refactor: sdk and cli commands
1 parent e5da3d0 commit 117b9d9

File tree

7 files changed

+121
-233
lines changed

7 files changed

+121
-233
lines changed

src/cloudsnake/cli/cli.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,6 @@ def version_cmd():
3939
)
4040
)
4141

42-
4342
@app.callback()
4443
def entrypoint(
4544
ctx: typer.Context,

src/cloudsnake/cli/ssm.py

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,7 @@
88

99
EC2_RUNNING_FILTER = "Name=instance-state-name,Values=running"
1010

11-
EC2_INSTANCE_SELECTOR_QUERY = (
12-
"Reservations[*].Instances[*].{"
13-
"TargetId: InstanceId, "
14-
"Name: Tags[?Key=='Name'].Value | [0]"
15-
"}"
16-
)
11+
EC2_INSTANCE_SELECTOR_QUERY = "[].{TargetId: InstanceId, Name: Tags[?Key=='Name'].Value | [0]}"
1712

1813
ssm = typer.Typer(
1914
no_args_is_help=True,
@@ -39,18 +34,20 @@ def start_session(
3934
),
4035
):
4136
ssm = SSMStartSessionWrapper(
37+
session=ctx.obj.session,
4238
profile=ctx.obj.profile,
4339
region=ctx.obj.region,
4440
)
45-
ssm.create_client(ctx.obj.session)
41+
4642
if with_instance_selector:
4743
ec2 = EC2InstanceWrapper(
44+
session=ctx.obj.session,
4845
filters=EC2_RUNNING_FILTER,
4946
query=EC2_INSTANCE_SELECTOR_QUERY,
5047
profile=ctx.obj.profile,
5148
region=ctx.obj.region,
5249
)
53-
ec2.create_client(ctx.obj.session)
50+
5451
instances = ec2.describe_ec2_instances()
5552
if not instances:
5653
typer.echo("No running instances found.")

src/cloudsnake/sdk/aws.py

Lines changed: 38 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,48 +1,51 @@
1+
from __future__ import annotations
2+
13
import logging
2-
from botocore.config import Config
4+
from abc import ABC, abstractmethod
5+
from typing import Optional, Any
36

7+
import boto3
8+
from botocore.config import Config
49

5-
class App:
6-
"""
7-
Common AWS wrapper base class for cloudsnake SDK.
8-
Provides: logging, client creation, session storage, region/profile propagation.
9-
"""
1010

11+
class App(ABC):
1112
def __init__(
1213
self,
13-
client=None,
14-
filters=None,
15-
query=None,
16-
profile=None,
17-
region=None,
18-
session=None,
14+
session: Optional[boto3.Session] = None,
15+
region: Optional[str] = None,
16+
profile: Optional[str] = None,
17+
filters: Optional[dict] = None,
18+
query: Optional[Any] = None,
19+
retries: int = 10,
1920
**kwargs,
2021
):
21-
self.log = logging.getLogger("cloudsnake")
22-
22+
self.log = logging.getLogger(self.__class__.__name__)
2323
self.filters = filters
2424
self.query = query
25-
self.client_name = client
25+
self.session = session
2626
self.profile = profile
2727
self.region = region
28-
self.session = session
29-
self.client = None
28+
self.retries = retries
29+
self._client = None
3030

31-
def create_client(self, session):
32-
"""
33-
Create a boto3 client using the provided session.
34-
Stores the session, region, and profile internally.
35-
"""
36-
self.session = session
37-
try:
38-
if not self.region:
39-
self.region = session.region_name
40-
if not self.profile and hasattr(session, "profile_name"):
41-
self.profile = session.profile_name
42-
except Exception:
43-
pass
44-
45-
config = Config(retries={"max_attempts": 10, "mode": "standard"})
46-
47-
self.client = session.client(self.client_name, config=config)
48-
return self.client
31+
@property
32+
@abstractmethod
33+
def client_name(self) -> str:
34+
pass
35+
36+
@property
37+
def client(self):
38+
if self._client is None:
39+
if not self.session:
40+
raise RuntimeError("No boto3 session available")
41+
self._client = self._create_client()
42+
return self._client
43+
44+
def _create_client(self):
45+
if not self.region and self.session.region_name:
46+
self.region = self.session.region_name
47+
if not self.profile and getattr(self.session, "profile_name", None):
48+
self.profile = self.session.profile_name
49+
50+
cfg = Config(retries={"max_attempts": self.retries, "mode": "standard"})
51+
return self.session.client(self.client_name, config=cfg)
Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,59 +1,41 @@
1+
from __future__ import annotations
2+
3+
import boto3
14
import logging
25
import os
3-
import boto3
6+
from typing import Optional
47

58

69
class SessionWrapper:
7-
"""Encapsulates AWS boto3 Session operations"""
8-
9-
def __init__(self, profile: str | None = None, region: str = "us-east-1"):
10-
"""
11-
:param profile: AWS credentials profile (defaults to value in AWS_PROFILE).
12-
:param region: AWS region.
13-
"""
10+
def __init__(self, profile: Optional[str] = None, region: str = "us-east-1"):
1411
self.log = logging.getLogger("cloudsnake.session")
15-
1612
self.profile = profile or os.getenv("AWS_PROFILE")
1713
self.region = region
1814

1915
if not self.profile:
20-
self.log.warning("No AWS profile provided. Using environment default.")
16+
self.log.warning("No AWS profile provided, falling back to environment defaults")
2117

22-
self.log.debug(
23-
f"SessionWrapper initialized with profile={self.profile} region={self.region}"
24-
)
18+
self.log.debug(f"SessionWrapper(profile={self.profile}, region={self.region})")
2519

2620
def with_local_session(self) -> boto3.Session:
27-
"""Return a boto3 Session using ~/.aws/credentials"""
28-
self.log.debug("Using local AWS credentials via ~/.aws/credentials")
29-
return boto3.Session(
30-
profile_name=self.profile,
31-
region_name=self.region,
32-
)
21+
self.log.debug("Creating local boto3.Session using ~/.aws/credentials")
22+
return boto3.Session(profile_name=self.profile, region_name=self.region)
3323

3424
def with_sts_assume_role_session(self, role_arn: str) -> boto3.Session:
35-
"""Return a boto3 Session via STS AssumeRole"""
3625
self.log.debug(f"Assuming role via STS: {role_arn}")
3726

38-
base_session = boto3.Session(
39-
profile_name=self.profile,
40-
region_name=self.region,
41-
)
42-
sts = base_session.client("sts")
43-
44-
response = sts.assume_role(
45-
RoleArn=role_arn,
46-
RoleSessionName="cloudsnake-session",
47-
)
27+
base = boto3.Session(profile_name=self.profile, region_name=self.region)
28+
sts = base.client("sts")
4829

49-
creds = response["Credentials"]
30+
res = sts.assume_role(RoleArn=role_arn, RoleSessionName="cloudsnake-session")
31+
creds = res["Credentials"]
5032

51-
assumed = boto3.Session(
33+
session = boto3.Session(
5234
aws_access_key_id=creds["AccessKeyId"],
5335
aws_secret_access_key=creds["SecretAccessKey"],
5436
aws_session_token=creds["SessionToken"],
5537
region_name=self.region,
5638
)
5739

58-
self.log.debug("AssumeRole session created successfully")
59-
return assumed
40+
self.log.debug("STS assumed role session created")
41+
return session

src/cloudsnake/sdk/ec2.py

Lines changed: 28 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,54 +1,43 @@
1+
from __future__ import annotations
2+
13
import jmespath
4+
from typing import Any, Dict, List, Optional
5+
26
from cloudsnake.helpers import parse_filters
37
from cloudsnake.sdk.aws import App
48
from botocore.exceptions import ClientError
59

610

711
class EC2InstanceWrapper(App):
8-
"""
9-
Wrapper class for managing EC2 instances.
10-
"""
11-
12-
def __init__(self, client="ec2", filters=None, query=None, **kwargs):
13-
"""
14-
Initialize the EC2 class.
15-
16-
Args:
17-
client (EC2Client): The EC2 client object.
18-
instances (list, optional): A list of EC2 instances. Defaults to None.
19-
filters (str, optional): Additional filters for querying instances. Defaults to "".
20-
query (str, optional): A query string for filtering instances. Defaults to "".
21-
**kwargs: Additional keyword arguments.
22-
23-
"""
24-
super().__init__(client, filters, query, **kwargs)
25-
self.instances = {}
26-
27-
def describe_ec2_instances(self):
28-
"""
29-
AWS EC2 describe instances.
30-
"""
31-
if self.filters:
32-
parsed_filters = parse_filters(self.filters)
33-
else:
34-
parsed_filters = []
35-
36-
self.log.info("Describing EC2 instances")
12+
def __init__(
13+
self,
14+
filters: Optional[str] = None,
15+
query: Optional[str] = None,
16+
**kwargs,
17+
):
18+
super().__init__(filters=filters, query=query, **kwargs)
19+
self.instances: Dict[str, Any] = {}
20+
21+
@property
22+
def client_name(self) -> str:
23+
return "ec2"
24+
25+
def describe_ec2_instances(self) -> Any:
26+
parsed_filters = parse_filters(self.filters) if self.filters else []
27+
3728
try:
3829
paginator = self.client.get_paginator("describe_instances")
39-
4030
for page in paginator.paginate(Filters=parsed_filters):
41-
self.instances.update(page)
31+
for reservation in page.get("Reservations", []):
32+
for instance in reservation.get("Instances", []):
33+
iid = instance.get("InstanceId")
34+
if iid:
35+
self.instances[iid] = instance
36+
37+
return jmespath.search(self.query, list(self.instances.values())) if self.query else list(self.instances.values())
4238

43-
if self.query is not None:
44-
result = jmespath.search(self.query, self.instances)
45-
return result
46-
else:
47-
return self.instances
4839
except ClientError as err:
4940
self.log.error(
50-
"Couldn't register device",
51-
err.response["Error"]["Code"],
52-
err.response["Error"]["Message"],
41+
f"EC2 describe_instances failed: {err.response['Error']['Code']} - {err.response['Error']['Message']}"
5342
)
5443
raise

src/cloudsnake/sdk/ssm_session.py

Lines changed: 25 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,49 @@
1+
from __future__ import annotations
2+
13
import errno
24
import json
35
import shutil
46
import subprocess
57
import logging
8+
from typing import Optional, Dict, Any
69

710
from cloudsnake.helpers import ignore_user_entered_signals
811
from cloudsnake.sdk.aws import App
912

1013

1114
PLUGIN_NOT_FOUND_MSG = """
1215
Session Manager Plugin not found.
13-
14-
Install it following:
1516
https://docs.aws.amazon.com/systems-manager/latest/userguide/session-manager-working-with-install-plugin.html
1617
"""
1718

1819

1920
class SSMStartSessionWrapper(App):
20-
"""Encapsulates Amazon SSM Start Session actions."""
21-
22-
def __init__(self, client="ssm", session_response_output=None, **kwargs):
23-
super().__init__(client, **kwargs)
21+
def __init__(
22+
self,
23+
session_response_output: Optional[Dict[str, Any]] = None,
24+
**kwargs,
25+
):
26+
super().__init__(**kwargs)
2427
self.session_response_output = session_response_output
2528
self.log = logging.getLogger("cloudsnake.ssm")
2629

30+
@property
31+
def client_name(self) -> str:
32+
return "ssm"
33+
2734
def _ensure_plugin_installed(self):
2835
if shutil.which("session-manager-plugin") is None:
29-
self.log.error("SessionManagerPlugin not found")
3036
raise FileNotFoundError(PLUGIN_NOT_FOUND_MSG)
3137

32-
def start_session_response(self, target: str) -> dict:
33-
"""Start an SSM session and store the response."""
34-
self.log.debug(f"Calling ssm.start_session(Target='{target}')")
35-
36-
response = self.client.start_session(
37-
Target=target,
38-
Reason="Session started by cloudsnake",
39-
)
40-
self.session_response_output = response
41-
return response
38+
def start_session_response(self, target: str) -> Dict[str, Any]:
39+
self.log.debug(f"ssm.start_session(Target={target})")
40+
res = self.client.start_session(Target=target, Reason="cloudsnake session")
41+
self.session_response_output = res
42+
return res
4243

4344
def start_session(self, target: str):
44-
"""
45-
Start an SSM session using the session-manager-plugin.
46-
Uses: self.profile and self.region inherited from App.
47-
"""
4845
self._ensure_plugin_installed()
49-
50-
self.log.info(f"Starting SSM session for instance {target}...")
46+
self.log.info(f"Starting SSM session for {target}")
5147
self.start_session_response(target)
5248

5349
try:
@@ -63,28 +59,26 @@ def start_session(self, target: str):
6359
f"https://ssm.{self.region}.amazonaws.com",
6460
]
6561
)
66-
self.log.info("SSM session closed normally")
62+
self.log.info("Session closed cleanly")
6763
return 0
6864

6965
except subprocess.CalledProcessError as e:
70-
self.log.error(f"Failed to start SSM session: {e}", exc_info=True)
66+
self.log.error(f"SSM session failed: {e}")
7167
self.terminate_session()
7268
raise
7369

7470
except OSError as ex:
7571
if ex.errno == errno.ENOENT:
76-
self.log.error("SessionManagerPlugin is missing", exc_info=True)
7772
self.terminate_session()
7873
raise FileNotFoundError(PLUGIN_NOT_FOUND_MSG) from ex
7974
else:
8075
self.log.error("OS error during SSM session", exc_info=True)
8176
raise
8277

8378
def terminate_session(self) -> None:
84-
"""Terminate the SSM session."""
8579
if self.session_response_output and "SessionId" in self.session_response_output:
86-
session_id = self.session_response_output["SessionId"]
87-
self.log.debug(f"Terminating SSM session {session_id}")
88-
self.client.terminate_session(SessionId=session_id)
80+
sid = self.session_response_output["SessionId"]
81+
self.log.debug(f"terminate_session {sid}")
82+
self.client.terminate_session(SessionId=sid)
8983
else:
90-
self.log.warning("No active session to terminate")
84+
self.log.warning("No session to terminate")

0 commit comments

Comments
 (0)