diff --git a/blt/urls.py b/blt/urls.py index b3c75bd14..0757f5026 100644 --- a/blt/urls.py +++ b/blt/urls.py @@ -125,6 +125,7 @@ vote_count, ) from website.views.organization import ( + CodeSimilarityAnalyze, CreateHunt, DomainDetailView, DomainList, @@ -778,6 +779,16 @@ path("teams/delete-team/", delete_team, name="delete_team"), path("teams/leave-team/", leave_team, name="leave_team"), path("teams/kick-member/", kick_member, name="kick_member"), + path( + "similarity-check/", + TemplateView.as_view(template_name="similarity.html"), + name="similarity_check", + ), + path( + "api/code-similarity/analyze/", + CodeSimilarityAnalyze.as_view(), + name="code_similarity_analyze", + ), ] if settings.DEBUG: diff --git a/pyproject.toml b/pyproject.toml index 2d6b2583f..aa3cd8293 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -81,6 +81,10 @@ matplotlib = "^3.10.0" openpyxl = "^3.1.5" atproto = "^0.0.55" slack-bolt = "^1.22.0" +gitpython = "^3.1.43" +transformers = "^4.47.1" +torch = "^2.5.1" +scikit-learn = "^1.6.0" [tool.poetry.group.dev.dependencies] black = "^24.8.0" diff --git a/website/similarity_utils.py b/website/similarity_utils.py new file mode 100644 index 000000000..b4ff803ef --- /dev/null +++ b/website/similarity_utils.py @@ -0,0 +1,377 @@ +import ast +import csv +import difflib +import io +import os +import re + +import torch +from sklearn.metrics.pairwise import cosine_similarity +from transformers import AutoModel, AutoTokenizer + +# Initialize CodeBERT model and tokenizer +tokenizer = AutoTokenizer.from_pretrained("microsoft/codebert-base") +model = AutoModel.from_pretrained("microsoft/codebert-base") + + +def process_similarity_analysis(repo1_path, repo2_path): + """ + Process the similarity analysis between two repositories. + :param repo1_path: Path to the first repository + :param repo2_path: Path to the second repository + :return: Similarity score and matching details + """ + # Dummy data for now, will be replaced by actual parsing logic + matching_details = { + "functions": [], + "models": [], + } + + # Step 1: Extract function signatures and content + functions1 = extract_function_signatures_and_content(repo1_path) + functions2 = extract_function_signatures_and_content(repo2_path) + + # Compare functions + for func1 in functions1: + for func2 in functions2: + print(func1["signature"]["name"], func2["signature"]["name"]) + # Name similarity + name_similarity_difflib = ( + difflib.SequenceMatcher( + None, func1["signature"]["name"], func2["signature"]["name"] + ).ratio() + * 100 + ) + name_similarity_codebert = analyze_code_similarity_with_codebert( + func1["signature"]["name"], func2["signature"]["name"] + ) + name_similarity = (name_similarity_difflib + name_similarity_codebert) / 2 + + # Signature similarity + signature1 = f"{func1['signature']['name']}({', '.join(func1['signature']['args'])})" + signature2 = f"{func2['signature']['name']}({', '.join(func2['signature']['args'])})" + signature_similarity_difflib = ( + difflib.SequenceMatcher(None, signature1, signature2).ratio() * 100 + ) + signature_similarity_codebert = analyze_code_similarity_with_codebert( + signature1, signature2 + ) + signature_similarity = ( + signature_similarity_difflib + signature_similarity_codebert + ) / 2 + + # Content similarity + content_similarity = analyze_code_similarity_with_codebert( + func1["full_text"], func2["full_text"] + ) + + # Aggregate similarity + overall_similarity = (name_similarity + signature_similarity + content_similarity) / 3 + if overall_similarity > 50: # You can set 
the threshold here + matching_details["functions"].append( + { + "name1": func1["signature"]["name"], + "name2": func2["signature"]["name"], + "name_similarity": round(name_similarity, 2), + "signature_similarity": round(signature_similarity, 2), + "content_similarity": round(content_similarity, 2), + "similarity": round(overall_similarity, 2), + } + ) + + # Step 2: Compare Django models + models1 = extract_django_models(repo1_path) + models2 = extract_django_models(repo2_path) + + # Compare models and fields + for model1 in models1: + for model2 in models2: + model_similarity = ( + difflib.SequenceMatcher(None, model1["name"], model2["name"]).ratio() * 100 + ) + + model_fields_similarity = compare_model_fields(model1, model2) + matching_details["models"].append( + { + "name1": model1["name"], + "name2": model2["name"], + "similarity": round(model_similarity, 2), + "field_comparison": model_fields_similarity, + } + ) + + # Convert matching_details to CSV + csv_file = convert_matching_details_to_csv(matching_details) + + return matching_details, csv_file + + +def convert_matching_details_to_csv(matching_details): + """ + Convert matching details dictionary to a CSV file. + :param matching_details: Dictionary containing matching details + :return: CSV file as a string + """ + output = io.StringIO() + writer = csv.writer(output) + + # Write function similarities + writer.writerow(["Function Similarities"]) + writer.writerow( + [ + "Name1", + "Name2", + "Name Similarity", + "Signature Similarity", + "Content Similarity", + "Overall Similarity", + ] + ) + for func in matching_details["functions"]: + writer.writerow( + [ + func["name1"], + func["name2"], + func["name_similarity"], + func["signature_similarity"], + func["content_similarity"], + func["similarity"], + ] + ) + + # Write model similarities + writer.writerow([]) + writer.writerow(["Model Similarities"]) + writer.writerow(["Name1", "Name2", "Model Name Similarity", "Overall Field Similarity"]) + for model in matching_details["models"]: + writer.writerow( + [ + model["name1"], + model["name2"], + model["similarity"], + model["field_comparison"]["overall_field_similarity"], + ] + ) + + # Write field comparison details + writer.writerow( + [ + "Field1 Name", + "Field1 Type", + "Field2 Name", + "Field2 Type", + "Field Name Similarity", + "Field Type Similarity", + "Overall Similarity", + ] + ) + for field in model["field_comparison"]["field_comparison_details"]: + writer.writerow( + [ + field["field1_name"], + field["field1_type"], + field["field2_name"], + field["field2_type"], + field["field_name_similarity"], + field["field_type_similarity"], + field["overall_similarity"], + ] + ) + + return output.getvalue() + + +def analyze_code_similarity_with_codebert(code1, code2): + """ + Analyze the semantic similarity between two code snippets using CodeBERT embeddings. 
+ :param code1: First code snippet + :param code2: Second code snippet + :return: Similarity score (0-100) + """ + + # Tokenize and encode inputs + inputs_code1 = tokenizer( + code1, return_tensors="pt", truncation=True, max_length=512, padding="max_length" + ) + inputs_code2 = tokenizer( + code2, return_tensors="pt", truncation=True, max_length=512, padding="max_length" + ) + + # Generate embeddings + with torch.no_grad(): + outputs_code1 = model(**inputs_code1) + outputs_code2 = model(**inputs_code2) + + # Use mean pooling over the last hidden state to get sentence-level embeddings + embedding_code1 = outputs_code1.last_hidden_state.mean(dim=1) + embedding_code2 = outputs_code2.last_hidden_state.mean(dim=1) + + # Compute cosine similarity + similarity = cosine_similarity(embedding_code1.numpy(), embedding_code2.numpy()) + similarity_score = similarity[0][0] * 100 # Scale similarity to 0-100 + + return round(similarity_score, 2) + + +def extract_function_signatures_and_content(repo_path): + """ + Extract function signatures (name, parameters) and full text from Python files. + :param repo_path: Path to the repository + :return: List of function metadata (signature + full text) + """ + functions = [] + for root, dirs, files in os.walk(repo_path): + for file in files: + if file.endswith(".py"): + file_path = os.path.join(root, file) + with open(file_path, "r") as f: + try: + file_content = f.read() + tree = ast.parse(file_content, filename=file) + for node in ast.walk(tree): + if isinstance(node, ast.FunctionDef): + signature = { + "name": node.name, + "args": [arg.arg for arg in node.args.args], + "defaults": [ + ast.dump(default) for default in node.args.defaults + ], + } + # Extract function body as full text + function_text = ast.get_source_segment(file_content, node) + function_data = { + "signature": signature, + "full_text": function_text, # Full text of the function + } + functions.append(function_data) + except Exception as e: + print(f"Error parsing {file_path}: {e}") + return functions + + +def extract_django_models(repo_path): + """ + Extract Django model names and fields from the given repository. + :param repo_path: Path to the repository + :return: List of models with their fields + """ + models = [] + + # Walk through the repository directory + for root, dirs, files in os.walk(repo_path): + for file in files: + if file.endswith(".py"): # Only process Python files + file_path = os.path.join(root, file) + + # Open the file and read its contents + with open(file_path, "r") as f: + lines = f.readlines() + model_name = None + fields = [] + + for line in lines: + line = line.strip() + # Look for class definition that inherits from models.Model + if line.startswith("class ") and "models.Model" in line: + if model_name: # Save the previous model if exists + models.append({"name": model_name, "fields": fields}) + model_name = line.split("(")[0].replace("class ", "").strip() + fields = [] # Reset fields when a new model starts + + else: + # Match field definitions like: name = models.CharField(max_length=...) + match = re.match(r"^\s*(\w+)\s*=\s*models\.(\w+)", line) + if match: + field_name = match.group(1) + field_type = match.group(2) + fields.append({"field_name": field_name, "field_type": field_type}) + + # Match other field types like ForeignKey, ManyToManyField, etc. 
+ match_complex = re.match( + r"^\s*(\w+)\s*=\s*models\.(ForeignKey|ManyToManyField|OneToOneField)\((.*)\)", + line, + ) + if match_complex: + field_name = match_complex.group(1) + field_type = match_complex.group(2) + field_params = match_complex.group(3).strip() + fields.append( + { + "field_name": field_name, + "field_type": field_type, + "parameters": field_params, + } + ) + + # Add the last model if the file ends without another class + if model_name: + models.append({"name": model_name, "fields": fields}) + + return models + + +def compare_model_fields(model1, model2): + """ + Compare the names and fields of two Django models using difflib. + Compares model names, field names, and field types to calculate similarity scores. + + :param model1: First model's details (e.g., {'name': 'User', 'fields': [...]}) + :param model2: Second model's details (e.g., {'name': 'Account', 'fields': [...]}) + :return: Dictionary containing name and field similarity details + """ + # Compare model names + model_name_similarity = ( + difflib.SequenceMatcher(None, model1["name"], model2["name"]).ratio() * 100 + ) + + # Initialize field comparison details + field_comparison_details = [] + + # Get fields from both models + fields1 = model1.get("fields", []) + fields2 = model2.get("fields", []) + + for field1 in fields1: + for field2 in fields2: + print(field1, field2) + # Compare field names + field_name_similarity = ( + difflib.SequenceMatcher(None, field1["field_name"], field2["field_name"]).ratio() + * 100 + ) + + # Compare field types + field_type_similarity = ( + difflib.SequenceMatcher(None, field1["field_type"], field2["field_type"]).ratio() + * 100 + ) + + # Average similarity between the field name and type + overall_similarity = (field_name_similarity + field_type_similarity) / 2 + + # Append details for each field comparison + if overall_similarity > 50: + field_comparison_details.append( + { + "field1_name": field1["field_name"], + "field1_type": field1["field_type"], + "field2_name": field2["field_name"], + "field2_type": field2["field_type"], + "field_name_similarity": round(field_name_similarity, 2), + "field_type_similarity": round(field_type_similarity, 2), + "overall_similarity": round(overall_similarity, 2), + } + ) + + # Calculate overall similarity across all fields + if field_comparison_details: + total_similarity = sum([entry["overall_similarity"] for entry in field_comparison_details]) + overall_field_similarity = total_similarity / len(field_comparison_details) + else: + overall_field_similarity = 0.0 + + return { + "model_name_similarity": round(model_name_similarity, 2), + "field_comparison_details": field_comparison_details, + "overall_field_similarity": round(overall_field_similarity, 2), + } diff --git a/website/templates/includes/sidenav.html b/website/templates/includes/sidenav.html index 194d7a982..c30ef9c58 100644 --- a/website/templates/includes/sidenav.html +++ b/website/templates/includes/sidenav.html @@ -124,6 +124,15 @@ Trademarks +
[sidenav hunk body not recoverable from this extract — the change adds a "SimilarityScan" entry to the side navigation]
diff --git a/website/templates/similarity.html b/website/templates/similarity.html
new file mode 100644
index 000000000..5ddec615a
--- /dev/null
+++ b/website/templates/similarity.html
@@ -0,0 +1,398 @@
+{% extends "base.html" %}
+{% block content %}
+    {% include "includes/sidenav.html" %}
+
[~390 further lines of template markup not recoverable from this extract — the page renders a "Similarity Check" heading, a similarity legend (High Similarity / Medium Similarity / Low/No Similarity), and a "Results" section]
+
+{% endblock content %}
diff --git a/website/views/organization.py b/website/views/organization.py
index 3c429ffc7..7d49d781d 100644
--- a/website/views/organization.py
+++ b/website/views/organization.py
@@ -1,5 +1,7 @@
 import ipaddress
 import json
+import os
+import tempfile
 from collections import defaultdict
 from datetime import datetime, timedelta, timezone
 from decimal import Decimal
@@ -26,8 +28,11 @@
 from django.views.decorators.http import require_POST
 from django.views.generic import FormView, ListView, TemplateView, View
 from django.views.generic.edit import CreateView
+from git import Repo  # Requires GitPython library
 from rest_framework import status
 from rest_framework.authtoken.models import Token
+from rest_framework.response import Response
+from rest_framework.views import APIView
 
 from blt import settings
 from website.forms import CaptchaForm, HuntForm, IpReportForm, UserProfileForm
@@ -49,6 +54,7 @@
     Winner,
 )
 from website.services.blue_sky_service import BlueSkyService
+from website.similarity_utils import process_similarity_analysis
 from website.utils import format_timedelta, get_client_ip, get_github_issue_title
 
 
@@ -1816,3 +1822,88 @@ def checkIN_detail(request, report_id):
         "blockers": report.blockers,
     }
     return render(request, "sizzle/checkin_detail.html", context)
+
+
+class CodeSimilarityAnalyze(APIView):
+    def post(self, request, *args, **kwargs):
+        # Extract and validate data from request
+        type1 = request.data.get("type1")  # 'github' or 'zip'
+        type2 = request.data.get("type2")  # 'github' or 'zip'
+
+        # Default to None so missing or invalid types fail validation below instead of raising NameError
+        repo1 = None
+        repo2 = None
+
+        if type1 == "github":
+            repo1 = request.data.get("repo1")  # GitHub URL
+        elif type1 == "zip":
+            repo1 = request.FILES.get("repo1")  # ZIP file
+
+        if type2 == "github":
+            repo2 = request.data.get("repo2")  # GitHub URL
+        elif type2 == "zip":
+            repo2 = request.FILES.get("repo2")  # ZIP file
+
+        if not repo1 or not repo2 or not type1 or not type2:
+            return Response(
+                {"error": "Both repositories and their types are required."},
+                status=status.HTTP_400_BAD_REQUEST,
+            )
+
+        if type1 not in ["github", "zip"] or type2 not in ["github", "zip"]:
+            return Response(
+                {"error": "Invalid type. Must be 'github' or 'zip'."},
+                status=status.HTTP_400_BAD_REQUEST,
+            )
+
+        try:
+            temp_dir = tempfile.mkdtemp()
+            repo1_path = self.download_or_extract(repo1, type1, temp_dir, "repo1")
+            repo2_path = self.download_or_extract(repo2, type2, temp_dir, "repo2")
+
+            matching_details, csv_file = process_similarity_analysis(repo1_path, repo2_path)
+
+        except ValueError:
+            return Response(
+                {"error": "An unexpected error occurred, please try again later."},
+                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            )
+        except Exception as e:
+            # return Response({"error": str(e)}, status=status.HTTP_500_INTERNAL_SERVER_ERROR)
+            return Response(
+                {"error": "An unexpected error occurred, please try again later."},
+                status=status.HTTP_500_INTERNAL_SERVER_ERROR,
+            )
+        response = Response(
+            {
+                "status": "success",
+                "matching_details": matching_details,  # Detailed function/model similarity
+            },
+            status=status.HTTP_200_OK,
+        )
+
+        # response["Content-Disposition"] = 'attachment; filename="similarity_report.csv"'
+        # response["Content-Type"] = "text/csv"
+        # response.content = csv_file
+
+        return response
+
+    def download_or_extract(self, source, source_type, temp_dir, repo_name):
+        """
+        Download or extract the repository based on the type (GitHub or ZIP).
+        :param source: GitHub URL or ZIP file path
+        :param source_type: "github" or "zip"
+        :param temp_dir: Temporary directory for processing
+        :param repo_name: Prefix for naming (repo1 or repo2)
+        :return: Path to the extracted repository
+        """
+
+        dest_path = os.path.join(temp_dir, repo_name)
+        if source_type == "github":
+            # Clone the GitHub repository
+            Repo.clone_from(source, dest_path)
+
+        elif source_type == "zip":
+            pass
+
+        return dest_path
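
The "zip" branch of download_or_extract is currently a pass; a minimal sketch of one way it could be filled in, assuming source arrives as a Django UploadedFile and using the standard-library zipfile module (the helper name is illustrative, not part of the diff):

    import os
    import zipfile


    def extract_zip_upload(uploaded_file, dest_path):
        """Hypothetical helper: persist an uploaded ZIP archive and extract it into dest_path."""
        os.makedirs(dest_path, exist_ok=True)
        archive_path = os.path.join(dest_path, "upload.zip")
        # Stream the Django UploadedFile to disk in chunks rather than reading it into memory.
        with open(archive_path, "wb") as out:
            for chunk in uploaded_file.chunks():
                out.write(chunk)
        # Extract next to the archive; extractall() trusts member paths, so untrusted
        # uploads would additionally need path sanitisation.
        with zipfile.ZipFile(archive_path) as archive:
            archive.extractall(dest_path)
        os.remove(archive_path)
        return dest_path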
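
tempfile.mkdtemp() leaves the scratch directory behind after every request; a hedged sketch of wrapping the analysis in try/finally so it is always removed (illustrative only, assuming the analysis call stays synchronous):

    import shutil
    import tempfile

    from website.similarity_utils import process_similarity_analysis


    def analyse_with_cleanup(view, repo1, type1, repo2, type2):
        """Hypothetical wrapper: run the similarity analysis and always delete the scratch dir."""
        temp_dir = tempfile.mkdtemp()
        try:
            repo1_path = view.download_or_extract(repo1, type1, temp_dir, "repo1")
            repo2_path = view.download_or_extract(repo2, type2, temp_dir, "repo2")
            return process_similarity_analysis(repo1_path, repo2_path)
        finally:
            shutil.rmtree(temp_dir, ignore_errors=True)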
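
For a quick end-to-end check, the new endpoint can be exercised with the requests library; the host, repository URLs, and timeout below are placeholders, and depending on the project's DRF defaults the call may also need authentication or CSRF headers:

    import requests

    resp = requests.post(
        "http://localhost:8000/api/code-similarity/analyze/",
        data={
            "type1": "github",
            "repo1": "https://github.com/example-org/project-a",
            "type2": "github",
            "repo2": "https://github.com/example-org/project-b",
        },
        timeout=600,  # cloning plus CodeBERT inference can be slow, especially on first run
    )
    resp.raise_for_status()
    # matching_details holds the per-function and per-model similarity entries
    print(resp.json()["matching_details"]["functions"][:5])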
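
And a small way to sanity-check the CodeBERT scorer from similarity_utils in isolation (the first call downloads the microsoft/codebert-base weights; the two snippets are arbitrary examples):

    from website.similarity_utils import analyze_code_similarity_with_codebert

    snippet_a = "def add(a, b):\n    return a + b"
    snippet_b = "def sum_two(x, y):\n    return x + y"

    # Prints a 0-100 score derived from cosine similarity of mean-pooled CodeBERT embeddings.
    print(analyze_code_similarity_with_codebert(snippet_a, snippet_b))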