Skip to content
Snippets Groups Projects
Commit e84e3e7c authored by Paul Sokolovsky's avatar Paul Sokolovsky
Browse files

esp8266: Rework webrepl_setup to run over wired REPL.

parent fa3a108e
No related branches found
No related tags found
No related merge requests found
...@@ -59,18 +59,16 @@ def start(port=8266, password=None): ...@@ -59,18 +59,16 @@ def start(port=8266, password=None):
stop() stop()
if password is None: if password is None:
try: try:
import port_config import webrepl_cfg
_webrepl.password(port_config.WEBREPL_PASS) _webrepl.password(webrepl_cfg.PASS)
setup_conn(port, accept_conn) setup_conn(port, accept_conn)
print("Started webrepl in normal mode") print("Started webrepl in normal mode")
except: except:
import webrepl_setup print("WebREPL is not configured, run 'import webrepl_setup'")
setup_conn(port, webrepl_setup.handle_conn)
print("Started webrepl in setup mode")
else: else:
_webrepl.password(password) _webrepl.password(password)
setup_conn(port, accept_conn) setup_conn(port, accept_conn)
print("Started webrepl in normal mode") print("Started webrepl in manual override mode")
def start_foreground(port=8266): def start_foreground(port=8266):
......
import sys import sys
import socket #import uos as os
import time import os
import machine
from websocket import *
import websocket_helper
def setup_server():
s = socket.socket()
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
ai = socket.getaddrinfo("0.0.0.0", 8266)
addr = ai[0][4]
s.bind(addr) RC = "./boot.py"
s.listen(1) CONFIG = "./webrepl_cfg.py"
return s
def getpass(stream, prompt): def input_choice(prompt, choices):
stream.write(prompt)
passwd = b""
while 1: while 1:
c = stream.read(1) resp = input(prompt)
if c in (b"\r", b"\n"): if resp in choices:
stream.write("\r\n") return resp
return passwd
passwd += c
stream.write("*")
def handle_conn(listen_sock):
cl, remote_addr = listen_sock.accept()
print("""
First-time WebREPL connection has been received. WebREPL initial setup
will now start over this connection. During setup, UART REPL will be
non-responsive. After setup finishes, the board will be rebooted. In
case of error during setup, current session will continue.
If you receive this message unexpectedly, it may mean that your WebREPL
connection is being hacked (power off board if unsure).
""")
websocket_helper.server_handshake(cl)
ws = websocket(cl)
ws.write("""\
Welcome to MicroPython WebREPL!\r
\r
This is the first time you connect to WebREPL, so please set a password\r
to use for the following WebREPL sessions. Once you enter the password\r
twice, your board will reboot with WebREPL running in active mode. On\r
some boards, you may need to press reset button or reconnect power.\r
\r
""")
def getpass(prompt):
return input(prompt)
def input_pass():
while 1: while 1:
passwd1 = getpass(ws, "New password: ") passwd1 = getpass("New password: ")
if len(passwd1) < 4: if len(passwd1) < 4:
ws.write("Password too short\r\n") print("Password too short")
continue continue
elif len(passwd1) > 9: elif len(passwd1) > 9:
ws.write("Password too long\r\n") print("Password too long")
continue continue
passwd2 = getpass(ws, "Confirm password: ") passwd2 = getpass("Confirm password: ")
if passwd1 == passwd2: if passwd1 == passwd2:
break return passwd1
ws.write("Passwords do not match\r\n") print("Passwords do not match")
with open("port_config.py", "w") as f:
f.write("WEBREPL_PASS = %r\n" % passwd1.decode("ascii"))
ws.write("Password successfully set, restarting...\r\n") def exists(fname):
cl.close() try:
time.sleep(2) with open(fname):
import machine pass
machine.reset() return True
except OSError:
return False
def copy_stream(s_in, s_out):
buf = bytearray(64)
while 1:
sz = s_in.readinto(buf)
s_out.write(buf, sz)
def get_daemon_status():
with open(RC) as f:
for l in f:
if "webrepl" in l:
if l.startswith("#"):
return False
return True
return None
def add_daemon():
with open(RC) as old_f, open(RC + ".tmp", "w") as new_f:
new_f.write("import webrepl\nwebrepl.start()\n")
copy_stream(old_f, new_f)
def change_daemon(action):
LINES = ("import webrepl", "webrepl.start()")
with open(RC) as old_f, open(RC + ".tmp", "w") as new_f:
for l in old_f:
for patt in LINES:
if patt in l:
if action and l.startswith("#"):
l = l[1:]
elif not action and not l.startswith("#"):
l = "#" + l
new_f.write(l)
# FatFs rename() is not POSIX compliant, will raise OSError if
# dest file exists.
os.remove(RC)
os.rename(RC + ".tmp", RC)
def main():
status = get_daemon_status()
print("WebREPL daemon auto-start status:", "enabled" if status else "disabled")
print("\nWould you like to (E)nable or (D)isable it running on boot?")
print("(Empty line to quit)")
resp = input("> ").upper()
if resp == "E":
if exists(CONFIG):
resp2 = input_choice("Would you like to change WebREPL password? (y/n) ", ("y", "n", ""))
else:
print("To enable WebREPL, you must set password for it")
resp2 = "y"
if resp2 == "y":
passwd = input_pass()
with open(CONFIG, "w") as f:
f.write("PASS = %r\n" % passwd)
if resp not in ("D", "E") or (resp == "D" and not status) or (resp == "E" and status):
print("No further action required")
sys.exit()
change_daemon(resp == "E")
print("Changes will be activated after reboot")
resp = input_choice("Would you like to reboot now? (y/n) ", ("y", "n", ""))
if resp == "y":
machine.reset()
def test(): main()
s = setup_server()
handle_conn(s)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment