-
Notifications
You must be signed in to change notification settings - Fork 0
/
sever.py
129 lines (121 loc) · 4.45 KB
/
sever.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
# from bluetooth import *
#
# server_sock = BluetoothSocket(RFCOMM)
# server_sock.bind(("", PORT_ANY))
# server_sock.listen(1)
#
# port = server_sock.getsockname()[1]
#
# uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"
#
# advertise_service(server_sock, "SampleServer",
# service_id=uuid,
# service_classes=[uuid, SERIAL_PORT_CLASS],
# profiles=[SERIAL_PORT_PROFILE],
# # protocols = [ OBEX_UUID ]
# )
#
# print("Waiting for connection on RFCOMM channel %d" % port)
#
# client_sock, client_info = server_sock.accept()
# print("Accepted connection from ", client_info)
#
#
# #receive
# # try:
# # while True:
# # data = client_sock.recv(1024)
# # if len(data) == 0: break
# # print("received [%s]" % data)
# # except IOError:
# # pass
# #print("disconnected")
#
# #send
# while True:
# data = input()
# if len(data) == 0: break
# sock.send(data)
#
# client_sock.close()
# server_sock.close()
import bluetooth
from bluetooth.btcommon import BluetoothError
import time
class DeviceConnector:
TARGET_NAME = "DESKTOP-55TDMGV"
TARGET_ADDRESS = None
TARGET_PORT = 4
SOCKET = None
def __init__(self):
pass
def getConnectionInstance(self):
self.deviceDiscovery()
if(DeviceConnector.TARGET_ADDRESS is not None):
print('Device found!')
self.connect_bluetooth_addr()
return DeviceConnector.SOCKET
else:
print('Could not find target bluetooth device nearby')
def deviceDiscovery(self):
tries=0
# uuid = "00001101-0000-1000-8000-00805F9B34FB"
# search for the SampleServer service
# service_matches = find_service(uuid = addr )
#
# if len(service_matches) == 0:
# print("couldn't find the SampleServer service =(")
# sys.exit(0)
try:
nearby_devices = bluetooth.discover_devices(lookup_names = True, duration=5)
while nearby_devices.__len__() == 0 and tries < 3: #多次
nearby_devices = bluetooth.discover_devices(lookup_names = True, duration=5) ##查找。名称可见
tries += 1
time.sleep (2)
print ('couldn\'t connect! trying again...')
for bdaddr, name in nearby_devices:
print(bdaddr,name)
for bdaddr, name in nearby_devices:
if bdaddr and name == DeviceConnector.TARGET_NAME: ##查找目标
DeviceConnector.TARGET_ADDRESS = bdaddr
# DeviceConnector.TARGET_NAME = name
except BluetoothError as e:
print ('bluetooth is off')
def connect_bluetooth_addr(self):
for i in range(1,5):
time.sleep(1)
sock = bluetooth.BluetoothSocket(bluetooth.RFCOMM) #服务器协议选择
try:
sock.connect((DeviceConnector.TARGET_ADDRESS, 1)) # DeviceConnector.TARGET_PORT #连接目标
sock.setblocking(False) #阻塞
data = '12345'
sock.send( data.encode('utf-8') )
DeviceConnector.SOCKET = sock
print('it has connected a device successfully')
return
except BluetoothError as e:
print('Could not connect to the device')
DeviceConnector.SOCKET.close()
return None
def createService(self,way=bluetooth.RFCOMM):
server_sock = bluetooth.BluetoothSocket(way) ## bluetooth.L2CAP ## RFCOMM ##
server_sock.bind(('', 4))
server_sock.listen(2) #监听
print('开始监听....................')
try:
while True:
client_sock, address = server_sock.accept() # 接受请求
print("Accepted connection from ", address)
while True:
data = client_sock.recv(1024)#等待接受数据。 数据长度为1(这个根据自己的情况任意改,只有接受够这么多长度的数据,才会结束这个语句)
if not data:
break
client_sock.send("server") # 数据返回
print("received [%s]" % data.decode('utf-8') )
except IOError:
pass
client_sock.close() #连接关闭
server_sock.close()
bluez = DeviceConnector()
bluez.getConnectionInstance()
bluez.createService()