|
| 1 | +#!/bin/bash |
| 2 | +# Common functions for ohmx-adiff-builder scripts |
| 3 | + |
upload_diff_files() {
  # Upload new or changed .adiff files from $BUCKET_DIR to S3, tracking MD5s
  # in $UPLOAD_TRACK_FILE so unchanged files are skipped on later passes.
  #
  # Globals (read): BUCKET_DIR, AWS_S3_BUCKET, UPLOAD_TRACK_FILE
  # Arguments:
  #   $1 - "once" for a single pass; anything else (or empty) loops forever.
  local mode="${1:-loop}"

  mkdir -p "$(dirname "$UPLOAD_TRACK_FILE")"
  touch "$UPLOAD_TRACK_FILE"

  # Load the previously-uploaded filename -> md5 map.
  # Track-file format: "<filename> <md5>" per line; let read split the two
  # fields instead of spawning awk twice per line.
  declare -A uploaded_md5s
  local fname md5
  # "|| [[ -n ... ]]" also processes a final line missing its trailing newline.
  while read -r fname md5 || [[ -n "$fname" ]]; do
    [[ -n "$fname" ]] && uploaded_md5s["$fname"]="$md5"
  done < "$UPLOAD_TRACK_FILE"

  # Single pass: upload .adiff files modified in the last 60 minutes.
  process_upload() {
    echo "Uploading files at $(date)..."
    local filepath filename current_md5
    # Process substitution (not a pipe) so updates to uploaded_md5s made
    # inside the loop survive it.
    while IFS= read -r filepath; do
      [[ -z "$filepath" || ! -f "$filepath" ]] && continue
      filename=$(basename "$filepath")
      current_md5=$(md5sum "$filepath" | awk '{print $1}')

      # ":-" guards against "unbound variable" under set -u for new keys.
      if [[ -n "${uploaded_md5s[$filename]:-}" ]]; then
        if [[ "${uploaded_md5s[$filename]}" == "$current_md5" ]]; then
          echo "Skipping unchanged: $filename"
          continue
        else
          echo "File changed: $filename — reuploading"
        fi
      else
        echo "New file: $filename — uploading"
      fi

      # Record the md5 only if the upload actually succeeded, so a failed
      # upload is retried on the next pass.
      aws s3 cp "$filepath" "s3://$AWS_S3_BUCKET/ohm-augmented-diffs/changesets/$filename" \
        --content-type "application/xml" \
        --content-encoding "gzip" && \
        uploaded_md5s["$filename"]="$current_md5"
    done < <(find "$BUCKET_DIR" -type f -name '*.adiff' -mmin -60 2>/dev/null)

    # Rewrite the control file from the in-memory map.
    : > "$UPLOAD_TRACK_FILE"
    local f
    for f in "${!uploaded_md5s[@]}"; do
      echo "$f ${uploaded_md5s[$f]}" >> "$UPLOAD_TRACK_FILE"
    done
  }

  if [[ "$mode" == "once" ]]; then
    # Execute only once
    process_upload
  else
    # Continuous mode: re-scan every 60 seconds.
    while true; do
      process_upload
      sleep 60
    done
  fi
}
| 64 | + |
| 65 | +# Function to download and generate adiff files for a seqno range |
| 66 | +# Downloads .osc files from replication server, generates .adiff files, and updates osmx database |
# Function to download and generate adiff files for a seqno range
# Downloads .osc files from replication server, generates .adiff files, and updates osmx database
download_and_generate_adiffs() {
  # Globals (read): REPLICATION_URL, OSMX_DB_PATH, REPLICATION_ADIFFS_DIR
  # Arguments:
  #   $1 - first sequence number (inclusive)
  #   $2 - last sequence number (inclusive)
  local seqno_min=$1
  local seqno_max=$2
  local seqno seqno_padded url tmpfile timestamp

  echo "Downloading and generating adiffs for range: $seqno_min - $seqno_max"

  eval "$(mise activate bash --shims)"

  for (( seqno = seqno_min; seqno <= seqno_max; seqno++ )); do
    echo "Processing seqno: $seqno"

    # Replication layout: seqno zero-padded to 9 digits, split as XXX/XXX/XXX.
    # Substring expansion avoids three echo|cut subshells per iteration.
    seqno_padded=$(printf '%09d' "$seqno")
    url="${REPLICATION_URL}/${seqno_padded:0:3}/${seqno_padded:3:3}/${seqno_padded:6:3}.osc.gz"

    # Download and decompress. curl -f fails on HTTP errors (e.g. 404)
    # instead of piping an error page into gzip; the subshell-local pipefail
    # surfaces a failure in either pipeline stage.
    if ! (set -o pipefail; curl -sfL "$url" | gzip -d > "${seqno}.osc") 2>/dev/null; then
      echo "Warning: Failed to download seqno $seqno from $url, skipping..."
      rm -f "${seqno}.osc"   # drop any partial output
      continue
    fi

    # Guard against a successful pipeline that still produced no data.
    if [[ ! -s "${seqno}.osc" ]]; then
      echo "Warning: Empty or missing .osc file for seqno $seqno, skipping..."
      rm -f "${seqno}.osc"
      continue
    fi

    # Generate the augmented diff. pipefail ensures a python failure is not
    # masked by xmlstarlet exiting 0 on partial output.
    tmpfile=$(mktemp)
    if ! (set -o pipefail; python augmented_diff.py "$OSMX_DB_PATH" "${seqno}.osc" | xmlstarlet format > "$tmpfile") 2>/dev/null; then
      echo "Warning: Failed to generate adiff for seqno $seqno, skipping..."
      rm -f "${seqno}.osc" "$tmpfile"
      continue
    fi

    # Move adiff into the replication directory.
    mkdir -p "$REPLICATION_ADIFFS_DIR"
    mv "$tmpfile" "$REPLICATION_ADIFFS_DIR/${seqno}.adiff"

    # TODO(review): ideally read the timestamp from the replication state
    # file when available; current UTC time is used as a fallback.
    timestamp=$(date -u +%Y-%m-%dT%H:%M:%SZ)

    # Advance the osmx database; a failure here is reported but does not
    # abort the remaining sequence numbers.
    if osmx update "$OSMX_DB_PATH" "${seqno}.osc" "$seqno" "$timestamp" --commit 2>/dev/null; then
      echo "Successfully processed seqno $seqno"
    else
      echo "Warning: Failed to update osmx database for seqno $seqno"
    fi

    # Clean up the downloaded .osc file.
    rm -f "${seqno}.osc"
  done
}
| 124 | + |
| 125 | +# Function to process adiff files by sequence number (seqno) range |
| 126 | +# Splits adiffs, moves them to bucket-data, merges split adiffs, uploads, and cleans up |
# Function to process adiff files by sequence number (seqno) range
# Splits adiffs, moves them to bucket-data, merges split adiffs, uploads, and cleans up
process_adiff_range() {
  # Globals (read): REPLICATION_ADIFFS_DIR, SPLIT_ADIFFS_DIR, CHANGESET_DIR, BUCKET_DIR
  # Arguments:
  #   $1 - first sequence number (inclusive)
  #   $2 - last sequence number (inclusive)
  local seqno_start=$1
  local seqno_end=$2
  local seqno adiff_file tmpdir tmpfile file changeset

  mkdir -p "$REPLICATION_ADIFFS_DIR" "$SPLIT_ADIFFS_DIR" "$CHANGESET_DIR" "$BUCKET_DIR/replication/minute"

  echo "Processing range: $seqno_start - $seqno_end"

  # Find files in the specified range
  for (( seqno = seqno_start; seqno <= seqno_end; seqno++ )); do
    adiff_file="$REPLICATION_ADIFFS_DIR/${seqno}.adiff"
    [ ! -f "$adiff_file" ] && continue
    echo "Processing adiff file: $adiff_file"

    tmpdir=$(mktemp -d)

    # Split the adiff into one file per changeset. If splitting fails, keep
    # the source adiff in place so a later run can retry — previously a
    # failed split still fell through to the archive+delete below, silently
    # losing the minute's per-changeset data.
    if ! python split_adiff.py "$adiff_file" "$tmpdir"; then
      echo "Warning: Failed to split $adiff_file, leaving it for retry" >&2
      rm -rf "$tmpdir"
      continue
    fi

    for file in "$tmpdir"/*.adiff; do
      [ ! -f "$file" ] && continue   # no-match glob leaves the literal pattern
      changeset=$(basename -s .adiff "$file")
      mkdir -p "${SPLIT_ADIFFS_DIR}/${changeset}"
      mv "$file" "${SPLIT_ADIFFS_DIR}/${changeset}/${seqno}.adiff"
    done

    rm -rf "$tmpdir"

    # Move the adiff file to the output directory. This means it won't be
    # processed again in the future and can be uploaded to R2 and deleted
    # locally. Compress it first, then mv it into place atomically.
    tmpfile=$(mktemp)
    gzip -c < "$adiff_file" > "$tmpfile"
    mv "$tmpfile" "${BUCKET_DIR}/replication/minute/$(basename "$adiff_file")"
    rm "$adiff_file"
  done

  # Merge all our split files, potentially updating existing changesets.
  # This is done using a makefile script in order to avoid needlessly
  # reprocessing changesets whose set of input (split-adiffs/) files haven't
  # changed. Best-effort: a merge failure doesn't abort the upload below.
  [ -f "merge.mk" ] && make -f merge.mk || true

  upload_diff_files "once"

  # Best-effort cleanup of old stage-data that we don't need anymore.
  [ -f "gc.sh" ] && ./gc.sh "$SPLIT_ADIFFS_DIR" "$CHANGESET_DIR" || true
}
0 commit comments