1 change: 1 addition & 0 deletions Pipfile
@@ -9,6 +9,7 @@ crcmod = "==1.7"
future = "==0.17.1"
protobuf = "==4.23.4"
psutil = "==5.9.4"
grpcio-tools = "*"

[dev-packages]
Fabric = "==1.14.1"
6 changes: 5 additions & 1 deletion src/clusterfuzz/_internal/bot/tasks/setup.py
@@ -879,7 +879,11 @@ def archive_testcase_and_dependencies_in_gcs(resource_list, testcase_path: str,
archived = True
absolute_filename = testcase_path[base_len:]

if not storage.upload_signed_url(file_handle, upload_url):
if isinstance(upload_url, uworker_msg_pb2.SignedPolicyDocument):
if not storage.upload_signed_policy(file_handle, upload_url):
logs.error('Failed to upload testcase with signed policy.')
return None, None, None
elif not storage.upload_signed_url(file_handle, upload_url):
logs.error('Failed to upload testcase.')
return None, None, None
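
For context on the new storage.upload_signed_policy call above: uploading against a GCS signed POST policy boils down to a multipart form POST to the policy URL, with the signed form fields sent alongside the file contents. A minimal sketch, assuming the policy document exposes the POST url and its signed fields (illustrative names, not necessarily the SignedPolicyDocument schema used here):

import requests

def upload_with_signed_policy(file_handle, policy_url, policy_fields):
  # GCS validates the signed fields against the policy's conditions and
  # expiry; the file contents go in the 'file' part of the form.
  response = requests.post(
      policy_url,
      data=dict(policy_fields),
      files={'file': file_handle},
      timeout=60)
  # A successful policy upload returns 204 No Content by default.
  return response.ok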

39 changes: 26 additions & 13 deletions src/clusterfuzz/_internal/bot/tasks/utasks/corpus_pruning_task.py
@@ -252,8 +252,8 @@ def __init__(self, uworker_input, fuzz_target, cross_pollinate_fuzzers):
uworker_input.corpus_pruning_task_input.quarantine_corpus)
self.dated_backup_gcs_url = (
uworker_input.corpus_pruning_task_input.dated_backup_gcs_url)
self.dated_backup_signed_url = (
uworker_input.corpus_pruning_task_input.dated_backup_signed_url)
self.dated_backup_signed_url = (
uworker_input.corpus_pruning_task_input.dated_backup_signed_url)
self.dated_backup_signed_policy = (
uworker_input.corpus_pruning_task_input.dated_backup_signed_policy)

def restore_quarantined_units(self):
"""Restore units from the quarantine."""
@@ -726,9 +726,11 @@ def do_corpus_pruning(context, revision) -> CorpusPruningResult:
'regressions')
if shell.get_directory_file_count(regressions_input_dir):
shutil.copytree(regressions_input_dir, regressions_output_dir)
backup_succeeded = corpus_manager.backup_corpus(
context.dated_backup_signed_url, context.corpus,
context.minimized_corpus_path)
backup_succeeded = corpus_manager.backup_corpus(
context.dated_backup_signed_url,
context.corpus,
context.minimized_corpus_path,
dated_backup_signed_policy=context.dated_backup_signed_policy)
corpus_backup_location = (
context.dated_backup_gcs_url if backup_succeeded else None)
shell.remove_directory(regressions_output_dir)
@@ -1142,13 +1144,23 @@ def _create_backup_urls(fuzz_target: data_types.FuzzTarget,
backup_bucket_name, engine_name, fuzz_target.project_qualified_name(),
corpus_manager.LATEST_BACKUP_TIMESTAMP)

dated_backup_signed_url = storage.get_signed_upload_url(dated_backup_gcs_url)

corpus_pruning_task_input.dated_backup_gcs_url = dated_backup_gcs_url
corpus_pruning_task_input.latest_backup_gcs_url = latest_backup_gcs_url
corpus_pruning_task_input.dated_backup_signed_url = dated_backup_signed_url


if dated_backup_gcs_url:
dated_backup_signed_url, dated_backup_signed_policy = (
storage.get_signed_upload_url_with_policy(dated_backup_gcs_url))
else:
dated_backup_signed_url = None
dated_backup_signed_policy = None

corpus_pruning_task_input.dated_backup_gcs_url = dated_backup_gcs_url
corpus_pruning_task_input.latest_backup_gcs_url = latest_backup_gcs_url
corpus_pruning_task_input.dated_backup_signed_url = dated_backup_signed_url
corpus_pruning_task_input.dated_backup_signed_policy = (
dated_backup_signed_policy)

def _utask_preprocess(fuzzer_name, job_type, uworker_env):
"""Runs preprocessing for corpus pruning task."""
fuzz_target = data_handler.get_fuzz_target(fuzzer_name)
@@ -1199,7 +1211,8 @@ def _utask_preprocess(fuzzer_name, job_type, uworker_env):
corpus=corpus.proto_corpus,
quarantine_corpus=quarantine_corpus.proto_corpus,
corpus_crashes_blob_name=corpus_crashes_blob_name,
corpus_crashes_upload_url=corpus_crashes_upload_url)

_create_backup_urls(fuzz_target, corpus_pruning_task_input)
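
storage.get_signed_upload_url_with_policy is introduced by this change; for reference, a rough sketch of how such a URL/policy pair can be produced with the standard google-cloud-storage client (the gs:// parsing, expiry, and size cap are assumptions, not this repo's implementation):

import datetime
from google.cloud import storage as gcs

def signed_upload_url_and_policy(gcs_url):
  # gcs_url has the form gs://<bucket>/<object-path>.
  bucket_name, _, object_name = gcs_url[len('gs://'):].partition('/')
  bucket = gcs.Client().bucket(bucket_name)
  # Plain V4 signed URL for a single-shot PUT upload (the legacy path).
  signed_url = bucket.blob(object_name).generate_signed_url(
      version='v4', expiration=datetime.timedelta(hours=24), method='PUT')
  # Signed POST policy document (the new path); the result is a dict with
  # the POST 'url' and the signed form 'fields'.
  policy = bucket.generate_signed_post_policy_v4(
      object_name,
      expiration=datetime.timedelta(hours=24),
      conditions=[['content-length-range', 0, 10 * 1024 * 1024 * 1024]])
  return signed_url, policy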

56 changes: 44 additions & 12 deletions src/clusterfuzz/_internal/bot/tasks/utasks/fuzz_task.py
@@ -98,7 +98,8 @@ class FuzzErrorCode:
'project_name', 'bot_name', 'job_type', 'fuzz_target', 'redzone',
'disable_ubsan', 'platform_id', 'crash_revision', 'fuzzer_name',
'window_argument', 'fuzzer_metadata', 'testcases_metadata',
'timeout_multiplier', 'test_timeout', 'data_directory'
'timeout_multiplier', 'test_timeout', 'data_directory',
'crash_upload_policy'
])

GenerateBlackboxTestcasesResult = collections.namedtuple(
@@ -119,10 +120,21 @@ class NoMoreUploadUrlsError(Exception):
class UploadUrlCollection:
"""Upload URLs collection."""

def __init__(self, upload_urls: List[uworker_msg_pb2.BlobUploadUrl]):
def __init__(self, upload_urls: List[uworker_msg_pb2.BlobUploadUrl],
upload_policy: uworker_msg_pb2.SignedPolicyDocument):
self.upload_urls = upload_urls
self.upload_policy = upload_policy

def get(self) -> uworker_msg_pb2.BlobUploadUrl:
if self.upload_policy and self.upload_policy.url:
# A signed policy can serve any number of uploads: generate a new key and
# return a BlobUploadUrl that carries the policy.
key = blobs.generate_new_blob_name()
return uworker_msg_pb2.BlobUploadUrl(
key=key,
url=self.upload_policy.url,
signed_policy=self.upload_policy)

if not self.upload_urls:
raise NoMoreUploadUrlsError
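
The practical effect of the policy branch in get() is that one SignedPolicyDocument replaces the bounded pool of pre-signed URLs: each call mints a fresh blob name and reuses the same policy, so NoMoreUploadUrlsError can only be raised on the legacy path. A usage sketch with illustrative values:

# Legacy path: bounded by however many URLs preprocess signed.
collection = UploadUrlCollection(list(fuzz_task_input.crash_upload_urls), None)

# Policy path: unbounded; every get() returns a BlobUploadUrl with a newly
# generated key and the shared policy.
collection = UploadUrlCollection([], fuzz_task_input.crash_upload_policy)
blob_upload_url = collection.get()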

@@ -266,9 +278,10 @@ def archive_testcase_in_blobstore(self,
# TODO(metzman): Figure out if we need this check and if we can get rid of
# the archived return value.
self.fuzzed_key = upload_url.key
url_or_policy = (upload_url.signed_policy if upload_url.HasField('signed_policy') else upload_url.url)
self.archived, self.absolute_path, self.archive_filename = (
setup.archive_testcase_and_dependencies_in_gcs(
self.resource_list, self.file_path, upload_url.url))
self.resource_list, self.file_path, url_or_policy))

def is_valid(self):
"""Return true if the crash is valid for processing."""
@@ -750,9 +763,9 @@ def store_fuzzer_run_results(testcase_file_paths, fuzzer, fuzzer_command,
fuzzer_run_results_output = uworker_msg_pb2.StoreFuzzerRunResultsOutput() # pylint: disable=no-member
if testcase_file_paths:
with open(testcase_file_paths[0], 'rb') as sample_testcase_file_handle:
sample_testcase_file = sample_testcase_file_handle.read()
storage.upload_signed_url(sample_testcase_file,
fuzz_task_input.sample_testcase_upload_url)
storage.upload_signed_url_with_policy(
fuzz_task_input.corpus_upload_policy,
sample_testcase_file_handle)

# Store fuzzer console output.
bot_name = environment.get_value('BOT_NAME')
@@ -776,9 +789,16 @@ def preprocess_store_fuzzer_run_results(fuzz_task_input):
URLs to upload a sample testcase and the logs."""
if environment.is_engine_fuzzer_job():
return
fuzz_task_input.sample_testcase_upload_key = blobs.generate_new_blob_name()
fuzz_task_input.sample_testcase_upload_url = blobs.get_signed_upload_url(
fuzz_task_input.sample_testcase_upload_key)
fuzz_task_input.corpus_upload_key = blobs.generate_new_blob_name()

# TODO(metzman): Change all callers to use this and remove the legacy path.
_, corpus_upload_policy = blobs.get_blob_signed_upload_url(
file_size=10 * 1024 * 1024,  # 10MB default.
file_type='application/octet-stream')
fuzz_task_input.corpus_upload_url = corpus_upload_policy['url']
fuzz_task_input.corpus_upload_policy.update(corpus_upload_policy)
script_log_upload_key = blobs.generate_new_blob_name()
fuzz_task_input.script_log_upload_url = blobs.get_signed_upload_url(
script_log_upload_key)
@@ -801,7 +821,7 @@ def postprocess_store_fuzzer_run_results(output):
logs.info('Fuzzer was recently updated, skipping results from old version.')
return
fuzzer.sample_testcase = (
uworker_input.fuzz_task_input.sample_testcase_upload_key)
uworker_input.fuzz_task_input.corpus_upload_key)
fuzzer.console_output = fuzzer_run_results.console_output
fuzzer.result = fuzzer_run_results.generated_testcase_string
fuzzer.result_timestamp = datetime.datetime.utcnow()
@@ -1224,7 +1244,7 @@ def key_fn(crash):
crashes = filter_crashes(crashes)
group_of_crashes = itertools.groupby(sorted(crashes, key=key_fn), key_fn)

upload_urls = UploadUrlCollection(upload_urls)
upload_urls = UploadUrlCollection(upload_urls, context.crash_upload_policy)
for _, grouped_crashes in group_of_crashes:
try:
group = CrashGroup(list(grouped_crashes), context, upload_urls)
@@ -1973,7 +1993,9 @@ def run(self):
testcases_metadata=testcases_metadata,
timeout_multiplier=self.timeout_multiplier,
test_timeout=self.test_timeout,
data_directory=self.data_directory),
data_directory=self.data_directory,
crash_upload_policy=self.uworker_input.fuzz_task_input.crash_upload_policy),
upload_urls=list(self.uworker_input.fuzz_task_input.crash_upload_urls))

# Delete the fuzzed testcases. This was once explicitly needed since some
@@ -2176,10 +2198,20 @@ def _utask_preprocess(fuzzer_name, job_type, uworker_env):
use_backup=True).serialize())

fuzz_task_input.trials.extend(trials.preprocess_get_db_trials())
# Legacy per-crash signed upload URLs.
for _ in range(MAX_CRASHES_UPLOADED):
url = fuzz_task_input.crash_upload_urls.add()
url.key = blobs.generate_new_blob_name()
url.url = blobs.get_signed_upload_url(url.key)

# Generate a signed policy document for crash uploads.
crash_upload_key = blobs.generate_new_blob_name()
fuzz_task_input.crash_upload_policy.CopyFrom(
storage.get_signed_policy_document(
blobs.get_gcs_path(crash_upload_key)))

preprocess_store_fuzzer_run_results(fuzz_task_input)
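
What lets a single policy document cover any number of crash uploads is that the object name is supplied at upload time as the key form field of the POST, so the policy only has to sign a key prefix (e.g. a condition like ['starts-with', '$key', '<prefix>/']) rather than one object. A self-contained sketch of that uploader side, with illustrative field names:

import uuid
import requests

def upload_crash_with_policy(policy_url, policy_fields, data):
  # Mint a fresh object name under the signed prefix for each crash.
  key = str(uuid.uuid4())
  fields = dict(policy_fields)
  fields['key'] = key
  response = requests.post(
      policy_url, data=fields, files={'file': data}, timeout=60)
  return key if response.ok else None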

40 changes: 17 additions & 23 deletions src/clusterfuzz/_internal/fuzzing/corpus_manager.py
@@ -508,24 +508,11 @@ def _sync_corpus_to_disk(self, corpus, directory):
# TODO(metzman): Add timeout and tolerance for missing URLs.
return fails < MAX_SYNC_ERRORS

def upload_files(self, file_paths, timeout=CORPUS_FILES_SYNC_TIMEOUT) -> bool:
def upload_files(self, file_paths, timeout=CORPUS_FILES_SYNC_TIMEOUT):
del timeout
num_upload_urls = len(self.proto_corpus.corpus.upload_urls)
if len(file_paths) > num_upload_urls:
logs.error(f'Cannot upload {len(file_paths)} filepaths, only have '
f'{len(self.proto_corpus.corpus.upload_urls)} upload urls.')
file_paths = file_paths[:num_upload_urls]

logs.info(f'Uploading {len(file_paths)} corpus files.')
results = storage.upload_signed_urls(self.proto_corpus.corpus.upload_urls,
file_paths)

# Make sure we don't reuse upload_urls.
urls_remaining = self.proto_corpus.corpus.upload_urls[len(results):]
del self.proto_corpus.corpus.upload_urls[:]
self.proto_corpus.corpus.upload_urls.extend(urls_remaining)

return results
return storage.upload_files_with_policy(
self.proto_corpus.corpus.upload_policy, file_paths)

def get_gcs_url(self):
return self.proto_corpus.corpus.gcs_url
@@ -550,13 +537,17 @@ def gcs_url_for_backup_file(backup_bucket_name, fuzzer_name,
return f'{backup_dir.rstrip("/")}/{backup_file}'


def backup_corpus(dated_backup_signed_url, corpus, directory):
def backup_corpus(dated_backup_signed_url,
corpus,
directory,
dated_backup_signed_policy=None):
"""Archive and store corpus as a backup.

Args:
dated_backup_signed_url: Signed url to upload the backup.
corpus: uworker_msg.FuzzTargetCorpus.
directory: Path to directory to be archived and backed up.
dated_backup_signed_policy: Signed policy to upload the backup.

Returns:
The backup GCS url, or None on failure.
Expand All @@ -577,8 +568,13 @@ def backup_corpus(dated_backup_signed_url, corpus, directory):
backup_archive_path = shutil.make_archive(backup_archive_path,
BACKUP_ARCHIVE_FORMAT, directory)
with open(backup_archive_path, 'rb') as fp:
data = fp.read()
if not storage.upload_signed_url(data, dated_backup_signed_url):
if dated_backup_signed_policy:
# TODO(metzman): Change all callers to use this and remove the legacy
# path.
if not storage.upload_signed_url_with_policy(
dated_backup_signed_policy, fp):
return False
elif not storage.upload_signed_url(fp, dated_backup_signed_url):
return False
except Exception as ex:
backup_succeeded = False
@@ -705,12 +701,10 @@ def get_proto_corpus(bucket_name,
urls = itertools.islice(urls, max_download_urls)

corpus_urls = storage.sign_urls_for_existing_files(urls, include_delete_urls)
upload_urls = storage.get_arbitrary_signed_upload_urls(
gcs_url, num_uploads=max_upload_urls)
upload_policy = storage.get_signed_upload_policy(gcs_url)

# Iterate over imap_unordered results.
for upload_url in upload_urls:
corpus.upload_urls.append(upload_url)
corpus.upload_policy.update(upload_policy)
for download_url, delete_url in corpus_urls:
corpus.corpus_urls[download_url] = delete_url

2 changes: 1 addition & 1 deletion src/clusterfuzz/_internal/google_cloud_utils/blobs.py
@@ -70,7 +70,7 @@ class BlobsError(Exception):
def _is_gcs_key(blob_key):
"""Return whether if the key is a GCS key."""
gcs_key_pattern = re.compile(
r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$')
r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(/.*)?$')

return bool(gcs_key_pattern.match(blob_key))
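
The relaxed pattern still requires a UUID-shaped prefix but now tolerates a sub-path after it, which is the shape of keys minted under a signed policy prefix (that interpretation is an assumption; the regex itself is verbatim from the change):

import re

gcs_key_pattern = re.compile(
    r'^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}(/.*)?$')

assert gcs_key_pattern.match('0b72f77e-21b1-4c4c-a8d7-91b4d1e0f1aa')
assert gcs_key_pattern.match('0b72f77e-21b1-4c4c-a8d7-91b4d1e0f1aa/testcase.zip')
assert not gcs_key_pattern.match('not-a-uuid/testcase.zip')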
