Source: MarkTechPost
In this tutorial, we build a workflow that combines Magika’s deep-learning-based file type detection with OpenAI’s language intelligence to create a practical and insightful analysis pipeline. We begin by setting up the required libraries, securely connecting to the OpenAI API, and initializing Magika to classify files directly from raw bytes rather than relying on filenames or extensions. As we move through the tutorial, we explore batch scanning, confidence modes, spoofed-file detection, forensic-style analysis, upload-pipeline risk scoring, and structured JSON reporting. At each stage, we use GPT to translate technical scan outputs into clear explanations, security insights, and executive-level summaries, allowing us to connect low-level byte detection with meaningful real-world interpretation.
!pip install magika openai -q import os, io, json, zipfile, textwrap, hashlib, tempfile, getpass from pathlib import Path from collections import Counter from magika import Magika from magika.types import MagikaResult, PredictionMode from openai import OpenAI print("🔑 Enter your OpenAI API key (input is hidden):") api_key = getpass.getpass("OpenAI API Key: ") client = OpenAI(api_key=api_key) try: client.models.list() print("✅ OpenAI connected successfullyn") except Exception as e: raise SystemExit(f"❌ OpenAI connection failed: {e}") m = Magika() print("✅ Magika loaded successfullyn") print(f" module version : {m.get_module_version()}") print(f" model name : {m.get_model_name()}") print(f" output types : {len(m.get_output_content_types())} supported labelsn") def ask_gpt(system: str, user: str, model: str = "gpt-4o", max_tokens: int = 600) -> str: resp = client.chat.completions.create( model=model, max_tokens=max_tokens, messages=[ {"role": "system", "content": system}, {"role": "user", "content": user}, ], ) return resp.choices[0].message.content.strip() print("=" * 60) print("SECTION 1 — Core API + GPT Plain-Language Explanation") print("=" * 60) samples = { "Python": b'import osndef greet(name):n print(f"Hello, {name}")n', "JavaScript": b'const fetch = require("node-fetch");nasync function getData() { return await fetch("https://www.marktechpost.com/api"); }', "CSV": b'name,age,citynAlice,30,NYCnBob,25,LAn', "JSON": b'{"name": "Alice", "scores": [10, 20, 30], "active": true}', "Shell": b'#!/bin/bashnecho "Hello"nfor i in $(seq 1 5); do echo $i; done', "PDF magic": b'%PDF-1.4n1 0 objn<< /Type /Catalog >>nendobjn', "ZIP magic": bytes([0x50, 0x4B, 0x03, 0x04]) + bytes(26), } print(f"n{'Label':<12} {'MIME Type':<30} {'Score':>6}") print("-" * 52) magika_labels = [] for name, raw in samples.items(): res = m.identify_bytes(raw) magika_labels.append(res.output.label) print(f"{res.output.label:<12} {res.output.mime_type:<30} {res.score:>5.1%}") explanation = ask_gpt( system="You are a concise ML engineer. Explain in 4–5 sentences.", user=( f"Magika is Google's AI file-type detector. It just identified these types from raw bytes: " f"{magika_labels}. Explain how a deep-learning model detects file types from " "just bytes, and why this beats relying on file extensions." ), max_tokens=250, ) print(f"n💬 GPT on how Magika works:n{textwrap.fill(explanation, 72)}n") print("=" * 60) print("SECTION 2 — Batch Identification + GPT Summary") print("=" * 60) tmp_dir = Path(tempfile.mkdtemp()) file_specs = { "code.py": b"import sysnprint(sys.version)n", "style.css": b"body { font-family: Arial; margin: 0; }n", "data.json": b'[{"id": 1, "val": "foo"}, {"id": 2, "val": "bar"}]', "script.sh": b"#!/bin/shnecho Hello Worldn", "doc.html": b"Hello
", "config.yaml": b"server:n host: localhostn port: 8080n", "query.sql": b"CREATE TABLE t (id INT PRIMARY KEY, name TEXT);n", "notes.md": b"# Headingnn- item onen- item twon", } paths = [] for fname, content in file_specs.items(): p = tmp_dir / fname p.write_bytes(content) paths.append(p) results = m.identify_paths(paths) batch_summary = [ {"file": p.name, "label": r.output.label, "group": r.output.group, "score": f"{r.score:.1%}"} for p, r in zip(paths, results) ] print(f"n{'File':<18} {'Label':<14} {'Group':<12} {'Score':>6}") print("-" * 54) for row in batch_summary: print(f"{row['file']:<18} {row['label']:<14} {row['group']:<12} {row['score']:>6}") gpt_summary = ask_gpt( system="You are a DevSecOps expert. Be concise and practical.", user=( f"A file upload scanner detected these file types in a batch: " f"{json.dumps(batch_summary)}. " "In 3–4 sentences, summarise what kind of project this looks like " "and flag any file types that might warrant extra scrutiny." ), max_tokens=220, ) print(f"n💬 GPT project analysis:n{textwrap.fill(gpt_summary, 72)}n")
We install the required libraries, connect Magika and OpenAI, and set up the core helper function that lets us send prompts for analysis. We begin by testing Magika on various raw byte samples to see how it identifies file types without relying on file extensions. We also create a batch of sample files and use GPT to summarize what kind of project or codebase the detected file collection appears to represent.
print("=" * 60) print("SECTION 3 — Prediction Modes + GPT Mode-Selection Guidance") print("=" * 60) ambiguous = b"Hello, world. This is a short text." mode_results = {} for mode in [PredictionMode.HIGH_CONFIDENCE, PredictionMode.MEDIUM_CONFIDENCE, PredictionMode.BEST_GUESS]: m_mode = Magika(prediction_mode=mode) res = m_mode.identify_bytes(ambiguous) mode_results[mode.name] = { "label": res.output.label, "score": f"{res.score:.1%}", } print(f" {mode.name:<22} label={res.output.label:<20} score={res.score:.1%}") guidance = ask_gpt( system="You are a security engineer. Be concise (3 bullet points).", user=( f"Magika's three confidence modes returned: {json.dumps(mode_results)} " "for the same ambiguous text snippet. Give one practical use-case where each mode " "(HIGH_CONFIDENCE, MEDIUM_CONFIDENCE, BEST_GUESS) is the right choice." ), max_tokens=220, ) print(f"n💬 GPT on when to use each mode:n{guidance}n") print("=" * 60) print("SECTION 4 — MagikaResult Anatomy + GPT Field Explanation") print("=" * 60) code_snippet = b""" #!/usr/bin/env python3 from typing import List def fibonacci(n: int) -> List[int]: a, b = 0, 1 result = [] for _ in range(n): result.append(a) a, b = b, a + b return result """ res = m.identify_bytes(code_snippet) result_dict = { "output.label": res.output.label, "output.description": res.output.description, "output.mime_type": res.output.mime_type, "output.group": res.output.group, "output.extensions": res.output.extensions, "output.is_text": res.output.is_text, "dl.label": res.dl.label, "dl.description": res.dl.description, "dl.mime_type": res.dl.mime_type, "score": round(res.score, 4), } for k, v in result_dict.items(): print(f" {k:<28} = {v}") field_explanation = ask_gpt( system="You are a concise ML engineer.", user=( f"Magika returned this result object for a Python file: {json.dumps(result_dict)}. " "In 4 sentences, explain the difference between the `dl.*` fields and `output.*` fields, " "and why dl.label and output.label might differ even though there is only one score." ), max_tokens=220, ) print(f"n💬 GPT explains dl vs output:n{textwrap.fill(field_explanation, 72)}n") print("=" * 60) print("SECTION 5 — Spoofed Files + GPT Threat Assessment") print("=" * 60) spoofed_files = { "invoice.pdf": b'#!/usr/bin/env python3nprint("I am Python, not a PDF!")n', "photo.jpg": b'This is HTML masquerading as JPEG', "data.csv": bytes([0x50, 0x4B, 0x03, 0x04]) + bytes(26), "readme.txt": b'%PDF-1.4n1 0 objn<>nendobjn', "legit.py": b'import sysnprint(sys.argv)n', } ext_to_expected = {"pdf": "pdf", "jpg": "jpeg", "csv": "zip", "txt": "pdf", "py": "python"} threats = [] print(f"n{'Filename':<18} {'Expected':^10} {'Detected':^14} {'Match':^6} {'Score':>6}") print("-" * 62) for fname, content in spoofed_files.items(): ext = fname.rsplit(".", 1)[-1] expected = ext_to_expected.get(ext, ext) res = m.identify_bytes(content) detected = res.output.label match = "✅" if detected == expected else "🚨" if detected != expected: threats.append({"file": fname, "claimed_ext": ext, "actual_type": detected}) print(f"{fname:<18} {expected:^10} {detected:^14} {match:^6} {res.score:>5.1%}") threat_report = ask_gpt( system="You are a SOC analyst. Be specific and concise.", user=( f"Magika detected these extension-spoofed files: {json.dumps(threats)}. " "For each mismatch, describe in one sentence what the likely threat vector is " "and what action a security team should take." ), max_tokens=300, ) print(f"n💬 GPT threat assessment:n{threat_report}n")
We explore Magika’s prediction modes and compare how different confidence settings behave when the input is ambiguous. We then inspect the structure of the Magika result object in detail to understand the distinction between processed output fields and raw model fields. After that, we test spoofed files with misleading extensions and use GPT to explain the likely threat vectors and recommended security responses.
print("=" * 60) print("SECTION 6 — Corpus Distribution + GPT Insight") print("=" * 60) corpus = [ b"SELECT * FROM orders WHERE status='open';", b"page", b"import numpy as npnprint(np.zeros(10))", b"body { color: red; }", b'{"key": "value"}', b"name,scorenAlice,95nBob,87", b"# Titlen## Sectionn- bullet", b"echo hellonls -la", b"const x = () => 42;", b"package mainnimport "fmt"nfunc main() { fmt.Println("Go") }", b"public class Hello { public static void main(String[] a) {} }", b"fn main() { println!("Rust!"); }", b"#!/usr/bin/env rubynputs 'hello'", b"", b"[section]nkey=valuenanother=thing", b"FROM python:3.11nCOPY . /appnCMD python app.py", b"apiVersion: v1nkind: Podnmetadata:n name: test", ] all_results = [m.identify_bytes(b) for b in corpus] group_counts = Counter(r.output.group for r in all_results) label_counts = Counter(r.output.label for r in all_results) print("nBy GROUP:") for grp, cnt in sorted(group_counts.items(), key=lambda x: -x[1]): print(f" {grp:<12} {'█' * cnt} ({cnt})") print("nBy LABEL:") for lbl, cnt in sorted(label_counts.items(), key=lambda x: -x[1]): print(f" {lbl:<18} {cnt}") distribution = {"groups": dict(group_counts), "labels": dict(label_counts)} insight = ask_gpt( system="You are a staff engineer reviewing a code repository. Be concise.", user=( f"A file scanner found this type distribution: {json.dumps(distribution)}. " "In 3–4 sentences, describe what kind of repository this is, " "and suggest one thing to watch out for from a maintainability perspective." ), max_tokens=220, ) print(f"n💬 GPT repository insight:n{textwrap.fill(insight, 72)}n") print("=" * 60) print("SECTION 7 — Minimum Bytes Needed + GPT Explanation") print("=" * 60) full_python = b"#!/usr/bin/env python3nimport os, sysnprint('hello')n" * 10 probe_data = {} print(f"nFull content size: {len(full_python)} bytes") print(f"n{'Prefix (bytes)':<18} {'Label':<14} {'Score':>6}") print("-" * 40) for size in [4, 8, 16, 32, 64, 128, 256, 512]: res = m.identify_bytes(full_python[:size]) probe_data[str(size)] = {"label": res.output.label, "score": round(res.score, 3)} print(f" first {size:<10} {res.output.label:<14} {res.score:>5.1%}") probe_insight = ask_gpt( system="You are a concise ML engineer.", user=( f"Magika's identification of a Python file at different byte-prefix lengths: " f"{json.dumps(probe_data)}. " "In 3 sentences, explain why a model can identify file types from so few bytes, " "and what architectural choices make this possible." ), max_tokens=200, ) print(f"n💬 GPT on byte-level detection:n{textwrap.fill(probe_insight, 72)}n")
We analyze a mixed corpus of code and configuration content to understand the distribution of detected file groups and labels across a repository-like dataset. We use these results to let GPT infer the repository’s nature and highlight maintainability concerns based on the detected composition. We also probe how many bytes Magika needs for identification and examine how early byte-level patterns can still reveal file identity with useful confidence.
print("=" * 60) print("SECTION 8 — Upload Scanner Pipeline + GPT Risk Scoring") print("=" * 60) upload_dir = Path(tempfile.mkdtemp()) / "uploads" upload_dir.mkdir() uploads = { "report.pdf": b'%PDF-1.4n1 0 objn<>nendobjn', "data_export.csv": b"id,name,emailn1,Alice,[email protected]n2,Bob,[email protected]n", "setup.sh": b"#!/bin/bashnapt-get update && apt-get install -y curln", "config.json": b'{"debug": true, "workers": 4}', "malware.exe": bytes([0x4D, 0x5A]) + bytes(100), "index.html": b"Hello", "main.py": b"from flask import Flasknapp = Flask(__name__)n", "suspicious.txt": bytes([0x4D, 0x5A]) + bytes(50), } for fname, content in uploads.items(): (upload_dir / fname).write_bytes(content) all_paths = list(upload_dir.iterdir()) batch_results = m.identify_paths(all_paths) BLOCKED_LABELS = {"pe", "elf", "macho"} ext_map = {"pdf": "pdf", "csv": "csv", "sh": "shell", "json": "json", "exe": "pe", "html": "html", "py": "python", "txt": "txt"} scan_results = [] print(f"n{'File':<22} {'Label':<16} {'Score':>6} {'Status'}") print("-" * 65) for path, res in zip(all_paths, batch_results): o = res.output ext = path.suffix.lstrip(".") expected = ext_map.get(ext, "") mismatch = expected and (o.label != expected) if o.label in BLOCKED_LABELS: status = "🚫 BLOCKED" elif mismatch: status = f"⚠️ MISMATCH (ext:{expected})" else: status = "✅ OK" scan_results.append({ "file": path.name, "label": o.label, "group": o.group, "score": round(res.score, 3), "status": status.replace("🚫 ", "").replace("⚠️ ", "").replace("✅ ", ""), }) print(f"{path.name:<22} {o.label:<16} {res.score:>5.1%} {status}") risk_report = ask_gpt( system="You are a senior security analyst. Be structured and actionable.", user=( f"A file upload scanner produced these results: {json.dumps(scan_results)}. " "Provide a 5-sentence risk summary: identify the highest-risk files, " "explain why they're risky, and give concrete remediation steps." ), max_tokens=350, ) print(f"n💬 GPT risk report:n{risk_report}n") print("=" * 60) print("SECTION 9 — Forensics + GPT IOC Narrative") print("=" * 60) forensic_samples = [ ("sample_A", b"import renpattern = re.compile(r'\d+')n"), ("sample_B", b'{"attack": "sqli", "payload": "1 OR 1=1"}'), ("sample_C", bytes([0xFF, 0xD8, 0xFF, 0xE0]) + b"JFIF" + bytes(50)), ("sample_D", b""), ("sample_E", b"MZ" + bytes(100)), ] ioc_data = [] print(f"n{'Name':<12} {'SHA256':18} {'Label':<14} {'MIME':<28} {'is_text'}") print("-" * 80) for name, content in forensic_samples: sha = hashlib.sha256(content).hexdigest()[:16] res = m.identify_bytes(content) o = res.output ioc_data.append({ "id": name, "sha256_prefix": sha, "label": o.label, "mime": o.mime_type, "is_text": o.is_text, }) print(f"{name:<12} {sha:<18} {o.label:<14} {o.mime_type:<28} {o.is_text}") ioc_narrative = ask_gpt( system="You are a threat intelligence analyst writing an incident report.", user=( f"During a forensic investigation, these file samples were recovered: " f"{json.dumps(ioc_data)}. " "Write a concise 5-sentence Indicators of Compromise (IOC) narrative " "describing the likely attack chain and what each sample represents." ), max_tokens=350, ) print(f"n💬 GPT IOC narrative:n{ioc_narrative}n")
We simulate a real upload-scanning pipeline that classifies files, compares detected types against expected extensions, and decides whether each file should be allowed, flagged, or blocked. We then move into a forensic scenario in which we generate SHA-256 prefixes, inspect MIME types, and create structured indicators from recovered file samples. Throughout both parts, we use GPT to convert technical scan results into practical risk summaries and concise IOC-style incident narratives.
print("=" * 60) print("SECTION 10 — JSON Report + GPT Executive Summary") print("=" * 60) export_samples = { "api.py": b"from fastapi import FastAPInapp = FastAPI()[email protected]('/')ndef root(): return {}n", "schema.sql": b"CREATE TABLE users (id SERIAL PRIMARY KEY, email TEXT UNIQUE);n", "deploy.yaml": b"name: deploynon: pushnjobs:n build:n runs-on: ubuntu-latestn", "evil.exe": bytes([0x4D, 0x5A]) + bytes(100), "spoof.pdf": b'#!/usr/bin/env python3nprint("not a pdf")n', } report = [] for name, content in export_samples.items(): res = m.identify_bytes(content) o = res.output report.append({ "filename": name, "label": o.label, "description": o.description, "mime_type": o.mime_type, "group": o.group, "is_text": o.is_text, "dl_label": res.dl.label, "score": round(res.score, 4), }) print(json.dumps(report, indent=2)) exec_summary = ask_gpt( system="You are a CISO writing a two-paragraph executive summary. Be clear and non-technical.", user=( f"An AI file scanner analysed these files: {json.dumps(report)}. " "Write a two-paragraph executive summary: paragraph 1 covers what was found " "and the overall risk posture; paragraph 2 gives recommended next steps." ), max_tokens=400, ) print(f"n💬 GPT executive summary:n{exec_summary}n") out_path = "https://www.marktechpost.com/tmp/magika_openai_report.json" with open(out_path, "w") as f: json.dump({"scan_results": report, "executive_summary": exec_summary}, f, indent=2) print(f"💾 Full report saved to: {out_path}") print("n" + "=" * 60) print("✅ Magika + OpenAI Tutorial Complete!") print("=" * 60) print(""" All fixes applied (magika 1.0.2): ✗ from magika import MagikaConfig → removed (never existed) ✗ MagikaConfig(prediction_mode=m) → Magika(prediction_mode=m) ✗ m.get_model_version() → m.get_model_name() ✗ res.output_score → res.score ✗ res.dl_score / res.dl.score → res.score (score only lives on MagikaResult) MagikaResult field map (1.0.2): res.score ← the one and only confidence score res.output.label ← final label after threshold logic (use this) res.dl.label ← raw model label before thresholding (for debugging) res.output.* ← description, mime_type, group, extensions, is_text res.dl.* ← same fields but from the raw model output Sections: §1 Core API (bytes/path/stream) + GPT explains Magika's ML approach §2 Batch scanning + GPT project-type analysis §3 Confidence modes via constructor arg + GPT when-to-use guidance §4 MagikaResult anatomy + GPT explains dl vs output fields §5 Spoofed-file detection + GPT threat assessment per mismatch §6 Corpus distribution + GPT repository insight §7 Byte-prefix probing + GPT explains byte-level detection §8 Upload pipeline (allow/block/flag) + GPT risk report §9 Forensics hash+type fingerprinting + GPT IOC narrative §10 JSON report export + GPT CISO executive summary """)
We build a structured JSON report from multiple analyzed files and capture key metadata, including labels, MIME types, text status, and model confidence scores. We then use GPT to produce a non-technical executive summary that explains the overall findings, risk posture, and recommended next steps in a way that leadership can understand. Finally, we export the results to a JSON file and print a completion summary that reinforces the Magika 1.0.2 fixes and the full scope of the tutorial.
In conclusion, we saw how Magika and OpenAI work together to form a powerful AI-assisted file analysis system that is both technically robust and easy to understand. We use Magika to identify true file types, detect mismatches, inspect suspicious content, and analyze repositories or uploads at scale. At the same time, GPT helps us explain results, assess risks, and generate concise narratives for different audiences. This combination provides a workflow that is useful for developers and researchers, and also for security teams, forensic analysts, and technical decision-makers who need fast, accurate insight from file data. Overall, we create a practical end-to-end pipeline that shows how modern AI can improve file inspection, security triage, and automated reporting in a highly accessible Colab environment.
Check out the Full Codes with Notebook here. Also, feel free to follow us on Twitter and don’t forget to join our 130k+ ML SubReddit and Subscribe to our Newsletter. Wait! are you on telegram? now you can join us on telegram as well.
Need to partner with us for promoting your GitHub Repo OR Hugging Face Page OR Product Release OR Webinar etc.? Connect with us
Sana Hassan
Sana Hassan, a consulting intern at Marktechpost and dual-degree student at IIT Madras, is passionate about applying technology and AI to address real-world challenges. With a keen interest in solving practical problems, he brings a fresh perspective to the intersection of AI and real-life solutions.

