build: Initialize Python virtual environment and install project dependencies.
Some checks failed
WLED CI / wled_build (push) Has been cancelled

This commit is contained in:
2026-02-19 22:31:26 +08:00
parent ca1319462e
commit bb17574d0c
1668 changed files with 469578 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,188 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# pylint: disable=too-many-arguments, too-many-locals
# pylint: disable=too-many-branches, too-many-statements
import asyncio
import os
import signal
import subprocess
import click
from platformio import app, exception, fs, proc
from platformio.compat import IS_WINDOWS
from platformio.debug import helpers
from platformio.debug.config.factory import DebugConfigFactory
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.debug.process.gdb import GDBClientProcess
from platformio.exception import ReturnErrorCode
from platformio.platform.factory import PlatformFactory
from platformio.project.config import ProjectConfig
from platformio.project.helpers import is_platformio_project
from platformio.project.options import ProjectOptions
@click.command(
"debug",
context_settings=dict(ignore_unknown_options=True),
short_help="Unified Debugger",
)
@click.option(
"-d",
"--project-dir",
default=os.getcwd,
type=click.Path(exists=True, file_okay=False, dir_okay=True, writable=True),
)
@click.option(
"-c",
"--project-conf",
type=click.Path(exists=True, file_okay=True, dir_okay=False, readable=True),
)
@click.option("--environment", "-e", metavar="<environment>")
@click.option("--load-mode", type=ProjectOptions["env.debug_load_mode"].type)
@click.option("--verbose", "-v", is_flag=True)
@click.option("--interface", type=click.Choice(["gdb"]))
@click.argument("client_extra_args", nargs=-1, type=click.UNPROCESSED)
@click.pass_context
def cli( # pylint: disable=too-many-positional-arguments
ctx,
project_dir,
project_conf,
environment,
load_mode,
verbose,
interface,
client_extra_args,
):
app.set_session_var("custom_project_conf", project_conf)
if not interface and client_extra_args:
raise click.UsageError("Please specify debugging interface")
# use env variables from Eclipse or CLion
for name in ("CWD", "PWD", "PLATFORMIO_PROJECT_DIR"):
if is_platformio_project(project_dir):
break
if os.getenv(name):
project_dir = os.getenv(name)
with fs.cd(project_dir):
project_config = ProjectConfig.get_instance(project_conf)
project_config.validate(envs=[environment] if environment else None)
env_name = environment or helpers.get_default_debug_env(project_config)
if not interface:
return helpers.predebug_project(
ctx, os.getcwd(), project_config, env_name, False, verbose
)
configure_args = (
ctx,
project_config,
env_name,
load_mode,
verbose,
client_extra_args,
)
if helpers.is_gdbmi_mode():
os.environ["PLATFORMIO_DISABLE_PROGRESSBAR"] = "true"
stream = helpers.GDBMIConsoleStream()
with proc.capture_std_streams(stream):
debug_config = _configure(*configure_args)
stream.close()
else:
debug_config = _configure(*configure_args)
_run(os.getcwd(), debug_config, client_extra_args)
return None
def _configure(
ctx, project_config, env_name, load_mode, verbose, client_extra_args
): # pylint: disable=too-many-positional-arguments
platform = PlatformFactory.from_env(env_name, autoinstall=True)
debug_config = DebugConfigFactory.new(
platform,
project_config,
env_name,
)
if "--version" in client_extra_args:
raise ReturnErrorCode(
subprocess.run(
[debug_config.client_executable_path, "--version"], check=True
).returncode
)
try:
fs.ensure_udev_rules()
except exception.InvalidUdevRules as exc:
click.echo(str(exc))
rebuild_prog = False
preload = debug_config.load_cmds == ["preload"]
load_mode = load_mode or debug_config.load_mode
if load_mode == "always":
rebuild_prog = preload or not helpers.has_debug_symbols(
debug_config.program_path
)
elif load_mode == "modified":
rebuild_prog = helpers.is_prog_obsolete(
debug_config.program_path
) or not helpers.has_debug_symbols(debug_config.program_path)
if not (debug_config.program_path and os.path.isfile(debug_config.program_path)):
rebuild_prog = True
if preload or (not rebuild_prog and load_mode != "always"):
# don't load firmware through debug server
debug_config.load_cmds = []
if rebuild_prog:
click.echo("Preparing firmware for debugging...")
helpers.predebug_project(
ctx, os.getcwd(), project_config, env_name, preload, verbose
)
# save SHA sum of newly created prog
if load_mode == "modified":
helpers.is_prog_obsolete(debug_config.program_path)
if not os.path.isfile(debug_config.program_path):
raise DebugInvalidOptionsError("Program/firmware is missed")
return debug_config
def _run(project_dir, debug_config, client_extra_args):
try:
loop = asyncio.ProactorEventLoop() if IS_WINDOWS else asyncio.get_event_loop()
except RuntimeError:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = GDBClientProcess(project_dir, debug_config)
coro = client.run(client_extra_args)
try:
signal.signal(signal.SIGINT, signal.SIG_IGN)
loop.run_until_complete(coro)
if IS_WINDOWS:
client.close()
# an issue with `asyncio` executor and STIDIN,
# it cannot be closed gracefully
proc.force_exit()
finally:
client.close()
loop.close()

View File

@@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,254 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import os
from platformio import fs, proc, util
from platformio.compat import string_types
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.project.config import ProjectConfig
from platformio.project.helpers import load_build_metadata
from platformio.project.options import ProjectOptions
class DebugConfigBase: # pylint: disable=too-many-instance-attributes
DEFAULT_PORT = None
def __init__(self, platform, project_config, env_name):
self.platform = platform
self.project_config = project_config
self.env_name = env_name
self.env_options = project_config.items(env=env_name, as_dict=True)
self.build_data = self._load_build_data()
self.tool_name = None
self.board_config = {}
self.tool_settings = {}
if "board" in self.env_options:
self.board_config = platform.board_config(self.env_options["board"])
self.tool_name = self.board_config.get_debug_tool_name(
self.env_options.get("debug_tool")
)
self.tool_settings = (
self.board_config.get("debug", {})
.get("tools", {})
.get(self.tool_name, {})
)
self._load_cmds = None
self._port = None
self.server = self._configure_server()
try:
platform.configure_debug_session(self)
except NotImplementedError:
pass
@staticmethod
def cleanup_cmds(items):
items = ProjectConfig.parse_multi_values(items)
return ["$LOAD_CMDS" if item == "$LOAD_CMD" else item for item in items]
@property
def program_path(self):
return self.build_data["prog_path"]
@property
def client_executable_path(self):
return self.build_data["gdb_path"]
@property
def load_cmds(self):
if self._load_cmds is not None:
return self._load_cmds
result = self.env_options.get("debug_load_cmds")
if not result:
result = self.tool_settings.get("load_cmds")
if not result:
# legacy
result = self.tool_settings.get("load_cmd")
if not result:
result = ProjectOptions["env.debug_load_cmds"].default
return self.cleanup_cmds(result)
@load_cmds.setter
def load_cmds(self, cmds):
self._load_cmds = cmds
@property
def load_mode(self):
result = self.env_options.get("debug_load_mode")
if not result:
result = self.tool_settings.get("load_mode")
return result or ProjectOptions["env.debug_load_mode"].default
@property
def init_break(self):
missed = object()
result = self.env_options.get("debug_init_break", missed)
if result != missed:
return result
result = None
if not result:
result = self.tool_settings.get("init_break")
return result or ProjectOptions["env.debug_init_break"].default
@property
def init_cmds(self):
return self.cleanup_cmds(
self.env_options.get("debug_init_cmds", self.tool_settings.get("init_cmds"))
)
@property
def extra_cmds(self):
return self.cleanup_cmds(
self.env_options.get("debug_extra_cmds")
) + self.cleanup_cmds(self.tool_settings.get("extra_cmds"))
@property
def port(self):
return (
self._port
or self.env_options.get("debug_port")
or self.tool_settings.get("port")
or self.DEFAULT_PORT
)
@port.setter
def port(self, value):
self._port = value
@property
def upload_protocol(self):
return self.env_options.get(
"upload_protocol", self.board_config.get("upload", {}).get("protocol")
)
@property
def speed(self):
return self.env_options.get("debug_speed", self.tool_settings.get("speed"))
@property
def server_ready_pattern(self):
return self.env_options.get(
"debug_server_ready_pattern", (self.server or {}).get("ready_pattern")
)
def _load_build_data(self):
data = load_build_metadata(
os.getcwd(), self.env_name, cache=True, build_type="debug"
)
if not data:
raise DebugInvalidOptionsError("Could not load a build configuration")
return data
def _configure_server(self):
# user disabled server in platformio.ini
if "debug_server" in self.env_options and not self.env_options.get(
"debug_server"
):
return None
result = None
# specific server per a system
if isinstance(self.tool_settings.get("server", {}), list):
for item in self.tool_settings["server"][:]:
self.tool_settings["server"] = item
if util.get_systype() in item.get("system", []):
break
# user overwrites debug server
if self.env_options.get("debug_server"):
result = {
"cwd": None,
"executable": None,
"arguments": self.env_options.get("debug_server"),
}
result["executable"] = result["arguments"][0]
result["arguments"] = result["arguments"][1:]
elif "server" in self.tool_settings:
result = self.tool_settings["server"]
server_package = result.get("package")
server_package_dir = (
self.platform.get_package_dir(server_package)
if server_package
else None
)
if server_package and not server_package_dir:
self.platform.install_package(server_package)
server_package_dir = self.platform.get_package_dir(server_package)
result.update(
dict(
cwd=server_package_dir if server_package else None,
executable=result.get("executable"),
arguments=[
(
a.replace("$PACKAGE_DIR", server_package_dir)
if server_package_dir
else a
)
for a in result.get("arguments", [])
],
)
)
return self.reveal_patterns(result) if result else None
def get_init_script(self, debugger):
try:
return getattr(self, "%s_INIT_SCRIPT" % debugger.upper())
except AttributeError as exc:
raise NotImplementedError from exc
def reveal_patterns(self, source, recursive=True):
program_path = self.program_path or ""
patterns = {
"PLATFORMIO_CORE_DIR": self.project_config.get("platformio", "core_dir"),
"PYTHONEXE": proc.get_pythonexe_path(),
"PROJECT_DIR": os.getcwd(),
"PROG_PATH": program_path,
"PROG_DIR": os.path.dirname(program_path),
"PROG_NAME": os.path.basename(os.path.splitext(program_path)[0]),
"DEBUG_PORT": self.port,
"UPLOAD_PROTOCOL": self.upload_protocol,
"INIT_BREAK": self.init_break or "",
"LOAD_CMDS": "\n".join(self.load_cmds or []),
}
for key, value in patterns.items():
if key.endswith(("_DIR", "_PATH")):
patterns[key] = fs.to_unix_path(value)
def _replace(text):
for key, value in patterns.items():
pattern = "$%s" % key
text = text.replace(pattern, value or "")
return text
if isinstance(source, string_types):
source = _replace(source)
elif isinstance(source, (list, dict)):
items = enumerate(source) if isinstance(source, list) else source.items()
for key, value in items:
if isinstance(value, string_types):
source[key] = _replace(value)
elif isinstance(value, (list, dict)) and recursive:
source[key] = self.reveal_patterns(value, patterns)
data = json.dumps(source)
if any(("$" + key) in data for key in patterns):
source = self.reveal_patterns(source, patterns)
return source

View File

@@ -0,0 +1,71 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.debug.config.base import DebugConfigBase
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.device.finder import SerialPortFinder, is_pattern_port
class BlackmagicDebugConfig(DebugConfigBase):
GDB_INIT_SCRIPT = """
define pio_reset_halt_target
set language c
set *0xE000ED0C = 0x05FA0004
set $busy = (*0xE000ED0C & 0x4)
while ($busy)
set $busy = (*0xE000ED0C & 0x4)
end
set language auto
end
define pio_reset_run_target
pio_reset_halt_target
end
target extended-remote $DEBUG_PORT
monitor swdp_scan
attach 1
set mem inaccessible-by-default off
$LOAD_CMDS
$INIT_BREAK
set language c
set *0xE000ED0C = 0x05FA0004
set $busy = (*0xE000ED0C & 0x4)
while ($busy)
set $busy = (*0xE000ED0C & 0x4)
end
set language auto
"""
@property
def port(self):
# pylint: disable=assignment-from-no-return
initial_port = DebugConfigBase.port.fget(self)
if initial_port and not is_pattern_port(initial_port):
return initial_port
port = SerialPortFinder(
board_config=self.board_config,
upload_protocol=self.tool_name,
prefer_gdb_port=True,
).find(initial_port)
if port:
return port
raise DebugInvalidOptionsError(
"Please specify `debug_port` for the working environment"
)
@port.setter
def port(self, value):
self._port = value

View File

@@ -0,0 +1,44 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import importlib
import re
from platformio.debug.config.generic import GenericDebugConfig
from platformio.debug.config.native import NativeDebugConfig
class DebugConfigFactory:
@staticmethod
def get_clsname(name):
name = re.sub(r"[^\da-z\_\-]+", "", name, flags=re.I)
return "%sDebugConfig" % name.lower().capitalize()
@classmethod
def new(cls, platform, project_config, env_name):
board_id = project_config.get("env:" + env_name, "board")
config_cls = None
tool_name = None
if board_id:
tool_name = platform.board_config(
project_config.get("env:" + env_name, "board")
).get_debug_tool_name(project_config.get("env:" + env_name, "debug_tool"))
try:
mod = importlib.import_module("platformio.debug.config.%s" % tool_name)
config_cls = getattr(mod, cls.get_clsname(tool_name))
except ModuleNotFoundError:
config_cls = (
GenericDebugConfig if platform.is_embedded() else NativeDebugConfig
)
return config_cls(platform, project_config, env_name)

View File

@@ -0,0 +1,34 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.debug.config.base import DebugConfigBase
class GenericDebugConfig(DebugConfigBase):
DEFAULT_PORT = ":3333"
GDB_INIT_SCRIPT = """
define pio_reset_halt_target
monitor reset halt
end
define pio_reset_run_target
monitor reset
end
target extended-remote $DEBUG_PORT
monitor init
$LOAD_CMDS
pio_reset_halt_target
$INIT_BREAK
"""

View File

@@ -0,0 +1,42 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.debug.config.base import DebugConfigBase
class JlinkDebugConfig(DebugConfigBase):
DEFAULT_PORT = ":2331"
GDB_INIT_SCRIPT = """
define pio_reset_halt_target
monitor reset
monitor halt
end
define pio_reset_run_target
monitor clrbp
monitor reset
monitor go
end
target extended-remote $DEBUG_PORT
monitor clrbp
monitor speed auto
pio_reset_halt_target
$LOAD_CMDS
$INIT_BREAK
"""
@property
def server_ready_pattern(self):
return super().server_ready_pattern or ("Waiting for GDB connection")

View File

@@ -0,0 +1,32 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.debug.config.base import DebugConfigBase
class MspdebugDebugConfig(DebugConfigBase):
DEFAULT_PORT = ":2000"
GDB_INIT_SCRIPT = """
define pio_reset_halt_target
end
define pio_reset_run_target
end
target remote $DEBUG_PORT
monitor erase
$LOAD_CMDS
pio_reset_halt_target
$INIT_BREAK
"""

View File

@@ -0,0 +1,31 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.compat import IS_WINDOWS
from platformio.debug.config.base import DebugConfigBase
class NativeDebugConfig(DebugConfigBase):
GDB_INIT_SCRIPT = """
define pio_reset_halt_target
end
define pio_reset_run_target
end
define pio_restart_target
end
$INIT_BREAK
""" + ("set startup-with-shell off" if not IS_WINDOWS else "")

View File

@@ -0,0 +1,33 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.debug.config.base import DebugConfigBase
class QemuDebugConfig(DebugConfigBase):
DEFAULT_PORT = ":1234"
GDB_INIT_SCRIPT = """
define pio_reset_halt_target
monitor system_reset
end
define pio_reset_run_target
monitor system_reset
end
target extended-remote $DEBUG_PORT
$LOAD_CMDS
pio_reset_halt_target
$INIT_BREAK
"""

View File

@@ -0,0 +1,41 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.debug.config.base import DebugConfigBase
class RenodeDebugConfig(DebugConfigBase):
DEFAULT_PORT = ":3333"
GDB_INIT_SCRIPT = """
define pio_reset_halt_target
monitor machine Reset
$LOAD_CMDS
monitor start
end
define pio_reset_run_target
pio_reset_halt_target
end
target extended-remote $DEBUG_PORT
$LOAD_CMDS
$INIT_BREAK
monitor start
"""
@property
def server_ready_pattern(self):
return super().server_ready_pattern or (
"GDB server with all CPUs started on port"
)

View File

@@ -0,0 +1,36 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from platformio.exception import PlatformioException, UserSideException
class DebugError(PlatformioException):
pass
class DebugSupportError(DebugError, UserSideException):
MESSAGE = (
"Currently, PlatformIO does not support debugging for `{0}`.\n"
"Please request support at https://github.com/platformio/"
"platformio-core/issues \nor visit -> https://docs.platformio.org"
"/page/plus/debugging.html"
)
class DebugInvalidOptionsError(DebugError, UserSideException):
pass
class DebugInitError(DebugError, UserSideException):
pass

View File

@@ -0,0 +1,160 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import sys
import time
from hashlib import sha1
from io import BytesIO
from platformio.cli import PlatformioCLI
from platformio.compat import is_bytes
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.run.cli import cli as cmd_run
from platformio.run.cli import print_processing_header
from platformio.test.helpers import list_test_names
from platformio.test.result import TestSuite
from platformio.test.runners.base import TestRunnerOptions
from platformio.test.runners.factory import TestRunnerFactory
class GDBMIConsoleStream(BytesIO): # pylint: disable=too-few-public-methods
STDOUT = sys.stdout
def write(self, text):
self.STDOUT.write(escape_gdbmi_stream("~", text))
self.STDOUT.flush()
def is_gdbmi_mode():
return "--interpreter" in " ".join(PlatformioCLI.leftover_args)
def escape_gdbmi_stream(prefix, stream):
bytes_stream = False
if is_bytes(stream):
bytes_stream = True
stream = stream.decode()
if not stream:
return b"" if bytes_stream else ""
ends_nl = stream.endswith("\n")
stream = re.sub(r"\\+", "\\\\\\\\", stream)
stream = stream.replace('"', '\\"')
stream = stream.replace("\n", "\\n")
stream = '%s"%s"' % (prefix, stream)
if ends_nl:
stream += "\n"
return stream.encode() if bytes_stream else stream
def get_default_debug_env(config):
default_envs = config.default_envs()
all_envs = config.envs()
for env in default_envs:
if config.get("env:" + env, "build_type") == "debug":
return env
for env in all_envs:
if config.get("env:" + env, "build_type") == "debug":
return env
return default_envs[0] if default_envs else all_envs[0]
def predebug_project(
ctx, project_dir, project_config, env_name, preload, verbose
): # pylint: disable=too-many-arguments,too-many-positional-arguments
debug_testname = project_config.get("env:" + env_name, "debug_test")
if debug_testname:
test_names = list_test_names(project_config)
if debug_testname not in test_names:
raise DebugInvalidOptionsError(
"Unknown test name `%s`. Valid names are `%s`"
% (debug_testname, ", ".join(test_names))
)
print_processing_header(env_name, project_config, verbose)
test_runner = TestRunnerFactory.new(
TestSuite(env_name, debug_testname),
project_config,
TestRunnerOptions(
verbose=3 if verbose else 0,
without_building=False,
without_debugging=False,
without_uploading=not preload,
without_testing=True,
),
)
test_runner.start(ctx)
else:
ctx.invoke(
cmd_run,
project_dir=project_dir,
project_conf=project_config.path,
environment=[env_name],
target=["__debug"] + (["upload"] if preload else []),
verbose=verbose,
)
if preload:
time.sleep(5)
def has_debug_symbols(prog_path):
if not os.path.isfile(prog_path):
return False
matched = {
b".debug_info": False,
b".debug_abbrev": False,
b" -Og": False,
b" -g": False,
# b"__PLATFORMIO_BUILD_DEBUG__": False,
}
with open(prog_path, "rb") as fp:
last_data = b""
while True:
data = fp.read(1024)
if not data:
break
for pattern, found in matched.items():
if found:
continue
if pattern in last_data + data:
matched[pattern] = True
last_data = data
return all(matched.values())
def is_prog_obsolete(prog_path):
prog_hash_path = prog_path + ".sha1"
if not os.path.isfile(prog_path):
return True
shasum = sha1()
with open(prog_path, "rb") as fp:
while True:
data = fp.read(1024)
if not data:
break
shasum.update(data)
new_digest = shasum.hexdigest()
old_digest = None
if os.path.isfile(prog_hash_path):
with open(prog_hash_path, encoding="utf8") as fp:
old_digest = fp.read()
if new_digest == old_digest:
return False
with open(prog_hash_path, mode="w", encoding="utf8") as fp:
fp.write(new_digest)
return True

View File

@@ -0,0 +1,13 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -0,0 +1,154 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import signal
import subprocess
import sys
import time
from platformio.compat import (
IS_WINDOWS,
aio_create_task,
aio_get_running_loop,
get_locale_encoding,
)
class DebugSubprocessProtocol(asyncio.SubprocessProtocol):
def __init__(self, factory):
self.factory = factory
self._is_exited = False
def connection_made(self, transport):
self.factory.connection_made(transport)
def pipe_data_received(self, fd, data):
pipe_to_cb = [
self.factory.stdin_data_received,
self.factory.stdout_data_received,
self.factory.stderr_data_received,
]
pipe_to_cb[fd](data)
def connection_lost(self, exc):
self.process_exited()
def process_exited(self):
if self._is_exited:
return
self.factory.process_exited()
self._is_exited = True
class DebugBaseProcess:
STDOUT_CHUNK_SIZE = 2048
LOG_FILE = None
def __init__(self):
self.transport = None
self._is_running = False
self._last_activity = 0
self._exit_future = None
self._stdin_read_task = None
self._std_encoding = get_locale_encoding()
async def spawn(self, *args, **kwargs):
wait_until_exit = False
if "wait_until_exit" in kwargs:
wait_until_exit = kwargs["wait_until_exit"]
del kwargs["wait_until_exit"]
for pipe in ("stdin", "stdout", "stderr"):
if pipe not in kwargs:
kwargs[pipe] = subprocess.PIPE
loop = aio_get_running_loop()
await loop.subprocess_exec(
lambda: DebugSubprocessProtocol(self), *args, **kwargs
)
if wait_until_exit:
self._exit_future = loop.create_future()
await self._exit_future
def is_running(self):
return self._is_running
def connection_made(self, transport):
self._is_running = True
self.transport = transport
def connect_stdin_pipe(self):
self._stdin_read_task = aio_create_task(self._read_stdin_pipe())
async def _read_stdin_pipe(self):
loop = aio_get_running_loop()
if IS_WINDOWS:
while True:
self.stdin_data_received(
await loop.run_in_executor(None, sys.stdin.buffer.readline)
)
else:
reader = asyncio.StreamReader()
protocol = asyncio.StreamReaderProtocol(reader)
await loop.connect_read_pipe(lambda: protocol, sys.stdin)
while True:
self.stdin_data_received(await reader.readline())
def stdin_data_received(self, data):
self._last_activity = time.time()
if self.LOG_FILE:
with open(self.LOG_FILE, "ab") as fp:
fp.write(data)
def stdout_data_received(self, data):
self._last_activity = time.time()
if self.LOG_FILE:
with open(self.LOG_FILE, "ab") as fp:
fp.write(data)
while data:
chunk = data[: self.STDOUT_CHUNK_SIZE]
print(chunk.decode(self._std_encoding, "replace"), end="", flush=True)
data = data[self.STDOUT_CHUNK_SIZE :]
def stderr_data_received(self, data):
self._last_activity = time.time()
if self.LOG_FILE:
with open(self.LOG_FILE, "ab") as fp:
fp.write(data)
print(
data.decode(self._std_encoding, "replace"),
end="",
file=sys.stderr,
flush=True,
)
def process_exited(self):
self._is_running = False
self._last_activity = time.time()
# Allow terminating via SIGINT/CTRL+C
signal.signal(signal.SIGINT, signal.default_int_handler)
if self._stdin_read_task:
self._stdin_read_task.cancel()
self._stdin_read_task = None
if self._exit_future:
self._exit_future.set_result(True)
self._exit_future = None
def terminate(self):
if not self.is_running() or not self.transport:
return
try:
self.transport.kill()
self.transport.close()
except: # pylint: disable=bare-except
pass

View File

@@ -0,0 +1,104 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import hashlib
import os
import signal
import tempfile
from platformio import fs, proc
from platformio.cache import ContentCache
from platformio.compat import IS_WINDOWS, hashlib_encode_data
from platformio.debug.process.base import DebugBaseProcess
from platformio.debug.process.server import DebugServerProcess
from platformio.project.helpers import get_project_cache_dir
class DebugClientProcess(DebugBaseProcess):
def __init__(self, project_dir, debug_config):
super().__init__()
self.project_dir = project_dir
self.debug_config = debug_config
self._server_process = None
self._session_id = None
if not os.path.isdir(get_project_cache_dir()):
os.makedirs(get_project_cache_dir())
self.working_dir = tempfile.mkdtemp(
dir=get_project_cache_dir(), prefix=".piodebug-"
)
self._target_is_running = False
self._errors_buffer = b""
async def run(self):
session_hash = (
self.debug_config.client_executable_path + self.debug_config.program_path
)
self._session_id = hashlib.sha1(hashlib_encode_data(session_hash)).hexdigest()
self._kill_previous_session()
if self.debug_config.server:
self._server_process = DebugServerProcess(self.debug_config)
self.debug_config.port = await self._server_process.run()
def connection_made(self, transport):
super().connection_made(transport)
self._lock_session(transport.get_pid())
# Disable SIGINT and allow GDB's Ctrl+C interrupt
signal.signal(signal.SIGINT, lambda *args, **kwargs: None)
self.connect_stdin_pipe()
def process_exited(self):
if self._server_process:
self._server_process.terminate()
super().process_exited()
def close(self):
self._unlock_session()
if self.working_dir and os.path.isdir(self.working_dir):
fs.rmtree(self.working_dir)
def __del__(self):
self.close()
def _kill_previous_session(self):
assert self._session_id
pid = None
with ContentCache() as cc:
pid = cc.get(self._session_id)
cc.delete(self._session_id)
if not pid:
return
if IS_WINDOWS:
kill = ["Taskkill", "/PID", pid, "/F"]
else:
kill = ["kill", pid]
try:
proc.exec_command(kill)
except: # pylint: disable=bare-except
pass
def _lock_session(self, pid):
if not self._session_id:
return
with ContentCache() as cc:
cc.set(self._session_id, str(pid), "1h")
def _unlock_session(self):
if not self._session_id:
return
with ContentCache() as cc:
cc.delete(self._session_id)

View File

@@ -0,0 +1,181 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import signal
import time
from platformio import telemetry
from platformio.compat import aio_get_running_loop, is_bytes
from platformio.debug import helpers
from platformio.debug.exception import DebugInitError
from platformio.debug.process.client import DebugClientProcess
class GDBClientProcess(DebugClientProcess):
PIO_SRC_NAME = ".pioinit"
INIT_COMPLETED_BANNER = "PlatformIO: Initialization completed"
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self._target_is_running = False
self._errors_buffer = b""
async def run(self, extra_args): # pylint: disable=arguments-differ
await super().run()
self.generate_init_script(os.path.join(self.working_dir, self.PIO_SRC_NAME))
gdb_path = self.debug_config.client_executable_path or "gdb"
# start GDB client
args = [
gdb_path,
"-q",
"--directory",
self.working_dir,
"--directory",
self.project_dir,
"-l",
"10",
]
args.extend(list(extra_args or []))
gdb_data_dir = self._get_data_dir(gdb_path)
if gdb_data_dir:
args.extend(["--data-directory", gdb_data_dir])
args.append(self.debug_config.program_path)
await self.spawn(*args, cwd=self.project_dir, wait_until_exit=True)
@staticmethod
def _get_data_dir(gdb_path):
if "msp430" in gdb_path:
return None
gdb_data_dir = os.path.abspath(
os.path.join(os.path.dirname(gdb_path), "..", "share", "gdb")
)
return gdb_data_dir if os.path.isdir(gdb_data_dir) else None
def generate_init_script(self, dst):
# default GDB init commands depending on debug tool
commands = self.debug_config.get_init_script("gdb").split("\n")
if self.debug_config.init_cmds:
commands = self.debug_config.init_cmds
commands.extend(self.debug_config.extra_cmds)
if not any("define pio_reset_run_target" in cmd for cmd in commands):
commands = [
"define pio_reset_run_target",
" echo Warning! Undefined pio_reset_run_target command\\n",
" monitor reset",
"end",
] + commands
if not any("define pio_reset_halt_target" in cmd for cmd in commands):
commands = [
"define pio_reset_halt_target",
" echo Warning! Undefined pio_reset_halt_target command\\n",
" monitor reset halt",
"end",
] + commands
if not any("define pio_restart_target" in cmd for cmd in commands):
commands += [
"define pio_restart_target",
" pio_reset_halt_target",
" $INIT_BREAK",
" %s" % ("continue" if self.debug_config.init_break else "next"),
"end",
]
banner = [
"echo PlatformIO Unified Debugger -> https://bit.ly/pio-debug\\n",
"echo PlatformIO: debug_tool = %s\\n" % self.debug_config.tool_name,
"echo PlatformIO: Initializing remote target...\\n",
]
footer = ["echo %s\\n" % self.INIT_COMPLETED_BANNER]
commands = banner + commands + footer
with open(dst, mode="w", encoding="utf8") as fp:
fp.write("\n".join(self.debug_config.reveal_patterns(commands)))
def stdin_data_received(self, data):
super().stdin_data_received(data)
if b"-exec-run" in data:
if self._target_is_running:
token, _ = data.split(b"-", 1)
self.stdout_data_received(token + b"^running\n")
return
if self.debug_config.platform.is_embedded():
data = data.replace(b"-exec-run", b"-exec-continue")
if b"-exec-continue" in data:
self._target_is_running = True
if b"-gdb-exit" in data or data.strip() in (b"q", b"quit"):
# Allow terminating via SIGINT/CTRL+C
signal.signal(signal.SIGINT, signal.default_int_handler)
self.transport.get_pipe_transport(0).write(b"pio_reset_run_target\n")
self.transport.get_pipe_transport(0).write(data)
def stdout_data_received(self, data):
super().stdout_data_received(data)
self._handle_error(data)
# go to init break automatically
if self.INIT_COMPLETED_BANNER.encode() in data:
telemetry.log_debug_started(self.debug_config)
self._auto_exec_continue()
def console_log(self, msg):
if helpers.is_gdbmi_mode():
msg = helpers.escape_gdbmi_stream("~", msg)
self.stdout_data_received(msg if is_bytes(msg) else msg.encode())
def _auto_exec_continue(self):
auto_exec_delay = 0.5 # in seconds
if self._last_activity > (time.time() - auto_exec_delay):
aio_get_running_loop().call_later(0.1, self._auto_exec_continue)
return
if not self.debug_config.init_break or self._target_is_running:
return
self.console_log(
"PlatformIO: Resume the execution to `debug_init_break = %s`\n"
% self.debug_config.init_break
)
self.console_log(
"PlatformIO: More configuration options -> https://bit.ly/pio-debug\n"
)
if self.debug_config.platform.is_embedded():
self.transport.get_pipe_transport(0).write(
b"0-exec-continue\n" if helpers.is_gdbmi_mode() else b"continue\n"
)
else:
self.transport.get_pipe_transport(0).write(
b"0-exec-run\n" if helpers.is_gdbmi_mode() else b"run\n"
)
self._target_is_running = True
def stderr_data_received(self, data):
super().stderr_data_received(data)
self._handle_error(data)
def _handle_error(self, data):
self._errors_buffer = (self._errors_buffer + data)[-8192:] # keep last 8 KBytes
if not (
self.PIO_SRC_NAME.encode() in self._errors_buffer
and b"Error in sourced" in self._errors_buffer
):
return
telemetry.log_debug_exception(
DebugInitError(self._errors_buffer.decode()), self.debug_config
)
self.transport.close()

View File

@@ -0,0 +1,149 @@
# Copyright (c) 2014-present PlatformIO <contact@platformio.org>
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
import os
import re
import time
from platformio import fs
from platformio.compat import IS_MACOS, IS_WINDOWS
from platformio.debug.exception import DebugInvalidOptionsError
from platformio.debug.helpers import escape_gdbmi_stream, is_gdbmi_mode
from platformio.debug.process.base import DebugBaseProcess
from platformio.proc import where_is_program
class DebugServerProcess(DebugBaseProcess):
STD_BUFFER_SIZE = 1024
def __init__(self, debug_config):
super().__init__()
self.debug_config = debug_config
self._ready = False
self._std_buffer = {"out": b"", "err": b""}
async def run(self): # pylint: disable=too-many-branches
server = self.debug_config.server
if not server:
return None
server_executable = server["executable"]
if not server_executable:
return None
if server["cwd"]:
server_executable = os.path.join(server["cwd"], server_executable)
if (
IS_WINDOWS
and not server_executable.endswith(".exe")
and os.path.isfile(server_executable + ".exe")
):
server_executable = server_executable + ".exe"
if not os.path.isfile(server_executable):
server_executable = where_is_program(server_executable)
if not os.path.isfile(server_executable):
raise DebugInvalidOptionsError(
"Could not launch Debug Server '%s'. Please check that it "
"is installed and is included in a system PATH\n"
"See https://docs.platformio.org/page/plus/debugging.html"
% server_executable
)
openocd_pipe_allowed = all(
[
not self.debug_config.env_options.get(
"debug_port", self.debug_config.tool_settings.get("port")
),
"gdb" in self.debug_config.client_executable_path,
"openocd" in server_executable,
]
)
if openocd_pipe_allowed:
args = []
if server["cwd"]:
args.extend(["-s", server["cwd"]])
args.extend(
["-c", "gdb_port pipe; tcl_port disabled; telnet_port disabled"]
)
args.extend(server["arguments"])
str_args = " ".join(
[arg if arg.startswith("-") else '"%s"' % arg for arg in args]
)
return fs.to_unix_path('| "%s" %s' % (server_executable, str_args))
env = os.environ.copy()
# prepend server "lib" folder to LD path
if (
not IS_WINDOWS
and server["cwd"]
and os.path.isdir(os.path.join(server["cwd"], "lib"))
):
ld_key = "DYLD_LIBRARY_PATH" if IS_MACOS else "LD_LIBRARY_PATH"
env[ld_key] = os.path.join(server["cwd"], "lib")
if os.environ.get(ld_key):
env[ld_key] = "%s:%s" % (env[ld_key], os.environ.get(ld_key))
# prepend BIN to PATH
if server["cwd"] and os.path.isdir(os.path.join(server["cwd"], "bin")):
env["PATH"] = "%s%s%s" % (
os.path.join(server["cwd"], "bin"),
os.pathsep,
os.environ.get("PATH", os.environ.get("Path", "")),
)
await self.spawn(
*([server_executable] + server["arguments"]), cwd=server["cwd"], env=env
)
await self._wait_until_ready()
return self.debug_config.port
async def _wait_until_ready(self):
ready_pattern = self.debug_config.server_ready_pattern
timeout = 60 if ready_pattern else 10
elapsed = 0
delay = 0.5
auto_ready_delay = 0.5
while not self._ready and self.is_running() and elapsed < timeout:
await asyncio.sleep(delay)
if not ready_pattern:
self._ready = self._last_activity < (time.time() - auto_ready_delay)
elapsed += delay
def _check_ready_by_pattern(self, data):
if self._ready:
return self._ready
ready_pattern = self.debug_config.server_ready_pattern
if ready_pattern:
if ready_pattern.startswith("^"):
self._ready = re.match(
ready_pattern,
data.decode("utf-8", "ignore"),
)
else:
self._ready = ready_pattern.encode() in data
return self._ready
def stdout_data_received(self, data):
super().stdout_data_received(
escape_gdbmi_stream("@", data) if is_gdbmi_mode() else data
)
self._std_buffer["out"] += data
self._check_ready_by_pattern(self._std_buffer["out"])
self._std_buffer["out"] = self._std_buffer["out"][-1 * self.STD_BUFFER_SIZE :]
def stderr_data_received(self, data):
super().stderr_data_received(data)
self._std_buffer["err"] += data
self._check_ready_by_pattern(self._std_buffer["err"])
self._std_buffer["err"] = self._std_buffer["err"][-1 * self.STD_BUFFER_SIZE :]