#!/usr/bin/env python3 """Active Directory ACL abuse detection using ldap3 to find dangerous permissions.""" import argparse import json import struct from ldap3 import Server, Connection, ALL, NTLM, SUBTREE DANGEROUS_MASKS = { "GenericAll": 0x10000000, "GenericWrite": 0x40000000, "WriteDACL": 0x00040000, "WriteOwner": 0x00080000, "WriteProperty": 0x00000020, "Self": 0x00000008, "ExtendedRight": 0x00000100, "DeleteChild": 0x00000002, "Delete": 0x00010000, } ADMIN_SIDS = { "S-1-5-18", "S-1-5-32-544", "S-1-5-9", } ADMIN_RID_SUFFIXES = { "-500", "-512", "-516", "-518", "-519", "-498", } ATTACK_PATHS = { "GenericAll": { "user": "Full control allows password reset, Kerberoasting via SPN, or shadow credential attack", "group": "Full control allows adding arbitrary members to the group", "computer": "Full control allows resource-based constrained delegation attack", "organizationalUnit": "Full control allows linking malicious GPO or moving objects", }, "WriteDACL": { "user": "Can modify DACL to grant self GenericAll, then reset password", "group": "Can modify DACL to grant self write membership, then add self", "computer": "Can modify DACL to grant self full control on machine account", "organizationalUnit": "Can modify DACL to gain control over OU child objects", }, "WriteOwner": { "user": "Can take ownership then modify DACL to escalate privileges", "group": "Can take ownership of group then modify membership", "computer": "Can take ownership then configure delegation abuse", "organizationalUnit": "Can take ownership then control OU policies", }, "GenericWrite": { "user": "Can write scriptPath for logon script execution or modify SPN for Kerberoasting", "group": "Can modify group attributes including membership", "computer": "Can write msDS-AllowedToActOnBehalfOfOtherIdentity for RBCD attack", "organizationalUnit": "Can modify OU attributes and link GPO", }, } def is_admin_sid(sid: str, domain_sid: str) -> bool: if sid in ADMIN_SIDS: return True for suffix in ADMIN_RID_SUFFIXES: if sid == domain_sid + suffix: return True return False def parse_sid(raw: bytes) -> str: if len(raw) < 8: return "" revision = raw[0] sub_auth_count = raw[1] authority = int.from_bytes(raw[2:8], byteorder="big") subs = [] for i in range(sub_auth_count): offset = 8 + i * 4 if offset + 4 > len(raw): break subs.append(struct.unpack(" list: aces = [] if len(descriptor_bytes) < 20: return aces revision = descriptor_bytes[0] control = struct.unpack("= len(descriptor_bytes): return aces dacl = descriptor_bytes[dacl_offset:] if len(dacl) < 8: return aces acl_size = struct.unpack(" len(dacl): break ace_type = dacl[offset] ace_flags = dacl[offset + 1] ace_size = struct.unpack(" len(dacl): break if ace_type in (0x00, 0x05): if offset + 8 <= len(dacl): access_mask = struct.unpack(" str: try: conn.search(base_dn, f"(objectSid={sid})", attributes=["sAMAccountName", "cn"]) if conn.entries: entry = conn.entries[0] return str(entry.sAMAccountName) if hasattr(entry, "sAMAccountName") else str(entry.cn) except Exception: pass return sid def get_domain_sid(conn: Connection, base_dn: str) -> str: conn.search(base_dn, "(objectClass=domain)", attributes=["objectSid"]) if conn.entries: raw = conn.entries[0].objectSid.raw_values[0] return parse_sid(raw) return "" def analyze_acls(dc_ip: str, domain: str, username: str, password: str, target_ou: str) -> dict: server = Server(dc_ip, get_info=ALL, use_ssl=False) domain_parts = domain.split(".") base_dn = ",".join(f"DC={p}" for p in domain_parts) search_base = target_ou if target_ou else base_dn ntlm_user = f"{domain}\\{username}" conn = Connection(server, user=ntlm_user, password=password, authentication=NTLM, auto_bind=True) domain_sid = get_domain_sid(conn, base_dn) conn.search( search_base, "(|(objectClass=user)(objectClass=group)(objectClass=computer)(objectClass=organizationalUnit))", search_scope=SUBTREE, attributes=["distinguishedName", "sAMAccountName", "objectClass", "nTSecurityDescriptor"], ) findings = [] objects_scanned = 0 sid_cache = {} for entry in conn.entries: objects_scanned += 1 dn = str(entry.distinguishedName) obj_classes = [str(c) for c in entry.objectClass.values] if hasattr(entry, "objectClass") else [] obj_type = "unknown" for oc in obj_classes: if oc.lower() in ("user", "group", "computer", "organizationalunit"): obj_type = oc.lower() break if not hasattr(entry, "nTSecurityDescriptor"): continue raw_sd = entry.nTSecurityDescriptor.raw_values if not raw_sd: continue sd_bytes = raw_sd[0] aces = parse_acl(sd_bytes) for ace in aces: trustee_sid = ace["trustee_sid"] if is_admin_sid(trustee_sid, domain_sid): continue if trustee_sid not in sid_cache: sid_cache[trustee_sid] = resolve_sid(conn, base_dn, trustee_sid) trustee_name = sid_cache[trustee_sid] for perm in ace["permissions"]: if perm in ("Delete", "DeleteChild", "Self", "WriteProperty", "ExtendedRight"): severity = "medium" else: severity = "critical" attack = ATTACK_PATHS.get(perm, {}).get(obj_type, f"{perm} on {obj_type} may allow privilege escalation") findings.append({ "severity": severity, "target_object": dn, "target_type": obj_type, "trustee": trustee_name, "trustee_sid": trustee_sid, "permission": perm, "access_mask": ace["access_mask"], "ace_type": ace["ace_type"], "attack_path": attack, "remediation": f"Remove {perm} ACE for {trustee_name} on {dn}", }) conn.unbind() findings.sort(key=lambda f: 0 if f["severity"] == "critical" else 1) return { "domain": domain, "domain_sid": domain_sid, "search_base": search_base, "objects_scanned": objects_scanned, "dangerous_aces_found": len(findings), "findings": findings, } def main(): parser = argparse.ArgumentParser(description="Active Directory ACL Abuse Analyzer") parser.add_argument("--dc-ip", required=True, help="Domain Controller IP address") parser.add_argument("--domain", required=True, help="AD domain name (e.g., corp.example.com)") parser.add_argument("--username", required=True, help="Domain username for LDAP bind") parser.add_argument("--password", required=True, help="Domain user password") parser.add_argument("--target-ou", default=None, help="Target OU distinguished name to scope the search") parser.add_argument("--output", default=None, help="Output JSON file path") args = parser.parse_args() result = analyze_acls(args.dc_ip, args.domain, args.username, args.password, args.target_ou) report = json.dumps(result, indent=2) if args.output: with open(args.output, "w") as f: f.write(report) print(report) if __name__ == "__main__": main()