Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions web-agent/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,11 +96,11 @@ Steps for customer

3. Run command:
```commandline
python3 worker.py --serverUrl 'https://app.armorcode.com' --apiKey `<apiKey>` --index 0 --timeout 30
python3 worker.py --serverUrl 'https://app.armorcode.com' --apiKey `<apiKey>`
```
4. If you don't want to do certificates validations (needed in case if VM don't have any certificates assigned and making https request) pass this extra argument at the end
4. If you want to do certificates validations (needed in case if API end point requires valid certificates) pass this extra argument at the end
```commandline
--verify=False
--verify
```

5. If you have HTTPS proxy to make calls to ArmorCode API, add this argument. ex ##
Expand Down
124 changes: 124 additions & 0 deletions web-agent/Test/AgentIntegrationTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
import os
import time
import unittest
import subprocess
import threading
import zipfile
from random import random
from urllib.request import urlretrieve

import Mock_ArmorCode_server
from MockClientServer import client_app
from Resources import TestCases
from TestHelper import state_manager

file_directory = "Resources/Files/FilesToUpload"

class TestArmorCodeAgent(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Create necessary directories
os.makedirs(file_directory, exist_ok=True)

# Create files mentioned in TestCases.py
# for case in TestCases.test_cases:
# if case['response_file_path']:
# file_path = os.path.join(file_directory, case['response_file_path'])
# if case['task_id'] == 'big_file_task':
# # Create a 300 MB zip file
# cls.create_large_zip_file(file_path, 300 * 1024 * 1024) # 300 MB in bytes
# else:
# with open(file_path, 'w') as f:
# f.write(f"Mock content for {case['task_id']}")

# Download worker.py and requirements.txt
urlretrieve('https://raw.githubusercontent.com/armor-code/agent/refs/heads/main/web-agent/app/worker.py', 'worker.py')
urlretrieve('https://raw.githubusercontent.com/armor-code/agent/refs/heads/main/web-agent/requirements.txt', 'requirements.txt')

# Install requirements
subprocess.run(['pip3', 'install', '-r', 'requirements.txt'])

#Start Mock_ArmorCode_server
cls.ac_thread = threading.Thread(target=lambda: Mock_ArmorCode_server.ac_app.run(port=5000))
cls.ac_thread.daemon = True
cls.ac_thread.start()

# Start MockClientServer
cls.client_thread = threading.Thread(target=lambda: client_app.run(port=5001))
cls.client_thread.daemon = True
cls.client_thread.start()

# Wait for servers to start
time.sleep(2)

@staticmethod
def create_large_zip_file(file_path, size_bytes):
with zipfile.ZipFile(file_path, 'w', zipfile.ZIP_DEFLATED) as zip_file:
# Create a large file with random data
chunk_size = 1024 * 1024 # 1 MB
remaining_size = size_bytes
while remaining_size > 0:
chunk = os.urandom(min(chunk_size, remaining_size))

zip_file.writestr(f"data_{remaining_size}.bin", chunk)
remaining_size -= len(chunk)

def setUp(self):
# Clear task_result_map before each test
Mock_ArmorCode_server.task_result_map.clear()

def run_worker_and_wait(self, task_id):
# Start worker process
worker_process = subprocess.Popen(['python3', 'worker.py', '--serverUrl', 'http://localhost:5000', '--apiKey', 'test_api_key', '--index', 'test_index', '--timeout', '30', '--verify', 'False', '--debugMode', 'False']);

# Wait for task result (max 60 seconds)
start_time = time.time()
while time.time() - start_time < 60:
if task_id in Mock_ArmorCode_server.task_result_map:
break
time.sleep(1)

# Stop worker process
worker_process.terminate()
worker_process.wait()

return Mock_ArmorCode_server.task_result_map.get(task_id)

def test_tasks(self):
for case in TestCases.test_cases:
with self.subTest(task_id=case['task_id']):
# Set task type to return
state_manager.set_task_type(case['task_id'])
# Run worker and wait for result
result = self.run_worker_and_wait(case['task_id'])

# Assert that we got a result
self.assertIsNotNone(result, f"No result received for task {case['task_id']}")

# Add more specific assertions based on expected results
# For example:
if case['task_id'] == 'json_task':
self.assertEqual(result['status'], 'success')
self.assertIn('data', result)
elif case['task_id'] == 'timeout_task':
self.assertEqual(result['status'], 'error')
self.assertIn('timeout', result['message'].lower())
# Add more assertions for other task types

@classmethod
def tearDownClass(cls):
# Clean up created files
for case in TestCases.test_cases:
if case['response_file_path']:
file_path = os.path.join('Test/Resources/Files/S3_upload', case['response_file_path'])
if os.path.exists(file_path):
os.remove(file_path)

# Remove downloaded files
if os.path.exists('worker.py'):
os.remove('worker.py')
if os.path.exists('requirements.txt'):
os.remove('requirements.txt')

if __name__ == '__main__':
unittest.main()
5 changes: 5 additions & 0 deletions web-agent/Test/HttpTeleportTask.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
class HttpTeleportTask:
def __init__(self, task_id, url, method):
self.task_id = task_id
self.url = url
self.method = method
36 changes: 36 additions & 0 deletions web-agent/Test/LocalTest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import os

import requests


def test_mock_s3_upload():
# URL of the mock S3 upload endpoint
url = 'http://localhost:5000/mock-s3-upload'

# Path to a test file
test_file_path = 'test_file.txt'

# Create a test file
with open(test_file_path, 'w') as f:
f.write('This is a test file for mock S3 upload.')

# Open the file and send it in a PUT request
with open(test_file_path, 'rb') as f:
files = {'file': ('test_file.txt', f)}
response = requests.put(url, files=files)

# Print the response
print(f"Status Code: {response.status_code}")
print(f"Response: {response.json()}")

# Clean up - remove the test file
os.remove(test_file_path)

# Check if the file was uploaded to the S3_upload folder
uploaded_file_path = os.path.join('Resources', 'Files', 'S3_upload', 'test_file.txt')
if os.path.exists(uploaded_file_path):
print(f"File successfully uploaded to {uploaded_file_path}")
else:
print("File upload failed")

test_mock_s3_upload()
46 changes: 46 additions & 0 deletions web-agent/Test/MockClientServer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os

from flask import Flask, request, jsonify, send_file
from werkzeug.serving import run_simple
import Resources.TestCases as TestCases

client_app = Flask("client_server")
test_cases = TestCases.test_cases
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
UPLOAD_DIR = os.path.join(BASE_DIR, 'Test', 'Resources', 'Files', 'S3_upload')


client_app = Flask("client_server")


@client_app.before_request
def initialize_tasks():
return


@client_app.route('/<path:path>', methods=['GET'])
def serve_file(path):
for case in test_cases:
if case['url_path'] == f"/{path}":
if path == "timeout":
return "", 504
elif path == "error_400":
return "", 400
elif path == "error_500":
return "", 500
elif case['response_file_path']:
# Construct the full path to the file
file_path = os.path.join(UPLOAD_DIR, os.path.basename(case['response_file_path']))
if os.path.exists(file_path):
return send_file(
file_path,
mimetype=case['headers'].get('Content-Type', 'application/octet-stream'),
as_attachment=True
)
else:
return f"File not found: {file_path}", 404
return "", 404

if __name__ == "__main__":
# Run both servers
run_simple('localhost', 5001, client_app, use_reloader=True, use_debugger=True, use_evalex=True)
141 changes: 141 additions & 0 deletions web-agent/Test/Mock_ArmorCode_server.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import json
import gzip
import os

import requests
from flask import Flask, request, jsonify, send_file
from werkzeug.serving import run_simple
import Resources.TestCases as TestCases

from werkzeug.utils import secure_filename
from TaskStatus import TaskStatusEnum
from TestHelper import VerificationResult, verify_task_result, state_manager

import HttpTeleportTask

# Mock Armorcode Server
ac_app = Flask("ac_server")


task_map = {}
task_result_map = dict()

config_file = "config.properties"
# Function to read the variable from the file



# Armorcode Server Routes
@ac_app.before_request
def initialize_tasks():
global task_map, task_result_map
task_map = TestCases.retrun_task_map()
task_keys = TestCases.return_key_list()
for key in task_keys:
task_map[key] = TaskStatusEnum.IN_QUEUE

@ac_app.route('/api/http-teleport/get-task', methods=['GET'])
def get_task():
# Get taskType from query params, default to 'default' if not provided
task_type = state_manager.get_task_type()
task = task_map.get(task_type, None)

if task is None:
return jsonify({"error": "No task found"}), 404

task_map[task_type] = TaskStatusEnum.IN_PROGRESS

return jsonify({"data": {
"taskId": task["task_id"],
"url": f"http://localhost:5001{task['url_path']}",
"method": "GET"
}})

@ac_app.route('/api/http-teleport/put-result', methods=['POST'])
def put_result():
print(f"Received result: {request.json}")

result = request.json
if not result or 'taskId' not in result:
return jsonify({"status": "error", "message": "Invalid result format"}), 400

task_id = result['taskId']
task = next((t for t in TestCases.test_cases if t['task_id'] == task_id), None)

if not task:
return jsonify({"status": "error", "message": "Task not found"}), 404

verification_result, message = verify_task_result(task, result)

if verification_result == VerificationResult.SUCCESS:
task_map[task_id] = TaskStatusEnum.PASSED
return jsonify({"status": "success", "message": message}), 200

else:
task_map[task_id] = TaskStatusEnum.FAILED
return jsonify({"status": "error", "message": message}), 400

@ac_app.route('/api/http-teleport/upload-result', methods=['POST'])
def upload_result():
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400

if 'task' not in request.form:
return jsonify({"error": "No task data"}), 400

task_data = request.form['task']

if file:
filename = secure_filename(file.filename)
file.save(os.path.join(ac_app.config['UPLOAD_FOLDER'], filename))

# Process the task data as needed
# For example, you might want to save it to a database or perform some operation

return jsonify({
"data": f"File {filename} uploaded successfully for task",
"success": True,
"message": "Upload completed"
}), 200

@ac_app.route('/api/http-teleport/upload-url', methods=['GET'])
def get_s3_path():
task_id = request.args.get('fileName', '')
return jsonify({
"data": {
"putUrl": f"http://localhost:5000/mock-s3-upload?taskId={task_id}",
"getUrl": f"http://localhost:5000/mock-s3-download?taskId={task_id}"
}
})

@ac_app.route('/mock-s3-upload', methods=['PUT'])
def mock_s3_upload():
task_id = request.args.get('taskId')
if not task_id:
return jsonify({"error": "Missing taskId parameter"}), 400
if 'file' not in request.files:
return jsonify({"error": "No file part"}), 400

file = request.files['file']
if file.filename == '':
return jsonify({"error": "No selected file"}), 400

if file:
filename = secure_filename(file.filename)
upload_folder = os.path.join('Resources', 'Files', 'S3_upload')

if not os.path.exists(upload_folder):
os.makedirs(upload_folder)

file_path = os.path.join(upload_folder, filename)
file.save(file_path)

return jsonify({"message": "File uploaded successfully", "filename": filename}), 200



if __name__ == "__main__":
run_simple('localhost', 5000, ac_app, use_reloader=True, use_debugger=True, use_evalex=True)
Empty file.
Loading