git » listen-drop-and-exec » master » tree

[master] / listen-drop-and-exec

#!/usr/bin/env python3
"""Helper that opens sockets, drops privileges and then executes a command

This utility can open TCP or UDP sockets, then drop privileges to the given
user and group, and finally execute the given command.

It can be used to run programs that expect systemd (or equivalent) to open
sockets and drop privileges for them. This can be useful when launching them
manually, or from init shell scripts.

The LISTEN_FDS, LISTEN_FDNAMES and LISTEN_PID environment variables will be
set following systemd's conventions.

Example:
  # listen-drop-and-exec \\
      --user=nobody --group=nogroup \\
      --listen_udp=53=dns_on_udp \\
      --listen_tcp=53=dns_on_tcp --listen_tcp="[::1]:8080=monitoring" \\
      dns-server --sockets_from_systemd
"""

# Copyright 2016
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import grp
import ipaddress
import os
import pwd
import socket
import subprocess
import sys

# Command-line interface. The module docstring doubles as the --help text,
# so it is rendered verbatim (RawDescriptionHelpFormatter). Abbreviated
# option names are disabled.
parser = argparse.ArgumentParser(
		description=__doc__,
		formatter_class=argparse.RawDescriptionHelpFormatter,
		allow_abbrev=False)

# Repeatable socket options; every occurrence is appended to a list.
parser.add_argument(
		"--listen_tcp", action="append", default=[],
		help="TCP port to listen on")
parser.add_argument(
		"--listen_udp", action="append", default=[],
		help="UDP port to listen on")

# Identity to switch to before running the command.
parser.add_argument("--user", help="user to run the command as")
parser.add_argument("--group", help="group to run the command as")

# The command itself, followed by everything else on the command line.
parser.add_argument("command", metavar="COMMAND", help="command to run")
parser.add_argument(
		"args", nargs=argparse.REMAINDER,
		help="arguments for the command")

# NOTE(review): parse_known_args() discards options it does not recognise
# instead of reporting them -- confirm that this leniency is intended.
args = parser.parse_known_args()[0]


def is_ipv4(addr):
	"""Return True if addr parses as an IPv4 address, False otherwise."""
	try:
		parsed = ipaddress.ip_address(addr)
	except ValueError:
		# Not a valid IP address literal at all.
		return False
	else:
		return parsed.version == 4

def is_ipv6(addr):
	"""Return True if addr parses as an IPv6 address, False otherwise."""
	try:
		parsed = ipaddress.ip_address(addr)
	except ValueError:
		# Not a valid IP address literal at all.
		return False
	else:
		return parsed.version == 6


# Open sockets.
#
# Each --listen_tcp / --listen_udp value has the form "[HOST:]PORT[=NAME]".
# Sockets are bound (and, for TCP, put into listening state) here, before
# privileges are dropped, and collected in `socks` as (socket, name) pairs:
# all TCP sockets first, then all UDP sockets, each in command-line order.
addrs = ([(socket.SOCK_STREAM, a) for a in args.listen_tcp] +
		 [(socket.SOCK_DGRAM, a) for a in args.listen_udp])
socks = []
for stype, addr in addrs:
	# An optional "=NAME" suffix names the socket; it is later exported
	# through LISTEN_FDNAMES. Unnamed sockets get an empty name.
	name = ""
	if '=' in addr:
		addr, name = addr.split("=", 1)

	# addr can be "1.2.3.4:1111", "[::1:2]:1111" or just "1111".
	if ":" in addr:
		# Split on the last colon so that IPv6 literals stay in one piece.
		host, port = addr.rsplit(":", 1)
		if '[' in host:
			# Strip the brackets surrounding an IPv6 literal.
			host = host[1:-1]
		addr = (host, int(port))
	else:
		# Port only: bind to the wildcard address.
		addr = ("", int(addr))

	host = addr[0]
	if host and is_ipv6(host):
		s = socket.socket(socket.AF_INET6, stype)
	elif host and is_ipv4(host):
		# The IPv4 address family constant is AF_INET; the socket module
		# has no "AF_INET4" attribute.
		s = socket.socket(socket.AF_INET, stype)
	else:
		# Empty host or something that is not an IP literal (for example a
		# hostname): fall back to the socket module's default family.
		s = socket.socket(type=stype)

	s.bind(addr)
	if stype == socket.SOCK_STREAM:
		s.listen()
	socks.append((s, name))


# Drop privileges.
#
# The order matters: supplementary groups are cleared first, then the group
# id is changed, and the user id is changed last, because the earlier steps
# need the privileges that are given up by setuid().
if args.user or args.group:
	os.setgroups([])
	if args.group:
		os.setgid(grp.getgrnam(args.group).gr_gid)
	if args.user:
		pw_entry = pwd.getpwnam(args.user)
		os.setuid(pw_entry.pw_uid)
		# Make the environment match the new identity.
		os.environ.update(USER=args.user, HOME=pw_entry.pw_dir)


# Prepare environment.
#
# Export the systemd-style socket activation variables: the number of
# sockets, their colon-separated names (same order as the sockets), and the
# pid of this process, which is kept across the exec below.
#
# NOTE(review): the systemd convention expects the passed descriptors to be
# consecutive starting at fd 3; this code relies on the sockets having been
# allocated that way and does not renumber them -- confirm.
fds = [sock.fileno() for sock, _ in socks]
os.environ.update({
	'LISTEN_FDS': str(len(fds)),
	'LISTEN_FDNAMES': ":".join(label for _, label in socks),
	'LISTEN_PID': str(os.getpid()),
})

# File descriptors created by Python are non-inheritable by default, so mark
# stdin/stdout/stderr and every socket as inheritable to survive the exec.
for fd in (0, 1, 2, *fds):
	os.set_inheritable(fd, True)


# Execute.
#
# Replace this process with the requested command (looked up through PATH).
# On success this call does not return.
argv = [args.command, *args.args]
os.execvp(argv[0], argv)