-
Notifications
You must be signed in to change notification settings - Fork 340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
New Intel: Slack #1184
New Intel: Slack #1184
Changes from 8 commits
f5d66f5
1769d64
f541896
ba3cce8
638e1c8
2d9cbcd
0ea183b
fdc78fe
b1c4ca9
b6a1881
54a8b0c
2cc8200
931cd17
4cce8e7
4c5710f
08ac0a8
4af4082
c4ee352
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,71 @@ | ||
import logging | ||
|
||
import neo4j | ||
from slack_sdk import WebClient | ||
from slack_sdk.http_retry.builtin_handlers import RateLimitErrorRetryHandler | ||
|
||
import cartography.intel.slack.channels | ||
import cartography.intel.slack.groups | ||
import cartography.intel.slack.team | ||
import cartography.intel.slack.users | ||
from cartography.config import Config | ||
from cartography.util import timeit | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@timeit | ||
def start_slack_ingestion(neo4j_session: neo4j.Session, config: Config) -> None: | ||
""" | ||
If this module is configured, perform ingestion of Slack data. Otherwise warn and exit | ||
:param neo4j_session: Neo4J session for database interface | ||
:param config: A cartography.config object | ||
:return: None | ||
""" | ||
|
||
if not config.slack_token or not config.slack_teams: | ||
logger.info( | ||
'Slack import is not configured - skipping this module. ' | ||
'See docs to configure.', | ||
) | ||
return | ||
|
||
common_job_parameters = { | ||
"UPDATE_TAG": config.update_tag, | ||
} | ||
|
||
rate_limit_handler = RateLimitErrorRetryHandler(max_retry_count=1) | ||
slack_client = WebClient(token=config.slack_token) | ||
slack_client.retry_handlers.append(rate_limit_handler) | ||
|
||
for team_id in config.slack_teams.split(','): | ||
logger.info("Syncing team %s", team_id) | ||
common_job_parameters['TEAM_ID'] = team_id | ||
cartography.intel.slack.team.sync( | ||
neo4j_session, | ||
slack_client, | ||
team_id, | ||
config.update_tag, | ||
common_job_parameters, | ||
) | ||
cartography.intel.slack.users.sync( | ||
neo4j_session, | ||
slack_client, | ||
team_id, | ||
config.update_tag, | ||
common_job_parameters, | ||
) | ||
cartography.intel.slack.channels.sync( | ||
neo4j_session, | ||
slack_client, | ||
team_id, | ||
config.update_tag, | ||
common_job_parameters, | ||
) | ||
cartography.intel.slack.groups.sync( | ||
neo4j_session, | ||
slack_client, | ||
team_id, | ||
config.update_tag, | ||
common_job_parameters, | ||
) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
import logging | ||
from typing import Any | ||
from typing import Dict | ||
from typing import List | ||
from typing import Optional | ||
|
||
import neo4j | ||
from slack_sdk import WebClient | ||
|
||
from cartography.client.core.tx import load | ||
from cartography.graph.job import GraphJob | ||
from cartography.models.slack.channels import SlackChannelSchema | ||
from cartography.util import timeit | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@timeit | ||
def sync( | ||
neo4j_session: neo4j.Session, | ||
slack_client: WebClient, | ||
team_id: str, | ||
update_tag: int, | ||
common_job_parameters: Dict[str, Any], | ||
) -> None: | ||
channels = get(slack_client, team_id) | ||
load_channels(neo4j_session, channels, team_id, update_tag) | ||
cleanup(neo4j_session, common_job_parameters) | ||
|
||
|
||
@timeit | ||
def get(slack_client: WebClient, team_id: str, cursor: Optional[str] = None) -> List[Dict[str, Any]]: | ||
channels: List[Dict[str, Any]] = [] | ||
channels_info = slack_client.conversations_list(cursor=cursor, team_id=team_id) | ||
for m in channels_info['channels']: | ||
channels.append(m) | ||
if m['is_archived']: | ||
channels.append(m) | ||
jychp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
else: | ||
for i in _get_membership(slack_client, m['id']): | ||
channel_m = m.copy() | ||
channel_m['member_id'] = i | ||
channels.append(channel_m) | ||
next_cursor = channels_info.get('response_metadata', {}).get('next_cursor', '') | ||
if next_cursor != '': | ||
channels.extend(get(slack_client, team_id, cursor=next_cursor)) | ||
return channels | ||
|
||
|
||
def _get_membership(slack_client: WebClient, slack_channel: str, cursor: Optional[str] = None) -> List[str]: | ||
jychp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
result = [] | ||
memberships = slack_client.conversations_members(channel=slack_channel, cursor=cursor) | ||
for m in memberships['members']: | ||
result.append(m) | ||
next_cursor = memberships.get('response_metadata', {}).get('next_cursor', '') | ||
if next_cursor != '': | ||
result.extend(_get_membership(slack_client, slack_channel, cursor=next_cursor)) | ||
return result | ||
|
||
|
||
def load_channels( | ||
neo4j_session: neo4j.Session, | ||
data: List[Dict[str, Any]], | ||
team_id: str, | ||
update_tag: int, | ||
) -> None: | ||
|
||
jychp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
load( | ||
neo4j_session, | ||
SlackChannelSchema(), | ||
data, | ||
lastupdated=update_tag, | ||
TEAM_ID=team_id, | ||
) | ||
|
||
|
||
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict[str, Any]) -> None: | ||
GraphJob.from_node_schema(SlackChannelSchema(), common_job_parameters).run(neo4j_session) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
import logging | ||
from itertools import zip_longest | ||
from typing import Any | ||
from typing import Dict | ||
from typing import List | ||
from typing import Optional | ||
|
||
import neo4j | ||
from slack_sdk import WebClient | ||
|
||
from cartography.client.core.tx import load | ||
from cartography.graph.job import GraphJob | ||
from cartography.models.slack.group import SlackGroupSchema | ||
from cartography.util import timeit | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@timeit | ||
def sync( | ||
neo4j_session: neo4j.Session, | ||
slack_client: WebClient, | ||
team_id: str, | ||
update_tag: int, | ||
common_job_parameters: Dict[str, Any], | ||
) -> None: | ||
groups = get(slack_client, team_id) | ||
formated_groups = transform(groups) | ||
load_groups(neo4j_session, formated_groups, team_id, update_tag) | ||
cleanup(neo4j_session, common_job_parameters) | ||
|
||
|
||
@timeit | ||
def get(slack_client: WebClient, team_id: str, cursor: Optional[str] = None) -> List[Dict[str, Any]]: | ||
groups: List[Dict[str, Any]] = [] | ||
groups_info = slack_client.usergroups_list( | ||
cursor=cursor, | ||
include_count=True, | ||
include_users=True, | ||
include_disabled=True, | ||
team_id=team_id, | ||
) | ||
for g in groups_info['usergroups']: | ||
groups.append(g) | ||
next_cursor = groups_info.get('response_metadata', {}).get('next_cursor', '') | ||
if next_cursor != '': | ||
groups.extend(get(slack_client, team_id, cursor=next_cursor)) | ||
return groups | ||
|
||
|
||
@timeit | ||
def transform(groups: List[Dict[str, Any]]) -> List[Dict[str, Any]]: | ||
splitted_groups: List[Dict[str, Any]] = [] | ||
for group in groups: | ||
for ms in zip_longest(group['users'], group['prefs']['channels']): | ||
jychp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
formated_group = group.copy() | ||
formated_group.pop('users') | ||
formated_group.pop('prefs') | ||
formated_group['member_id'] = ms[0] | ||
formated_group['channel_id'] = ms[1] | ||
splitted_groups.append(formated_group) | ||
return splitted_groups | ||
|
||
|
||
def load_groups( | ||
neo4j_session: neo4j.Session, | ||
data: List[Dict[str, Any]], | ||
team_id: str, | ||
update_tag: int, | ||
) -> None: | ||
|
||
jychp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
load( | ||
neo4j_session, | ||
SlackGroupSchema(), | ||
data, | ||
lastupdated=update_tag, | ||
TEAM_ID=team_id, | ||
) | ||
|
||
|
||
def cleanup(neo4j_session: neo4j.Session, common_job_parameters: Dict[str, Any]) -> None: | ||
GraphJob.from_node_schema(SlackGroupSchema(), common_job_parameters).run(neo4j_session) |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,44 @@ | ||
import logging | ||
from typing import Any | ||
from typing import Dict | ||
|
||
import neo4j | ||
from slack_sdk import WebClient | ||
|
||
from cartography.client.core.tx import load | ||
from cartography.models.slack.team import SlackTeamSchema | ||
from cartography.util import timeit | ||
|
||
logger = logging.getLogger(__name__) | ||
|
||
|
||
@timeit | ||
def sync( | ||
neo4j_session: neo4j.Session, | ||
slack_client: WebClient, | ||
team_id: str, | ||
update_tag: int, | ||
common_job_parameters: Dict[str, Any], | ||
) -> None: | ||
team = get(slack_client, team_id) | ||
load_team(neo4j_session, team, update_tag) | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure we don't need a cleanup? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can not create an automatic cleanup, but clearly I can create a manual one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Right, since Team is the sub resource for Slack, I think we can get away with not having an automatic cleanup. |
||
|
||
@timeit | ||
def get(slack_client: WebClient, team_id: str) -> Dict[str, Any]: | ||
team_info = slack_client.team_info(team_id=team_id) | ||
return team_info['team'] | ||
|
||
|
||
def load_team( | ||
neo4j_session: neo4j.Session, | ||
data: Dict[str, Any], | ||
update_tag: int, | ||
) -> None: | ||
|
||
jychp marked this conversation as resolved.
Show resolved
Hide resolved
|
||
load( | ||
neo4j_session, | ||
SlackTeamSchema(), | ||
[data], | ||
lastupdated=update_tag, | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I know there are different types of slack tokens, and they work a bit differently at the Team and Enterprise levels. But but have you considered using the auth.teams.list method for discovery?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I will explore this.