-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathavailable_tools.py
More file actions
104 lines (96 loc) · 4.34 KB
/
available_tools.py
File metadata and controls
104 lines (96 loc) · 4.34 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import logging
class VersionException(Exception):
    """Signals a 'seclab-taskflow-agent' header whose version is not supported."""
class FileIDException(Exception):
    """Signals that a file key is already present in the table it is added to."""
class FileTypeException(Exception):
    """Signals a 'seclab-taskflow-agent' header whose filetype is not recognised."""
def add_yaml_to_dict(table, key, yaml):
    """Store ``yaml`` in ``table`` under ``key``, refusing to overwrite.

    Raises:
        FileIDException: if ``key`` is already present in ``table``; the
            exception carries ``str(key)`` as its only argument.
    """
    if key not in table:
        table[key] = yaml
        return
    # The key is already taken: file IDs must be unique within a table.
    raise FileIDException(str(key))
class AvailableTools:
    """
    Registry of parsed YAML documents, grouped by file type.

    One dictionary is kept per category (personalities, taskflows, prompts,
    toolboxes, model_config and namespace_config). Each maps the file key
    declared in a document's header to the whole document.
    """

    def __init__(self, yamls: dict):
        """
        Sort the given YAML documents into the per-category dictionaries.

        Args:
            yamls: mapping of file path to parsed YAML document.

        Documents that are malformed (not a mapping, missing header keys,
        unsupported version or file type, duplicate file key) are reported
        through ``logging.error`` and skipped; they never abort the load.
        """
        self.personalities = {}
        self.taskflows = {}
        self.prompts = {}
        self.toolboxes = {}
        self.model_config = {}
        self.namespace_config = {}

        # Header 'filetype' value -> dictionary that collects such documents.
        tables = {
            'personality': self.personalities,
            'taskflow': self.taskflows,
            'prompt': self.prompts,
            'toolbox': self.toolboxes,
            'model_config': self.model_config,
            'namespace_config': self.namespace_config,
        }

        # Iterate through all the yaml files and divide them into categories.
        # Each file should contain a header like this:
        #
        #   seclab-taskflow-agent:
        #     version: 1
        #     filetype: taskflow
        #     filekey: <unique id of this file>
        #
        for path, yaml in yamls.items():
            try:
                header = yaml['seclab-taskflow-agent']
                version = header['version']
                if version != 1:
                    raise VersionException(str(version))
                filekey = header['filekey']
                filetype = header['filetype']
                # Only string file types can name a category; anything else
                # is reported as an unsupported file type below.
                table = tables.get(filetype) if isinstance(filetype, str) else None
                if table is None:
                    raise FileTypeException(str(filetype))
                add_yaml_to_dict(table, filekey, yaml)
            except KeyError as err:
                logging.error(f'{path} does not contain the key {err.args[0]}')
            except TypeError as err:
                # The document or its header is not a mapping (e.g. an empty
                # file parsed to None), or the file key is unhashable. Skip
                # the file like any other malformed one instead of crashing.
                logging.error(f'{path} is not a valid seclab-taskflow-agent file: {err}')
            except VersionException as err:
                logging.error(f'{path}: seclab-taskflow-agent version {err.args[0]} is not supported')
            except FileIDException as err:
                logging.error(f'{path}: file ID {err.args[0]} is not unique')
            except FileTypeException as err:
                logging.error(f'{path}: seclab-taskflow-agent file type {err.args[0]} is not supported')

    def copy_with_alias(self, alias_dict: dict) -> 'AvailableTools':
        """
        Return a copy of this registry with extra entries under aliased keys.

        Args:
            alias_dict: mapping of key prefix to alias. Every entry whose key
                is ``<prefix>/<rest>`` is additionally stored under
                ``<alias>/<rest>``. A falsy value (``None`` or ``{}``) yields
                a plain copy.

        Returns:
            A new AvailableTools; the dictionaries of ``self`` are not
            modified. The stored documents themselves are shared, not copied.
        """
        aliases = alias_dict or {}

        def _copy_add_alias_to_dict(original_dict: dict) -> dict:
            # Start from all original entries, then add the aliased ones.
            new_dict = dict(original_dict)
            for key, value in original_dict.items():
                for prefix, alias in aliases.items():
                    # Match whole path components only: 'pkg' must not
                    # alias 'pkgx/...'.
                    if key.startswith(prefix + '/'):
                        new_dict[alias + key[len(prefix):]] = value
            return new_dict

        new_available_tools = AvailableTools({})
        new_available_tools.personalities = _copy_add_alias_to_dict(self.personalities)
        new_available_tools.taskflows = _copy_add_alias_to_dict(self.taskflows)
        new_available_tools.prompts = _copy_add_alias_to_dict(self.prompts)
        # Toolboxes are looked up only after their names have been
        # canonicalized, so they are copied without aliased keys.
        new_available_tools.toolboxes = dict(self.toolboxes)
        new_available_tools.model_config = _copy_add_alias_to_dict(self.model_config)
        new_available_tools.namespace_config = _copy_add_alias_to_dict(self.namespace_config)
        return new_available_tools
def canonicalize_toolboxes(toolboxes: list, alias_dict: dict) -> list:
    """
    Replace aliased prefixes in toolbox names with their canonical prefix.

    Toolboxes need to be canonicalized because both personalities and
    taskflows can use toolboxes with potentially different aliases.

    Args:
        toolboxes: toolbox names, possibly written as ``<alias>/<rest>``.
        alias_dict: mapping of canonical prefix to alias. A falsy value
            (``None`` or ``{}``) means there is nothing to translate.

    Returns:
        ``toolboxes`` itself, unchanged, when ``alias_dict`` is falsy.
        Otherwise a new list without duplicates, in order of first
        appearance. A name matching several aliases contributes one entry
        per matching alias; a name matching none is kept as is.
    """
    if not alias_dict:
        return toolboxes
    # A dict serves as an insertion-ordered set, so the result order is
    # deterministic instead of depending on string hash randomisation.
    canonical = {}
    for tb in toolboxes:
        found_alias = False
        for name, alias in alias_dict.items():
            # Match whole path components only: alias 'al' must not
            # rewrite 'alx/...'.
            if tb.startswith(alias + '/'):
                canonical[name + tb[len(alias):]] = None
                found_alias = True
        if not found_alias:
            canonical[tb] = None
    return list(canonical)