Skip to content

Commit 6b7bb8b

Browse files
authored
Log compression (#45, PR #52)
1 parent 6317217 commit 6b7bb8b

File tree

4 files changed

+170
-12
lines changed

4 files changed

+170
-12
lines changed

CONFIG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ TODO: explain how to set limits, with default, project and spider specificity.
4747

4848
### [joblogs] section
4949
* `logs_dir` - a directory to store log files collected on k8s cluster (implemented only for Kubernetes). If you are using a Persistent Volume, keep in mind that the provided path should be mounted in the deployment manifest. Read and write permissions should be granted to allow actions with log files in the provided directory, thus a [securityContext](https://kubernetes.io/docs/tasks/configure-pod-container/security-context/) was added to the deployment manifest.
50-
50+
* `compression_method` - a method to compress log files. Available options are `gzip`, `bzip2`, `lzma`, `brotli` and `none`. If no compression_method is provided, it defaults to `none`.
5151

5252

5353
### Kubernetes API interaction

requirements.txt

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,5 @@ flask>=2.0.0
44
natsort>=8.0.0
55
Flask-BasicAuth>=0.2.0
66
MarkupSafe>=2.1.5
7-
apache-libcloud>=3.8.0
7+
apache-libcloud>=3.8.0
8+
brotli>=1.1.0 # only if you plan to use brotli compression for log files in joblogs module

scrapyd_k8s/object_storage/libcloud_driver.py

Lines changed: 39 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
InvalidContainerNameError,
99
)
1010
from libcloud.storage.providers import get_driver
11+
from scrapyd_k8s.object_storage.log_compressor import Compression
1112

1213
logger = logging.getLogger(__name__)
1314
logging.basicConfig(level=logging.DEBUG)
@@ -63,6 +64,9 @@ def __init__(self, config):
6364
logger.error("Container name is not set in the configuration.")
6465
raise ValueError("Container name is not set")
6566

67+
# Reading the compression method from the config
68+
self.compression_method = config.joblogs().get('compression_method', "none")
69+
6670
args_envs = config.joblogs_storage(self._storage_provider)
6771
args = {}
6872
for arg, value in args_envs.items():
@@ -136,22 +140,47 @@ def upload_file(self, project, spider, local_path):
136140
Logs information about the upload status or errors encountered.
137141
"""
138142
job_id = os.path.basename(local_path).replace('.txt', '')
139-
object_name = f"logs/{project}/{spider}/{job_id}.log"
143+
compressed_file_path = None
144+
file_to_upload = local_path
145+
object_name = None
146+
140147
try:
148+
object_name = f"logs/{project}/{spider}/{job_id}.log"
149+
if self.compression_method != 'none':
150+
try:
151+
compression = Compression(self.compression_method)
152+
compressed_file_path = compression.compress(local_path)
153+
file_to_upload = compressed_file_path
154+
extension = compression.get_extension()
155+
object_name = f"logs/{project}/{spider}/{job_id}.log.{extension}"
156+
except Exception as e:
157+
logger.error(f"Compression failed, will upload uncompressed file: {e}")
158+
# Fallback to uncompressed upload
159+
object_name = f"logs/{project}/{spider}/{job_id}.log"
160+
141161
container = self.driver.get_container(container_name=self._container_name)
142-
self.driver.upload_object(
143-
local_path,
144-
container,
145-
object_name,
146-
extra=None,
147-
verify_hash=False,
148-
headers=None
149-
)
150-
logger.info(f"Successfully uploaded '{object_name}' to container '{self._container_name}'.")
162+
with open(file_to_upload, 'rb') as file:
163+
self.driver.upload_object_via_stream(
164+
file,
165+
container,
166+
object_name,
167+
extra=None,
168+
headers=None
169+
)
170+
if self.compression_method and self.compression_method != 'none' and compressed_file_path != local_path:
171+
logger.info(
172+
f"Successfully uploaded compressed file '{object_name}' to container '{self._container_name}'.")
173+
else:
174+
logger.info(f"Successfully uploaded file '{object_name}' to container '{self._container_name}'.")
151175
except (ObjectError, ContainerDoesNotExistError, InvalidContainerNameError) as e:
152176
logger.exception(f"Error uploading the file '{object_name}': {e}")
153177
except Exception as e:
154178
logger.exception(f"An unexpected error occurred while uploading '{object_name}': {e}")
179+
finally:
180+
# Remove temporary file even if upload fails
181+
if compressed_file_path and os.path.exists(compressed_file_path):
182+
os.remove(compressed_file_path)
183+
logger.debug(f"Removed temporary compressed file '{compressed_file_path}'.")
155184

156185
def object_exists(self, prefix):
157186
"""
Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,128 @@
1+
import bz2
import gzip
import logging
import lzma
import os
import tempfile
6+
7+
logger = logging.getLogger(__name__)

# brotli is an optional dependency: the module must stay importable without it,
# so its absence is recorded here and only reported as an error on actual use.
try:
    import brotli
    BROTLI_AVAILABLE = True
except ImportError:
    BROTLI_AVAILABLE = False
    logger.warning("Brotli module not available. Brotli compression will not be supported.")


class Compression:
    """
    A class to handle compression of logs in different formats (gzip, bz2, lzma, brotli) using disk-based files.

    The compressed output is written to a temporary file that is NOT deleted
    automatically on success; the caller owns the returned path and must
    remove it when done. On failure the temporary file is removed here.
    """

    SUPPORTED_METHODS = ['gzip', 'bzip2', 'lzma', 'brotli']
    # Number of bytes read from the input file per iteration when streaming.
    COMPRESSION_CHUNK_SIZE = 1024

    # Maps a compression method name to the conventional file extension.
    COMPRESSION_EXTENSIONS = {
        'gzip': 'gz',
        'bzip2': 'bz2',
        'lzma': 'xz',
        'brotli': 'br'
    }

    def __init__(self, method="gzip"):
        """
        Initializes the compression method.

        Parameters
        ----------
        method : str
            The compression method to use. Default is 'gzip'.

        Raises
        ------
        ValueError
            If the compression method is not supported.
        """
        if method not in self.SUPPORTED_METHODS:
            raise ValueError(
                f"Unsupported compression method: {method}. Supported methods are {', '.join(self.SUPPORTED_METHODS)}")
        self.method = method
        # Dispatch table: method name -> callable(input_path, output_path) -> output_path.
        self._compression_handlers = {
            'gzip': self._handle_streaming_compression(gzip.open),
            'bzip2': self._handle_streaming_compression(bz2.BZ2File),
            'lzma': self._handle_streaming_compression(lzma.open),
            'brotli': self._handle_brotli_compression
        }

    def compress(self, input_file_path):
        """
        Compresses the given input file and saves the compressed file on disk.

        Parameters
        ----------
        input_file_path : str
            The path to the file to compress.

        Returns
        -------
        str
            Path to the compressed file. The caller is responsible for
            deleting it.

        Raises
        ------
        Exception
            Any error raised while reading the input or writing the
            compressed output is logged and re-raised. The temporary output
            file is removed before the error propagates.
        """
        # Reserve a unique path for the compressed data. The handle is closed
        # right away so the handler can reopen the path itself; delete=False
        # keeps the file on disk after the handle is closed.
        with tempfile.NamedTemporaryFile(delete=False, suffix=f'.log.{self.get_extension()}') as temp_file:
            temp_compressed_file = temp_file.name

        try:
            # Get the appropriate compression handler and call it
            handler = self._compression_handlers[self.method]
            result = handler(input_file_path, temp_compressed_file)
        except Exception as e:
            logger.error(f"Error during compression: {e}")
            # The caller never learns the temp path when we raise, so it
            # cannot clean up; remove the partial file here to avoid a leak.
            try:
                os.remove(temp_compressed_file)
            except OSError as cleanup_error:
                logger.debug(
                    f"Could not remove temporary file '{temp_compressed_file}': {cleanup_error}")
            raise

        logger.info(
            f"Successfully compressed file to '{temp_compressed_file}' using {self.method} compression.")
        return result

    def get_extension(self):
        """
        Returns the file extension for the current compression method.

        Returns
        -------
        str
            The file extension for the current compression method.
        """
        return self.COMPRESSION_EXTENSIONS.get(self.method, self.method)

    def _handle_streaming_compression(self, open_func):
        """
        Create a handler for streaming compression methods (gzip, bzip2, lzma).

        Parameters
        ----------
        open_func : callable
            The function to open a compressed file (e.g., gzip.open).

        Returns
        -------
        callable
            A function that handles the compression.
        """

        def handler(input_file_path, output_file_path):
            # Copy in fixed-size chunks so the whole log is never held in memory.
            with open(input_file_path, 'rb') as f_in:
                with open_func(output_file_path, 'wb') as f_out:
                    while chunk := f_in.read(self.COMPRESSION_CHUNK_SIZE):
                        f_out.write(chunk)
            return output_file_path

        return handler

    def _handle_brotli_compression(self, input_file_path, output_file_path):
        """
        Handle the brotli compression method.

        The whole input file is read into memory and compressed in one call.

        Raises
        ------
        RuntimeError
            If the optional 'brotli' module is not installed.
        """
        if not BROTLI_AVAILABLE:
            # Fail with an explicit message instead of a NameError on 'brotli'.
            raise RuntimeError(
                "Brotli compression was requested but the 'brotli' module is not installed.")
        with open(input_file_path, 'rb') as f_in:
            compressed_data = brotli.compress(f_in.read())
        with open(output_file_path, 'wb') as f_out:
            f_out.write(compressed_data)
        return output_file_path
128+

0 commit comments

Comments
 (0)