YaraValidator API
An API is available to integrate YaraValidator into any process involving Yara rules, should you choose to do so. Interactions with the backend return exactly the same information as the web portal.
The API is quite straightforward to use. Interactions can be broken down into 3 steps:
- POST a Yara rule to the `/validate` endpoint and get an identifier for the corresponding job.
- Poll the status of the job on `/task/<id>` until its status is `finished` (or, but hopefully not, `failed`).
- Read the results in the object returned with the `finished` status.
Sample data returned by the API can be found below:
{
"data": {
"status": "finished",
"task_id": "2f44f89b4da4d460bfed47548983f9a65557792a6cef8a92e9d4a0a62dcc33dd",
"task_result": {
"2.0.0": [
"Error",
"Line 13: error: syntax error, unexpected _IDENTIFIER_, expecting _CONDITION_"
],
"2.1.0": [
"Error",
"Line 13: error: syntax error, unexpected _IDENTIFIER_, expecting _CONDITION_"
],
"...": [
"...",
"..."
],
"4.4.0": [
"OK",
null
]
}
}
}
Python script
Below, you will find a complete Python script which automates submissions to the platform and prints results to the terminal.
ivan@box:~$ python3 api.py -h
usage: api.py [-h] [-r] rule
Compile a Yara rule on all existing Yara versions.
positional arguments:
rule The rule to test
options:
-h, --help show this help message and exit
-r Scan folder recursively and test all rules.
#!/usr/bin/python3
import argparse
import json
import os
import requests
import time
from urllib.parse import urljoin
import click
import columnar
# Base address of the YaraValidator service; endpoint paths are joined onto it with urljoin.
URL = "https://yaravalidator.manalyzer.org/"
# Single requests session used for every HTTP call made by this script.
s = requests.Session()
def compile_rule(r: str) -> dict:
    """Upload the Yara rule file at path *r* and return the validation results.

    The file is sent to the ``/validate`` endpoint; the task identifier found
    in the JSON reply is then passed to ``poll_api`` and whatever that call
    returns is returned to the caller.

    Raises:
        RuntimeError: if the endpoint replies with an HTTP status other than 200.
    """
    with open(r, 'rb') as handle:
        reply = s.post(urljoin(URL, "/validate"), files={'file': handle})

    if reply.status_code != 200:
        raise RuntimeError(f"Error: the endpoint returned an invalid status ({reply.status_code})")

    payload = json.loads(reply.text)
    task_id = payload["data"]["task_id"]
    return poll_api(task_id)
def poll_api(task_id: str) -> dict:
    """Poll ``/task/<task_id>`` until the job leaves the queue and return its results.

    The endpoint is queried once per second while the reported status is
    ``queued`` or ``started``.

    Args:
        task_id: Identifier of the job, as returned by the ``/validate`` endpoint.

    Returns:
        The ``task_result`` object found under ``data`` in the API response.

    Raises:
        RuntimeError: if the API reports the ``failed`` status for the job.
    """
    task_url = urljoin(URL, f"/task/{task_id}")

    # Iterate instead of recursing: one stack frame per poll would end in a
    # RecursionError for a job that stays queued for a long time.
    while True:
        response = s.get(task_url)
        api_msg = json.loads(response.text)

        # The documented response nests the job status under "data"; fall back
        # to a top-level "status" key for responses that only carry it there.
        data = api_msg.get("data") or {}
        status = data.get("status", api_msg.get("status"))

        if status in ("queued", "started"):
            time.sleep(1)  # Job hasn't run yet, retry in a bit
            continue
        if status == "failed":
            raise RuntimeError("Error: the task did not complete successfully.")
        return data["task_result"]
if __name__ == "__main__":
    # Command-line entry point: validate one rule file, or (with -r) every
    # .yar/.yara file found under a directory, and print one table per file.
    parser = argparse.ArgumentParser(description='Compile a Yara rule on all existing Yara versions.')
    parser.add_argument('rule', help='The rule to test', type=str)
    parser.add_argument("-r", action="store_true", help="Scan folder recursively and test all rules.")
    args = parser.parse_args()

    if not args.r:
        file_list = [args.rule]
    elif not os.path.isdir(args.rule):
        print(f"Error: {args.rule} is not a directory!")
        # Stop here: continuing would only add a misleading "No files found." message.
        raise SystemExit(1)
    else:
        file_list = []
        for root, _dirs, files in os.walk(args.rule):
            for file in files:
                if file.endswith(('.yar', '.yara')):
                    file_list.append(os.path.join(root, file))

    if not file_list:
        print("No files found.")

    # Table layout and colouring rules are the same for every file, so build them once.
    header = ["Yara version", "Compilation status", "Errors"]
    patterns = [
        (r'^OK$', lambda text: click.style(text, fg='bright_green')),
        (r'^Error$', lambda text: click.style(text, fg='bright_red', bold=True)),
        # Group the alternation so both anchors apply to each alternative.
        (r'^(Slow|Warnings)$', lambda text: click.style(text, fg='bright_yellow')),
    ]

    for f in file_list:
        try:
            compilation_status = compile_rule(f)
        except (RuntimeError, OSError) as e:
            # OSError covers unreadable rule files as well as network failures
            # (requests' exceptions derive from IOError), so one bad file does
            # not abort a recursive scan.
            print(f"Error when processing {f}: {str(e)}")
            continue

        # We obtained results from the API. Print them nicely to the console.
        list_results = [[key, value[0], value[1]] for key, value in compilation_status.items()]
        print(f"{f}:")
        table = columnar.columnar(list_results, headers=header, no_borders=True, patterns=patterns)
        print(table)