forked from thevinilima/lab01
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmain.py
123 lines (108 loc) · 3.85 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import csv
import os

import requests
# GitHub personal access token. Read from the environment so the secret is
# never hard-coded into (and committed with) the source; falls back to the
# original empty string when GITHUB_TOKEN is unset, preserving old behavior.
token = os.environ.get("GITHUB_TOKEN", "")
# Bearer-auth header attached to every GraphQL request below.
headers = {"Authorization": f"Bearer {token}"}
def get_popular_repos(total_repos, headers):
    """Fetch popular public repositories via the GitHub GraphQL search API.

    Pages through search results 100 at a time, following the `endCursor`,
    until `total_repos` nodes have been collected or the API reports no
    further pages.

    Args:
        total_repos: Maximum number of repository nodes to return.
        headers: HTTP headers dict, including the Authorization bearer token.

    Returns:
        A list of up to `total_repos` repository node dicts (name, createdAt,
        updatedAt, primaryLanguage, owner.login, stargazerCount).

    Raises:
        Exception: If any HTTP request returns a non-200 status code.
    """
    url = "https://api.github.com/graphql"
    # The query text is identical on every page — only the $after variable
    # changes — so build it once, outside the pagination loop.
    query = """
    query ($after: String) {
        search(query: "stars:>0", type: REPOSITORY, first: 100, after: $after) {
            pageInfo {
                endCursor
                hasNextPage
            }
            nodes {
                ... on Repository {
                    name
                    createdAt
                    updatedAt
                    primaryLanguage {
                        name
                    }
                    owner {
                        login
                    }
                    stargazerCount
                }
            }
        }
    }
    """
    repos = []
    after_cursor = None
    while len(repos) < total_repos:
        variables = {"after": after_cursor}
        # timeout guards against a stalled connection hanging the script forever.
        response = requests.post(
            url,
            json={"query": query, "variables": variables},
            headers=headers,
            timeout=30,
        )
        if response.status_code != 200:
            raise Exception(f"Failed to fetch repositories: {response.status_code}")
        data = response.json()["data"]["search"]
        repos.extend(data["nodes"])
        after_cursor = data["pageInfo"]["endCursor"]
        if not data["pageInfo"]["hasNextPage"]:
            break
    # The last page may overshoot the requested count; trim to exactly total_repos.
    return repos[:total_repos]
def _write_repo_details_csv(repo_details):
    """Write the collected repository detail dicts to repos_details.csv."""
    # Derive the CSV header from the first record; empty input yields a
    # header-less empty file rather than an IndexError.
    keys = repo_details[0].keys() if repo_details else []
    with open('repos_details.csv', 'w', newline='', encoding='utf-8') as output_file:
        dict_writer = csv.DictWriter(output_file, fieldnames=keys)
        dict_writer.writeheader()
        dict_writer.writerows(repo_details)
    print("Saved repository details to repos_details.csv")


def get_repo_details(repos, headers):
    """Fetch per-repository metrics via GraphQL and persist them to CSV.

    For each repository node (as returned by `get_popular_repos`), queries
    merged PR count, release count, and open/closed issue counts, computes
    the closed-issues ratio, writes all rows to repos_details.csv, and
    returns the collected records.

    Args:
        repos: Iterable of repository node dicts with "name" and
            "owner"."login" keys.
        headers: HTTP headers dict, including the Authorization bearer token.

    Returns:
        A list of dicts with keys: name, created_at, total_pulls,
        total_releases, updated_at, language, closed_issues_ratio.

    Raises:
        Exception: If any HTTP request returns a non-200 status code.
    """
    url = "https://api.github.com/graphql"
    # Query text is invariant across repositories — only $owner/$name vary —
    # so build it once instead of once per repo.
    query = """
    query ($owner: String!, $name: String!) {
        repository(owner: $owner, name: $name) {
            name
            createdAt
            updatedAt
            primaryLanguage {
                name
            }
            pullRequests(states: MERGED) {
                totalCount
            }
            releases {
                totalCount
            }
            issues(states: CLOSED) {
                totalCount
            }
            openIssues: issues(states: OPEN) {
                totalCount
            }
        }
    }
    """
    repo_details = []
    for repo in repos:
        print(f"Processing repository: {repo['name']}")
        variables = {"owner": repo["owner"]["login"], "name": repo["name"]}
        # timeout guards against a stalled connection hanging the script forever.
        response = requests.post(
            url,
            json={"query": query, "variables": variables},
            headers=headers,
            timeout=30,
        )
        if response.status_code != 200:
            raise Exception(f"Failed to fetch repository details: {response.status_code}")
        data = response.json()["data"]["repository"]
        closed_count = data["issues"]["totalCount"]
        total_issues = closed_count + data["openIssues"]["totalCount"]
        # Repos with no issues at all would divide by zero; report 0 instead.
        closed_issues_ratio = closed_count / total_issues if total_issues > 0 else 0
        repo_details.append({
            'name': data["name"],
            'created_at': data["createdAt"],
            'total_pulls': data["pullRequests"]["totalCount"],
            'total_releases': data["releases"]["totalCount"],
            'updated_at': data["updatedAt"],
            # primaryLanguage is null for repos with no detected language.
            'language': data["primaryLanguage"]["name"] if data["primaryLanguage"] else None,
            'closed_issues_ratio': round(closed_issues_ratio, 2),
        })
    _write_repo_details_csv(repo_details)
    return repo_details
if __name__ == "__main__":
    # Number of top repositories to mine.
    repo_count = 1000
    try:
        top_repos = get_popular_repos(repo_count, headers)
        details = get_repo_details(top_repos, headers)
        print(details)
    except Exception as error:
        # Top-level boundary: surface the failure message without a traceback.
        print(error)