forked from m3philis/rc_active_user_ldap_sync
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrocketchat_ldap_sync.py
184 lines (156 loc) · 5.78 KB
/
rocketchat_ldap_sync.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
#!/usr/bin/env python3
"""RocketChat LDAP sync for activating/deactivating users"""
# Copyright:
# 2019 M3philis <github.com/m3philis>
# 2019 P. H. <github.com/perfide>
# License:
# BSD-2-Clause (BSD 2-Clause "Simplified" License)
# https://spdx.org/licenses/BSD-2-Clause.html
# License for "ABOUT YOU GmbH":
# All permissions granted (sämtliche Nutzungsrechte werden gewährt)
# included
import logging
import os
import sys
# 3rd-party
import ldap3 # pip3 install ldap3 / sudo apt-get install python3-ldap3
from rocketchat_API import rocketchat # pip3 install rocketchat_API
# All connection settings come from the environment so that no credentials
# live in the source. The first missing variable aborts start-up with exit
# code 3.
try:
    LDAP_PASSWORD = os.environ['LDAP_PASSWORD']
    LDAP_SEARCH_BASE = os.environ['LDAP_SEARCH_BASE']
    LDAP_SERVER = os.environ['LDAP_SERVER']
    LDAP_USER = os.environ['LDAP_USER']
    RC_PASSWORD = os.environ['RC_PASSWORD']
    RC_SERVER = os.environ['RC_SERVER']
    RC_USER = os.environ['RC_USER']
except KeyError as e:
    # Formatting a KeyError yields the quoted variable name, e.g. 'LDAP_USER'.
    # This is a diagnostic, so it goes to stderr rather than stdout.
    print("ENV variable {} is not set!".format(e), file=sys.stderr)
    sys.exit(3)
# Module-wide logger: both the logger and its single stream handler let
# everything from DEBUG upwards through.
LOG_HANDLER = logging.StreamHandler()
LOG_HANDLER.setLevel(logging.DEBUG)
LOG = logging.getLogger('rc-ldap-user-state-sync')
LOG.setLevel(logging.DEBUG)
LOG.addHandler(LOG_HANDLER)
def get_ldap_user_cns(conn, search_base):
    """Search LDAP for inetOrgPerson objects and collect their CNs.

    Every '-' in a CN is replaced by '.' before it is stored, so the
    returned names use dots where the directory uses hyphens.

    Args:
        conn (ldap3.core.connection.Connection): a connected LDAP object
        search_base (str): base-dn to be used for the search

    Returns:
        set: user-CNs; empty when the search failed or matched nothing
    """
    ldap_user_cns = set()
    LOG.info('searching for inetOrgPerson')
    search_result = conn.search(
        search_base=search_base,
        search_filter='(objectclass=inetOrgPerson)',
        search_scope=ldap3.SUBTREE,
        attributes=['cn'])
    if not search_result or conn.result['result'] != 0:
        LOG.error('user-search failed')
        return ldap_user_cns
    LOG.info('search-results: {}'.format(len(conn.response)))
    for entry in conn.response:
        # Response items that are not real entries (e.g. search-result
        # references) have no 'attributes' key; skip them instead of
        # crashing the whole sync.
        cn_values = entry.get('attributes', {}).get('cn')
        # NOTE(review): ldap3 appears to hand back a plain string instead of
        # a list for attributes the server schema marks single-valued;
        # normalise so that [0] never picks a single character.
        if isinstance(cn_values, str):
            cn_values = [cn_values]
        if not cn_values:
            continue
        ldap_user_cns.add(cn_values[0].replace('-', '.'))
    return ldap_user_cns
# end def get_ldap_user_cns
def get_rc_entries(rocket):
    """Collect the RocketChat users that log in via LDAP.

    The user list is fetched page by page. Paging stops as soon as the
    offset reaches the 'total' reported by the server or a page comes back
    empty, so no trailing empty page is requested.

    Args:
        rocket (rocketchat_API.rocketchat.RocketChat): a connected RocketChat object

    Returns:
        dict: username -> {'state': <'active' flag>, 'userID': <'_id'>}
            for every user whose 'ldap' field is truthy
    """
    page_size = 100
    rc_entries = dict()
    offset = 0
    while True:
        payload = rocket.users_list(count=page_size, offset=offset).json()
        users = payload['users']
        for user in users:
            # .get(): tolerate records without a username instead of
            # aborting the whole sync with a KeyError
            username = user.get('username')
            if 'ldap' not in user:
                LOG.debug("Local user: {}".format(username))
                continue
            if user['ldap'] and username is not None:
                rc_entries[username] = {
                    'state': user['active'],
                    'userID': user['_id'],
                }
        offset += page_size
        if not users or offset >= payload['total']:
            break
    return rc_entries
# end def get_rc_entries
def sync_rc_state_with_ldap(ldap_user_cns, rc_user_entries):
    """Work out which RocketChat users need their active-state flipped.

    A user found in LDAP should be active, a user missing from LDAP should
    be inactive. Only users whose current state differs are returned.

    Args:
        ldap_user_cns (set): user CNs from LDAP
        rc_user_entries (dict): user and state from RocketChat; not modified

    Returns:
        dict: updated state for users (copies of the affected entries with
            'state' set to the new value)
    """
    rc_user_entries_updated = dict()
    for user, entry in rc_user_entries.items():
        should_be_active = user in ldap_user_cns
        if bool(entry['state']) != should_be_active:
            # copy the entry so the caller's dict keeps the current state
            rc_user_entries_updated[user] = dict(entry, state=should_be_active)
    return rc_user_entries_updated
# end def sync_rc_state_with_ldap
def set_rc_state(rc_user_entries_updated, rocket):
    """Push the changed active-states to RocketChat.

    Logs the number of entries, then issues one users_update call per entry.

    Args:
        rc_user_entries_updated (dict): username -> entry holding the new
            'state' and the 'userID' to update
        rocket (rocketchat_API.rocketchat.RocketChat): a connected RocketChat object

    Returns:
        none
    """
    LOG.info("Updated user: {}".format(len(rc_user_entries_updated)))
    for name in rc_user_entries_updated:
        entry = rc_user_entries_updated[name]
        new_state = entry['state']
        LOG.debug("User: {}, State: {}".format(name, new_state))
        rocket.users_update(user_id=entry['userID'], active=new_state)
def main():
    """Run one LDAP -> RocketChat active-state synchronisation.

    Returns:
        int: exit code -- 0 on success, 1 when the TLS-secured LDAP
            connection could not be established, 2 when the LDAP bind
            failed, 4 when LDAP delivered no users and the sync was skipped
    """
    server = ldap3.Server(
        LDAP_SERVER,
        get_info=ldap3.ALL)
    conn = ldap3.Connection(
        server,
        user=LDAP_USER,
        password=LDAP_PASSWORD,
        auto_bind=False,
        auto_range=True,
        receive_timeout=2)
    # try ldap connection with TLS
    try:
        start_tls_result = conn.start_tls()
    except ldap3.core.exceptions.LDAPSocketOpenError as e:
        LOG.error('failed to open socket: {}'.format(e))
        return 1
    except ldap3.core.exceptions.LDAPStartTLSError as e:
        # wrap socket error: _ssl.c:835: The handshake operation timed out
        LOG.error('failed to start TLS: {}'.format(e))
        return 1
    except ldap3.core.exceptions.LDAPSocketReceiveError as e:
        # error receiving data: timed out
        LOG.error('timeout while connecting: {}'.format(e))
        return 1
    try:
        # explicit check instead of assert: asserts vanish under `python -O`
        # and the bind below must never run over an unencrypted connection
        if start_tls_result is not True:
            LOG.error('failed to start TLS')
            return 1
        LOG.debug('start_tls succeeded')
        bind_result = conn.bind()
        if not bind_result:
            LOG.error('bind failed')
            return 2
        LOG.debug('bind succeeded')
        ldap_user_cns = get_ldap_user_cns(conn, LDAP_SEARCH_BASE)
        # get_ldap_user_cns() returns an empty set when the search fails;
        # syncing against it would deactivate every LDAP user in RocketChat
        if not ldap_user_cns:
            LOG.error('no users received from LDAP, skipping sync')
            return 4
        rocket = rocketchat.RocketChat(RC_USER, RC_PASSWORD, server_url=RC_SERVER)
        rc_user_entries = get_rc_entries(rocket)
        rc_user_entries_updated = sync_rc_state_with_ldap(
            ldap_user_cns,
            rc_user_entries)
        set_rc_state(rc_user_entries_updated, rocket)
        return 0
    finally:
        # always release the LDAP connection once it has been opened
        conn.unbind()
# Script entry point: the return value of main() becomes the exit status.
if __name__ == '__main__':
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        LOG.error('\ninterrupted by keyboard')
        # report the interruption to the caller instead of exiting with 0;
        # 130 = 128 + SIGINT, the status shells use for Ctrl-C
        sys.exit(130)