Skip to content

Commit

Permalink
PEP 656 musllinux support (#411)
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr authored May 3, 2021
1 parent ae2c72e commit c32c969
Show file tree
Hide file tree
Showing 9 changed files with 379 additions and 0 deletions.
136 changes: 136 additions & 0 deletions packaging/_musllinux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
"""PEP 656 support.
This module implements logic to detect if the currently running Python is
linked against musl, and what musl version is used.
"""

import contextlib
import functools
import operator
import os
import re
import struct
import subprocess
import sys
from typing import IO, Iterator, NamedTuple, Optional, Tuple


def _read_unpacked(f: IO[bytes], fmt: str) -> Tuple[int, ...]:
return struct.unpack(fmt, f.read(struct.calcsize(fmt)))


def _parse_ld_musl_from_elf(f: IO[bytes]) -> Optional[str]:
"""Detect musl libc location by parsing the Python executable.
Based on: https://gist.github.com/lyssdod/f51579ae8d93c8657a5564aefc2ffbca
ELF header: https://refspecs.linuxfoundation.org/elf/gabi4+/ch4.eheader.html
"""
f.seek(0)
try:
ident = _read_unpacked(f, "16B")
except struct.error:
return None
if ident[:4] != tuple(b"\x7fELF"): # Invalid magic, not ELF.
return None
f.seek(struct.calcsize("HHI"), 1) # Skip file type, machine, and version.

try:
# e_fmt: Format for program header.
# p_fmt: Format for section header.
# p_idx: Indexes to find p_type, p_offset, and p_filesz.
e_fmt, p_fmt, p_idx = {
1: ("IIIIHHH", "IIIIIIII", (0, 1, 4)), # 32-bit.
2: ("QQQIHHH", "IIQQQQQQ", (0, 2, 5)), # 64-bit.
}[ident[4]]
except KeyError:
return None
else:
p_get = operator.itemgetter(*p_idx)

# Find the interpreter section and return its content.
try:
_, e_phoff, _, _, _, e_phentsize, e_phnum = _read_unpacked(f, e_fmt)
except struct.error:
return None
for i in range(e_phnum + 1):
f.seek(e_phoff + e_phentsize * i)
try:
p_type, p_offset, p_filesz = p_get(_read_unpacked(f, p_fmt))
except struct.error:
return None
if p_type != 3: # Not PT_INTERP.
continue
f.seek(p_offset)
interpreter = os.fsdecode(f.read(p_filesz)).strip("\0")
if "musl" not in interpreter:
return None
return interpreter
return None


class _MuslVersion(NamedTuple):
major: int
minor: int


def _parse_musl_version(output: str) -> Optional[_MuslVersion]:
lines = [n for n in (n.strip() for n in output.splitlines()) if n]
if len(lines) < 2 or lines[0][:4] != "musl":
return None
m = re.match(r"Version (\d+)\.(\d+)", lines[1])
if not m:
return None
return _MuslVersion(major=int(m.group(1)), minor=int(m.group(2)))


@functools.lru_cache()
def _get_musl_version(executable: str) -> Optional[_MuslVersion]:
"""Detect currently-running musl runtime version.
This is done by checking the specified executable's dynamic linking
information, and invoking the loader to parse its output for a version
string. If the loader is musl, the output would be something like::
musl libc (x86_64)
Version 1.2.2
Dynamic Program Loader
"""
with contextlib.ExitStack() as stack:
try:
f = stack.enter_context(open(executable, "rb"))
except IOError:
return None
ld = _parse_ld_musl_from_elf(f)
if not ld:
return None
proc = subprocess.run([ld], stderr=subprocess.PIPE, universal_newlines=True)
return _parse_musl_version(proc.stderr)


def platform_tags(arch: str) -> Iterator[str]:
"""Generate musllinux tags compatible to the current platform.
:param arch: Should be the part of platform tag after the ``linux_``
prefix, e.g. ``x86_64``. The ``linux_`` prefix is assumed as a
prerequisite for the current platform to be musllinux-compatible.
:returns: An iterator of compatible musllinux tags.
"""
sys_musl = _get_musl_version(sys.executable)
if sys_musl is None: # Python not dynamically linked against musl.
return
for minor in range(sys_musl.minor, -1, -1):
yield f"musllinux_{sys_musl.major}_{minor}_{arch}"


if __name__ == "__main__": # pragma: no cover
import sysconfig

plat = sysconfig.get_platform()
assert plat.startswith("linux-"), "not linux"

print("plat:", plat)
print("musl:", _get_musl_version(sys.executable))
print("tags:", end=" ")
for t in platform_tags(re.sub(r"[.-]", "_", plat.split("-", 1)[-1])):
print(t, end="\n ")
3 changes: 3 additions & 0 deletions packaging/tags.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
cast,
)

from . import _musllinux

logger = logging.getLogger(__name__)

PythonVersion = Sequence[int]
Expand Down Expand Up @@ -723,6 +725,7 @@ def _linux_platforms(is_32bit: bool = _32_BIT_INTERPRETER) -> Iterator[str]:
_, arch = linux.split("_", 1)
if _have_compatible_manylinux_abi(arch):
yield from _manylinux_tags(linux, arch)
yield from _musllinux.platform_tags(arch)
yield linux


Expand Down
61 changes: 61 additions & 0 deletions tests/musllinux/build.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
# Build helper binaries for musllinux tests.
# Usages:
# build.sh # Build everything.
# build.sh $DISTRO $ARCH # Build one executable in $ARCH using $DISTRO.
#
# Either invocation ultimately runs this script in a Docker container with
# `build.sh glibc|musl $ARCH` to actually build the executable.

set -euo pipefail
set -x

UBUNTU_VERSION='focal'
ALPINE_VERSION='v3.13'

build_one_in_ubuntu () {
$1 "multiarch/ubuntu-core:${2}-${UBUNTU_VERSION}" \
bash "/home/hello-world/musllinux/build.sh" glibc "glibc-${2}"
}

build_one_in_alpine () {
$1 "multiarch/alpine:${2}-${ALPINE_VERSION}" \
sh "/home/hello-world/musllinux/build.sh" musl "musl-${2}"
}

build_in_container () {
local SOURCE="$(dirname $(dirname $(realpath ${BASH_SOURCE[0]})))"
DOCKER="docker run --rm -v ${SOURCE}:/home/hello-world"

if [[ $# -ne 0 ]]; then
"build_one_in_${1}" "$DOCKER" "$2"
return
fi

build_one_in_alpine "$DOCKER" x86_64
build_one_in_alpine "$DOCKER" i386
build_one_in_alpine "$DOCKER" aarch64
build_one_in_ubuntu "$DOCKER" x86_64
}

if [[ $# -eq 0 ]]; then
build_in_container
exit 0
elif [[ "$1" == "glibc" ]]; then
DEBIAN_FRONTEND=noninteractive apt-get update -qq \
&& apt-get install -qqy --no-install-recommends gcc libc6-dev
elif [[ "$1" == "musl" ]]; then
apk add -q build-base
else
build_in_container "$@"
exit 0
fi

build () {
local CFLAGS=""
local OUT="/home/hello-world/musllinux/${2}"
gcc -Os ${CFLAGS} -o "${OUT}-full" "/home/hello-world/hello-world.c"
head -c1024 "${OUT}-full" > "$OUT"
rm -f "${OUT}-full"
}

build "$@"
Binary file added tests/musllinux/glibc-x86_64
Binary file not shown.
Binary file added tests/musllinux/musl-aarch64
Binary file not shown.
Binary file added tests/musllinux/musl-i386
Binary file not shown.
Binary file added tests/musllinux/musl-x86_64
Binary file not shown.
140 changes: 140 additions & 0 deletions tests/test_musllinux.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
import collections
import io
import pathlib
import struct
import subprocess

import pretend
import pytest

from packaging import _musllinux
from packaging._musllinux import (
_get_musl_version,
_MuslVersion,
_parse_ld_musl_from_elf,
_parse_musl_version,
)

MUSL_AMD64 = "musl libc (x86_64)\nVersion 1.2.2\n"
MUSL_I386 = "musl libc (i386)\nVersion 1.2.1\n"
MUSL_AARCH64 = "musl libc (aarch64)\nVersion 1.1.24\n"
MUSL_INVALID = "musl libc (invalid)\n"
MUSL_UNKNOWN = "musl libc (unknown)\nVersion unknown\n"

MUSL_DIR = pathlib.Path(__file__).with_name("musllinux").resolve()

BIN_GLIBC_X86_64 = MUSL_DIR.joinpath("glibc-x86_64")
BIN_MUSL_X86_64 = MUSL_DIR.joinpath("musl-x86_64")
BIN_MUSL_I386 = MUSL_DIR.joinpath("musl-i386")
BIN_MUSL_AARCH64 = MUSL_DIR.joinpath("musl-aarch64")

LD_MUSL_X86_64 = "/lib/ld-musl-x86_64.so.1"
LD_MUSL_I386 = "/lib/ld-musl-i386.so.1"
LD_MUSL_AARCH64 = "/lib/ld-musl-aarch64.so.1"


@pytest.mark.parametrize(
"output, version",
[
(MUSL_AMD64, _MuslVersion(1, 2)),
(MUSL_I386, _MuslVersion(1, 2)),
(MUSL_AARCH64, _MuslVersion(1, 1)),
(MUSL_INVALID, None),
(MUSL_UNKNOWN, None),
],
ids=["amd64-1.2.2", "i386-1.2.1", "aarch64-1.1.24", "invalid", "unknown"],
)
def test_parse_musl_version(output, version):
assert _parse_musl_version(output) == version


@pytest.mark.parametrize(
"executable, location",
[
(BIN_GLIBC_X86_64, None),
(BIN_MUSL_X86_64, LD_MUSL_X86_64),
(BIN_MUSL_I386, LD_MUSL_I386),
(BIN_MUSL_AARCH64, LD_MUSL_AARCH64),
],
ids=["glibc", "x86_64", "i386", "aarch64"],
)
def test_parse_ld_musl_from_elf(executable, location):
with executable.open("rb") as f:
assert _parse_ld_musl_from_elf(f) == location


@pytest.mark.parametrize(
"data",
[
# Too short for magic.
b"\0",
# Enough for magic, but not ELF.
b"#!/bin/bash" + b"\0" * 16,
# ELF, but unknown byte declaration.
b"\x7fELF\3" + b"\0" * 16,
],
ids=["no-magic", "wrong-magic", "unknown-format"],
)
def test_parse_ld_musl_from_elf_invalid(data):
assert _parse_ld_musl_from_elf(io.BytesIO(data)) is None


@pytest.mark.parametrize(
"head",
[
25, # Enough for magic, but not the section definitions.
58, # Enough for section definitions, but not the actual sections.
],
)
def test_parse_ld_musl_from_elf_invalid_section(head):
data = BIN_MUSL_X86_64.read_bytes()[:head]
assert _parse_ld_musl_from_elf(io.BytesIO(data)) is None


def test_parse_ld_musl_from_elf_no_interpreter_section():
with BIN_MUSL_X86_64.open("rb") as f:
data = f.read()

# Change all sections to *not* PT_INTERP.
unpacked = struct.unpack("16BHHIQQQIHHH", data[:58])
*_, e_phoff, _, _, _, e_phentsize, e_phnum = unpacked
for i in range(e_phnum + 1):
sb = e_phoff + e_phentsize * i
se = sb + 56
section = struct.unpack("IIQQQQQQ", data[sb:se])
data = data[:sb] + struct.pack("IIQQQQQQ", 0, *section[1:]) + data[se:]

assert _parse_ld_musl_from_elf(io.BytesIO(data)) is None


@pytest.mark.parametrize(
"executable, output, version, ld_musl",
[
(MUSL_DIR.joinpath("does-not-exist"), "error", None, None),
(BIN_GLIBC_X86_64, "error", None, None),
(BIN_MUSL_X86_64, MUSL_AMD64, _MuslVersion(1, 2), LD_MUSL_X86_64),
(BIN_MUSL_I386, MUSL_I386, _MuslVersion(1, 2), LD_MUSL_I386),
(BIN_MUSL_AARCH64, MUSL_AARCH64, _MuslVersion(1, 1), LD_MUSL_AARCH64),
],
ids=["does-not-exist", "glibc", "x86_64", "i386", "aarch64"],
)
def test_get_musl_version(monkeypatch, executable, output, version, ld_musl):
def mock_run(*args, **kwargs):
return collections.namedtuple("Proc", "stderr")(output)

run_recorder = pretend.call_recorder(mock_run)
monkeypatch.setattr(_musllinux.subprocess, "run", run_recorder)

assert _get_musl_version(str(executable)) == version

if ld_musl is not None:
expected_calls = [
pretend.call(
[ld_musl],
stderr=subprocess.PIPE,
universal_newlines=True,
)
]
else:
expected_calls = []
assert run_recorder.calls == expected_calls
Loading

0 comments on commit c32c969

Please sign in to comment.