Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

decrypt won't work in cygwin because it calls mpeg with absolute paths #198

Open
philgoetz opened this issue Mar 27, 2024 · 14 comments
Open
Labels
bug Something isn't working

Comments

@philgoetz
Copy link

philgoetz commented Mar 27, 2024

I'm using audible-cli in the OS cygwin, which is a Linux shell for Windows.
A peculiarity of Cygwin is that the Windows full path

D:\data

must be passed in command line arguments either as

/cygdrive/d/data

or

"D:\data"

(or "D:/data", but that isn't very relevant), supposing that all of the computer's Windows drives are mounted under /cygdrive .

Commands work as usual if they're called with relative paths. But audible-cli decrypt converts everything to absolute paths before calling ffmpeg.

If I call audible decrypt giving the destination directory -d as a relative path, and the input file using a Windows absolute path,

$ audible decrypt -d decrypted/ "D:/data/audio_/Audible/downloads/aaxc/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.aaxc"

I get this:

error: Voucher file /cygdrive/d/data/audio_/Audible/downloads/D:/data/audio_/Audible/downloads/aaxc/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.voucher not found.

decrypt assumed the thing starting with "D:/" was a relative path.

If I use a Cygwin absolute path,

$ audible decrypt -d decrypted/ /cygdrive/d/data/audio_/Audible/downloads/aaxc/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.aaxc

I get this:

File "/usr/lib/python3.9/subprocess.py", line 528, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['ffmpeg', '-v', 'quiet', '-stats', '-audible_key', '2473234d6da8338415238ef30152c27d', '-audible_iv', '964b5ea028b1922c1f91eda9730e705f', '-i', '/cygdrive/d/data/audio_/Audible/downloads/aaxc/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.aaxc', '-c', 'copy', '/cygdrive/d/data/audio_/Audible/downloads/decrypted/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.m4b']' returned non-zero exit status 1.

Here decrypt expanded the relative paths into the absolute paths starting with /cygdrive/d, but ffmpeg can't read this because ffmpeg for Windows presumably calls native Windows functions instead of Unix commands to parse its paths.

If I use all relative paths,

$ audible decrypt -d decrypted/ aaxc/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.aaxc

I get this:

File "/usr/lib/python3.9/subprocess.py", line 528, in run
raise CalledProcessError(retcode, process.args,
subprocess.CalledProcessError: Command '['ffmpeg', '-v', 'quiet', '-stats', '-audible_key', '2473234d6da8338415238ef30152c27d', '-audible_iv', '964b5ea028b1922c1f91eda9730e705f', '-i', '/cygdrive/d/data/audio_/Audible/downloads/aaxc/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.aaxc', '-c', 'copy', '/cygdrive/d/data/audio_/Audible/downloads/decrypted/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.m4b']' returned non-zero exit status 1.

Here decrypt expanded the relative paths into absolute paths, but they were expanded into Cygwin absolute paths. The exit status 1 probably means ffmpeg couldn't find the files because it can't find /cygdrive (which is not an actual directory on the hard drive).

It would probably work if decrypt passed on relative paths without expanding them.
I think this is caused by the line
target_dir=pathlib.Path(directory).resolve()
in "def cli( ... )" in cmd_decrypt.py

@philgoetz
Copy link
Author

I tried to fix it myself, by reading in the file and directory arguments as a string, and creating a pathlib.Path out of it which is stored separately from the original string, and passing the original string to ffmpeg. But I couldn't, because I don't understand how variables are declared in Python. I keep getting an undefined variable error.

@mkb79
Copy link
Owner

mkb79 commented Mar 27, 2024

Below is the cmd_decrypt.py code without resolving the Path-object. Maybe this will help you.

"""Removes encryption of aax and aaxc files.

This is a proof-of-concept and for testing purposes only.

No error handling.
Need further work. Some options do not work or options are missing.

Needs at least ffmpeg 4.4
"""


import json
import operator
import pathlib
import re
import subprocess  # noqa: S404
import tempfile
import typing as t
from enum import Enum
from functools import reduce
from glob import glob
from shlex import quote
from shutil import which

import click
from click import echo, secho

from audible_cli.decorators import pass_session
from audible_cli.exceptions import AudibleCliException


class ChapterError(AudibleCliException):
    """Base class for all chapter errors."""


class SupportedFiles(Enum):
    AAX = ".aax"
    AAXC = ".aaxc"

    @classmethod
    def get_supported_list(cls):
        return list(set(item.value for item in cls))

    @classmethod
    def is_supported_suffix(cls, value):
        return value in cls.get_supported_list()

    @classmethod
    def is_supported_file(cls, value):
        return pathlib.PurePath(value).suffix in cls.get_supported_list()


def _get_input_files(
    files: t.Union[t.Tuple[str], t.List[str]],
    recursive: bool = True
) -> t.List[pathlib.Path]:
    filenames = []
    for filename in files:
        # if the shell does not do filename globbing
        expanded = list(glob(filename, recursive=recursive))

        if (
            len(expanded) == 0
            and '*' not in filename
            and not SupportedFiles.is_supported_file(filename)
        ):
            raise(click.BadParameter("{filename}: file not found or supported."))

        expanded_filter = filter(
            lambda x: SupportedFiles.is_supported_file(x), expanded
        )
        expanded = list(map(lambda x: pathlib.Path(x), expanded_filter))
        filenames.extend(expanded)

    return filenames


def recursive_lookup_dict(key: str, dictionary: t.Dict[str, t.Any]) -> t.Any:
    if key in dictionary:
        return dictionary[key]
    for value in dictionary.values():
        if isinstance(value, dict):
            try:
                item = recursive_lookup_dict(key, value)
            except KeyError:
                continue
            else:
                return item
            
    raise KeyError


def get_aaxc_credentials(voucher_file: pathlib.Path):
    if not voucher_file.exists() or not voucher_file.is_file():
        raise AudibleCliException(f"Voucher file {voucher_file} not found.")

    voucher_dict = json.loads(voucher_file.read_text())
    try:
        key = recursive_lookup_dict("key", voucher_dict)
        iv = recursive_lookup_dict("iv", voucher_dict)
    except KeyError:
        raise AudibleCliException(f"No key/iv found in file {voucher_file}.") from None

    return key, iv


class ApiChapterInfo:
    def __init__(self, content_metadata: t.Dict[str, t.Any]) -> None:
        chapter_info = self._parse(content_metadata)
        self._chapter_info = chapter_info

    @classmethod
    def from_file(cls, file: t.Union[pathlib.Path, str]) -> "ApiChapterInfo":
        file = pathlib.Path(file)
        if not file.exists() or not file.is_file():
            raise ChapterError(f"Chapter file {file} not found.")
        content_string = pathlib.Path(file).read_text("utf-8")
        content_json = json.loads(content_string)
        return cls(content_json)

    @staticmethod
    def _parse(content_metadata: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        if "chapters" in content_metadata:
            return content_metadata

        try:
            return recursive_lookup_dict("chapter_info", content_metadata)
        except KeyError:
            raise ChapterError("No chapter info found.") from None

    def count_chapters(self):
        return len(self.get_chapters())

    def get_chapters(self, separate_intro_outro=False):
        def extract_chapters(initial, current):
            if "chapters" in current:
                return initial + [current] + current["chapters"]
            else:
                return initial + [current]

        chapters = list(
            reduce(
                extract_chapters,
                self._chapter_info["chapters"],
                [],
            )
        )

        if separate_intro_outro:
            return self._separate_intro_outro(chapters)

        return chapters

    def get_intro_duration_ms(self):
        return self._chapter_info["brandIntroDurationMs"]

    def get_outro_duration_ms(self):
        return self._chapter_info["brandOutroDurationMs"]

    def get_runtime_length_ms(self):
        return self._chapter_info["runtime_length_ms"]

    def is_accurate(self):
        return self._chapter_info["is_accurate"]

    def _separate_intro_outro(self, chapters):
        echo("Separate Audible Brand Intro and Outro to own Chapter.")
        chapters.sort(key=operator.itemgetter("start_offset_ms"))
    
        first = chapters[0]
        intro_dur_ms = self.get_intro_duration_ms()
        first["start_offset_ms"] = intro_dur_ms
        first["start_offset_sec"] = round(first["start_offset_ms"] / 1000)
        first["length_ms"] -= intro_dur_ms
    
        last = chapters[-1]
        outro_dur_ms = self.get_outro_duration_ms()
        last["length_ms"] -= outro_dur_ms
    
        chapters.append(
            {
                "length_ms": intro_dur_ms,
                "start_offset_ms": 0,
                "start_offset_sec": 0,
                "title": "Intro",
            }
        )
        chapters.append(
            {
                "length_ms": outro_dur_ms,
                "start_offset_ms": self.get_runtime_length_ms() - outro_dur_ms,
                "start_offset_sec": round(
                    (self.get_runtime_length_ms() - outro_dur_ms) / 1000
                ),
                "title": "Outro",
            }
        )
        chapters.sort(key=operator.itemgetter("start_offset_ms"))
    
        return chapters


class FFMeta:
    SECTION = re.compile(r"\[(?P<header>[^]]+)\]")
    OPTION = re.compile(r"(?P<option>.*?)\s*(?:(?P<vi>=)\s*(?P<value>.*))?$")

    def __init__(self, ffmeta_file: t.Union[str, pathlib.Path]) -> None:
        self._ffmeta_raw = pathlib.Path(ffmeta_file).read_text("utf-8")
        self._ffmeta_parsed = self._parse_ffmeta()

    def _parse_ffmeta(self):
        parsed_dict = {}
        start_section = "_"
        cursec = parsed_dict[start_section] = {}
        num_chap = 0

        for line in iter(self._ffmeta_raw.splitlines()):
            mo = self.SECTION.match(line)
            if mo:
                sec_name = mo.group("header")
                if sec_name == "CHAPTER":
                    num_chap += 1
                    if sec_name not in parsed_dict:
                        parsed_dict[sec_name] = {}
                    cursec = parsed_dict[sec_name][num_chap] = {}
                else:
                    cursec = parsed_dict[sec_name] = {}
            else:
                match = self.OPTION.match(line)
                cursec.update({match.group("option"): match.group("value")})

        return parsed_dict

    def count_chapters(self):
        return len(self._ffmeta_parsed["CHAPTER"])

    def set_chapter_option(self, num, option, value):
        chapter = self._ffmeta_parsed["CHAPTER"][num]
        for chapter_option in chapter:
            if chapter_option == option:
                chapter[chapter_option] = value

    def write(self, filename):
        fp = pathlib.Path(filename).open("w", encoding="utf-8")
        d = "="

        for section in self._ffmeta_parsed:
            if section == "_":
                self._write_section(fp, None, self._ffmeta_parsed[section], d)
            elif section == "CHAPTER":
                # TODO: Tue etwas
                for chapter in self._ffmeta_parsed[section]:
                    self._write_section(
                        fp, section, self._ffmeta_parsed[section][chapter], d
                    )
            else:
                self._write_section(fp, section, self._ffmeta_parsed[section], d)

    @staticmethod
    def _write_section(fp, section_name, section_items, delimiter):
        """Write a single section to the specified `fp`."""
        if section_name is not None:
            fp.write(f"[{section_name}]\n")

        for key, value in section_items.items():
            if value is None:
                fp.write(f"{key}\n")
            else:
                fp.write(f"{key}{delimiter}{value}\n")

    def update_chapters_from_chapter_info(
        self,
        chapter_info: ApiChapterInfo,
        force_rebuild_chapters: bool = False,
        separate_intro_outro: bool = False
    ) -> None:
        if not chapter_info.is_accurate():
            echo("Metadata from API is not accurate. Skip.")
            return

        if chapter_info.count_chapters() != self.count_chapters():
            if force_rebuild_chapters:
                echo("Force rebuild chapters due to chapter mismatch.")
            else:
                raise ChapterError("Chapter mismatch")

        echo(f"Found {chapter_info.count_chapters()} chapters to prepare.")

        api_chapters = chapter_info.get_chapters(separate_intro_outro)

        num_chap = 0
        new_chapters = {}
        for chapter in api_chapters:
            chap_start = chapter["start_offset_ms"]
            chap_end = chap_start + chapter["length_ms"]
            num_chap += 1
            new_chapters[num_chap] = {
                "TIMEBASE": "1/1000",
                "START": chap_start,
                "END": chap_end,
                "title": chapter["title"],
            }
        self._ffmeta_parsed["CHAPTER"] = new_chapters


def _get_voucher_filename(file: pathlib.Path) -> pathlib.Path:
    return file.with_suffix(".voucher")


def _get_chapter_filename(file: pathlib.Path) -> pathlib.Path:
    base_filename = file.stem.rsplit("-", 1)[0]
    return file.with_name(base_filename + "-chapters.json")


def _get_ffmeta_file(file: pathlib.Path, tempdir: pathlib.Path) -> pathlib.Path:
    metaname = file.with_suffix(".meta").name
    metafile = tempdir / metaname
    return metafile


class FfmpegFileDecrypter:
    def __init__(
        self,
        file: pathlib.Path,
        target_dir: pathlib.Path,
        tempdir: pathlib.Path,
        activation_bytes: t.Optional[str],
        overwrite: bool,
        rebuild_chapters: bool,
        force_rebuild_chapters: bool,
        skip_rebuild_chapters: bool,
        separate_intro_outro: bool
    ) -> None:
        file_type = SupportedFiles(file.suffix)

        credentials = None
        if file_type == SupportedFiles.AAX:
            if activation_bytes is None:
                raise AudibleCliException(
                    "No activation bytes found. Do you ever run "
                    "`audible activation-bytes`?"
                )
            credentials = activation_bytes
        elif file_type == SupportedFiles.AAXC:
            voucher_filename = _get_voucher_filename(file)
            credentials = get_aaxc_credentials(voucher_filename)

        self._source = file
        self._credentials: t.Optional[t.Union[str, t.Tuple[str]]] = credentials
        self._target_dir = target_dir
        self._tempdir = tempdir
        self._overwrite = overwrite
        self._rebuild_chapters = rebuild_chapters
        self._force_rebuild_chapters = force_rebuild_chapters
        self._skip_rebuild_chapters = skip_rebuild_chapters
        self._separate_intro_outro = separate_intro_outro
        self._api_chapter: t.Optional[ApiChapterInfo] = None
        self._ffmeta: t.Optional[FFMeta] = None
        self._is_rebuilded: bool = False

    @property
    def api_chapter(self) -> ApiChapterInfo:
        if self._api_chapter is None:
            try:
                voucher_filename = _get_voucher_filename(self._source)
                self._api_chapter = ApiChapterInfo.from_file(voucher_filename)
            except ChapterError:
                voucher_filename = _get_chapter_filename(self._source)
                self._api_chapter = ApiChapterInfo.from_file(voucher_filename)
            echo(f"Using chapters from {voucher_filename}")
        return self._api_chapter

    @property
    def ffmeta(self) -> FFMeta:
        if self._ffmeta is None:
            metafile = _get_ffmeta_file(self._source, self._tempdir)

            base_cmd = [
                "ffmpeg",
                "-v",
                "quiet",
                "-stats",
            ]
            if isinstance(self._credentials, tuple):
                key, iv = self._credentials
                credentials_cmd = [
                    "-audible_key",
                    quote(key),
                    "-audible_iv",
                    quote(iv),
                ]
            else:
                credentials_cmd = [
                    "-activation_bytes",
                    quote(self._credentials),
                ]    
            base_cmd.extend(credentials_cmd)
    
            extract_cmd = [
                "-i",
                quote(str(self._source)),
                "-f",
                "ffmetadata",
                str(metafile),
            ]
            base_cmd.extend(extract_cmd)

            subprocess.check_output(base_cmd, text=True)  # noqa: S603
            self._ffmeta = FFMeta(metafile)

        return self._ffmeta

    def rebuild_chapters(self) -> None:
        if not self._is_rebuilded:
            self.ffmeta.update_chapters_from_chapter_info(
                self.api_chapter, self._force_rebuild_chapters, self._separate_intro_outro
            )
            self._is_rebuilded = True

    def run(self):
        oname = self._source.with_suffix(".m4b").name
        outfile = self._target_dir / oname

        if outfile.exists():
            if self._overwrite:
                secho(f"Overwrite {outfile}: already exists", fg="blue")
            else:
                secho(f"Skip {outfile}: already exists", fg="blue")
                return

        base_cmd = [
            "ffmpeg",
            "-v",
            "quiet",
            "-stats",
        ]
        if self._overwrite:
            base_cmd.append("-y")
        if isinstance(self._credentials, tuple):
            key, iv = self._credentials
            credentials_cmd = [
                "-audible_key",
                quote(key),
                "-audible_iv",
                quote(iv),
            ]
        else:
            credentials_cmd = [
                "-activation_bytes",
                quote(self._credentials),
            ]    
        base_cmd.extend(credentials_cmd)
        base_cmd.extend(
            [
                "-i",
                quote(str(self._source)),
            ]
        )

        if self._rebuild_chapters:
            metafile = _get_ffmeta_file(self._source, self._tempdir)
            try:
                self.rebuild_chapters()
                self.ffmeta.write(metafile)
            except ChapterError:
                if self._skip_rebuild_chapters:
                    echo("Skip rebuild chapters due to chapter mismatch.")
                else:
                    raise
            else:
                base_cmd.extend(
                    [
                        "-i",
                        quote(str(metafile)),
                        "-map_metadata",
                        "0",
                        "-map_chapters",
                        "1",
                    ]
                )

        base_cmd.extend(
            [
                "-c",
                "copy",
                quote(str(outfile)),
            ]
        )

        subprocess.check_output(base_cmd, text=True)  # noqa: S603

        echo(f"File decryption successful: {outfile}")

@click.command("decrypt")
@click.argument("files", nargs=-1)
@click.option(
    "--dir",
    "-d",
    "directory",
    type=click.Path(exists=True, dir_okay=True),
    default=pathlib.Path.cwd(),
    help="Folder where the decrypted files should be saved.",
    show_default=True
)
@click.option(
    "--all",
    "-a",
    "all_",
    is_flag=True,
    help="Decrypt all aax and aaxc files in current folder."
)
@click.option("--overwrite", is_flag=True, help="Overwrite existing files.")
@click.option(
    "--rebuild-chapters",
    "-r",
    is_flag=True,
    help="Rebuild chapters with chapters from voucher or chapter file."
)
@click.option(
    "--force-rebuild-chapters",
    "-f",
    is_flag=True,
    help=(
        "Force rebuild chapters with chapters from voucher or chapter file "
        "if the built-in chapters in the audio file mismatch. "
        "Only use with `--rebuild-chapters`."
    ),
)
@click.option(
    "--skip-rebuild-chapters",
    "-t",
    is_flag=True,
    help=(
        "Decrypt without rebuilding chapters when chapters mismatch. "
        "Only use with `--rebuild-chapters`."
    ),
)
@click.option(
    "--separate-intro-outro",
    "-s",
    is_flag=True,
    help=(
        "Separate Audible Brand Intro and Outro to own Chapter. "
        "Only use with `--rebuild-chapters`."
    ),
)
@pass_session
def cli(
    session,
    files: str,
    directory: t.Union[pathlib.Path, str],
    all_: bool,
    overwrite: bool,
    rebuild_chapters: bool,
    force_rebuild_chapters: bool,
    skip_rebuild_chapters: bool,
    separate_intro_outro: bool,
):
    """Decrypt audiobooks downloaded with audible-cli.

    FILES are the names of the file to decrypt.
    Wildcards `*` and recursive lookup with `**` are supported.

    Only FILES with `aax` or `aaxc` suffix are processed. 
    Other files are skipped silently.
    """
    if not which("ffmpeg"):
        ctx = click.get_current_context()
        ctx.fail("ffmpeg not found")

    if (force_rebuild_chapters or skip_rebuild_chapters or separate_intro_outro) and not rebuild_chapters:
        raise click.BadOptionUsage(
            "`--force-rebuild-chapters`, `--skip-rebuild-chapters` and `--separate-intro-outro` can "
            "only be used together with `--rebuild-chapters`"
        )

    if force_rebuild_chapters and skip_rebuild_chapters:
        raise click.BadOptionUsage(
            "`--force-rebuild-chapters` and `--skip-rebuild-chapters` can "
            "not be used together"
        )

    if all_:
        if files:
            raise click.BadOptionUsage(
                "If using `--all`, no FILES arguments can be used."
            )
        files = [f"*{suffix}" for suffix in SupportedFiles.get_supported_list()]

    files = _get_input_files(files, recursive=True)
    with tempfile.TemporaryDirectory() as tempdir:
        for file in files:
            decrypter = FfmpegFileDecrypter(
                file=file,
                target_dir=pathlib.Path(directory),
                tempdir=pathlib.Path(tempdir),
                activation_bytes=session.auth.activation_bytes,
                overwrite=overwrite,
                rebuild_chapters=rebuild_chapters,
                force_rebuild_chapters=force_rebuild_chapters,
                skip_rebuild_chapters=skip_rebuild_chapters,
                separate_intro_outro=separate_intro_outro
            )
            decrypter.run()

@philgoetz
Copy link
Author

Thank you! I was able to get that to run correctly.

However, I am mystified at what happened when I tried to get audible to call this version of the plugin, and now my audible-cli installation is in a bizarre state, in which it executes the wrong file to run the plugin.

First, I saved the code you gave in my plugins directory as cmd_decrypt-relpath.py, but audible-cli was unable to find it:

audible decrypt-relpath -d decrypted/ aaxc/Wittgenstein_A_Very_Short_Introduction-AAX_22_32.aaxc
Usage: audible [OPTIONS] COMMAND [ARGS]...
Try 'audible -h' for help.

Error: No such command 'decrypt-relpath'.

So I typed

mv cmd_decrypt.py cmd_decrypt0.py
mv cmd_decrypt-relpath.py cmd_decrypt.py

Then I got

File "/cygdrive/d/data/audio_/Audible/audible-cli/plugins/cmd_decrypt0.py", line 457, in run
quote(fileraw),
NameError: name 'fileraw' is not defined

This is EXTREMELY strange. The variable 'fileraw' is one I'd added to that copy of cmd_decrypt.py in my attempt to make it use relative paths. Somehow audible-cli knew that I had renamed cmd_decrypt.py to cmd_decrypt0.py, and ignored cmd_decrypt.py and called cmd_decrypt0.py instead! (As you can see in the filename given above.) This should not be possible; Unix doesn't work that way. Files are located only by filename, and any record, anywhere, of which file to call should not know that I renamed the file.

I was only able to fix it by editing cmd_decrypt0.py, deleting everything, pasting in your new code, and saving that file. So now the decrypt command runs the plugin file cmd_decrypt0.py rather than cmd_decrypt.py. Do you have any idea how that is possible?

@mkb79
Copy link
Owner

mkb79 commented Mar 27, 2024

The filename does not matter, if a command name was given. So if you rename the file, you have to rewrite the file on line 494 @click.command("decrypt"). There you can rename the command name decrypt to decrypt-relname or whatever you want.

Edit:
If you write @click.command(), it will use the file name without the leading cmd_ as command name. This is helpful if you rename the filename often.

@mkb79
Copy link
Owner

mkb79 commented Mar 27, 2024

Since audible-cli v0.3.0 you can run audible [SUBCOMMAND-NAME] -h. This will print you out the file used for the plugin command.

@philgoetz
Copy link
Author

The filename does not matter, if a command name was given. So if you rename the file, you have to rewrite the file on line 494 @click.command("decrypt"). There you can rename the command name decrypt to decrypt-relname or whatever you want.

Edit: If you write @click.command(), it will use the file name without the leading cmd_ as command name. This is helpful if you rename the filename often.

This still doesn't make sense. No file anywhere has the word "decrypt0" in it. The plugin should have run cmd_decrypt.py in order to find line 494 of that file. It ran cmd_decrypt0.py first, before it could read line 494 of that file.

@mkb79
Copy link
Owner

mkb79 commented Mar 27, 2024

You can run audible -h to print out all available commands. The short help for plugin commands will start with [P] to indicate, that this command is loaded from a file or package. Now you can run audible [PLUGIN-COMMAND] -h to show you the used file.

Search in the plugin file the line which starts with @click.command(. Now you have two options. You can replace it with @click.command() to use the filename as command name. Or you can replace it with @click.command("YOUR_COMMAND_NAME") to replace it with your own command name. Then you can verify with audible -h if the command is found and audible YOUR_COMMAND_NAME -h to verify that the correct file is used.

The import order have some rules, if the command name exists multiple times. The last import replace all other commands with the same name.

@philgoetz
Copy link
Author

philgoetz commented Mar 27, 2024

I can now run audible decrypt -all, but not with the -r option, because with -r, decrypt calls ffmpeg to write a temp file to a directory that doesn't exist. This doesn't seem to have anything to do with relative paths. (This isn't important to me, since I don't think I need the chapter reconstruction feature. I probably have to convert everything to mp3 files, because this is for a 20-year-old MP3 player that probably doesn't know about m4b or chapter titles. I just thought you'd want to know.)

$ audible decrypt --all -r -t -d ../decrypted/
...
subprocess.CalledProcessError: Command '['ffmpeg', '-stats', '-audible_key', '694fe75f59fd53f67ebadd14d6b25dad', '-audible_iv', '3a19aecf05d875693c3a7a89a3cf3f15', '-i', 'ADHD_Mental_Health_Training_The_Course_Includes_Autism_ADHD_PTSD_OCD_Depression_Stress_Anxiety_Eating__Sleeping_Disorders_Personality_Disorders_Gender_Dysphoria__other_Mental_Disorders-AAX_22_32.aaxc', '-f', 'ffmetadata', '/tmp/tmpxzls123a/ADHD_Mental_Health_Training_The_Course_Includes_Autism_ADHD_PTSD_OCD_Depression_Stress_Anxiety_Eating__Sleeping_Disorders_Personality_Disorders_Gender_Dysphoria__other_Mental_Disorders-AAX_22_32.meta']' returned non-zero exit status 1.

The directory /tmp/tmpxzls123a doesn't exist, so the program crashes. (Note I had to remove -v quiet from the call to ffmpeg to get that stack trace.)

There is another, less-important bug: this error-raising doesn't work:

if (force_rebuild_chapters or skip_rebuild_chapters or separate_intro_outro) and not rebuild_chapters:
    raise click.BadOptionUsage(
        "`--force-rebuild-chapters`, `--skip-rebuild-chapters` and `--separate-intro-outro` can "
        "only be used together with `--rebuild-chapters`"
    )

It produces

File "/cygdrive/d/data/audio_/Audible/audible-cli/plugins/cmd_decrypt0.py", line 570, in cli
raise click.BadOptionUsage(
TypeError: init() missing 1 required positional argument: 'message'

This is probably a problem with all the error messages in the plugin.

@philgoetz
Copy link
Author

philgoetz commented Mar 27, 2024

You can run audible -h to print out all available commands. The short help for plugin commands will start with [P] to indicate, that this command is loaded from a file or package. Now you can run audible [PLUGIN-COMMAND] -h to show you the used file.

The mystery is how audible-cli knew that the file which had been named "cmd_decrypt.py" the last time it ran, now knows that file's name was changed by a shell command to "cmd_decrypt0.py". Unix doesn't have any way of notifying a program, let alone one that isn't running at the time, that a filename was changed from the command line. CORRECTION: Python might use inotifywait (from inotify-tools) with the -m, --monitor option, to keep track of plugin names. You must be using a Python library to register plugins. But that would mean Python leaves a process always running that might be constantly monitoring a large number of files. That would be very bad.

I don't need to know the answer to run audible-cli, but I'm very curious how it found out that the filename had been changed. That just shouldn't be possible.

@mkb79
Copy link
Owner

mkb79 commented Mar 27, 2024

Let me investigate in temp directory issue. But I'll think it’s Cygwin related. I'll report back.

To your plugin question. The answer is easy. On every command run, audible-cli will search in the plugin directory for files with the schema cmd_[…].py. In this file must be a Python function named def cli(…). This function is decorated with @click.command(…) to indicate, that this is a command with or without a specific name.

@mkb79
Copy link
Owner

mkb79 commented Mar 27, 2024

I've rewrote the decrypt script below to use the current working dir as tempdir. If this works for you, the issue is Cygwin related.

"""Removes encryption of aax and aaxc files.

This is a proof-of-concept and for testing purposes only.

No error handling.
Need further work. Some options do not work or options are missing.

Needs at least ffmpeg 4.4
"""


import json
import operator
import pathlib
import re
import subprocess  # noqa: S404
import tempfile
import typing as t
from enum import Enum
from functools import reduce
from glob import glob
from shlex import quote
from shutil import which

import click
from click import echo, secho

from audible_cli.decorators import pass_session
from audible_cli.exceptions import AudibleCliException


class ChapterError(AudibleCliException):
    """Base class for all chapter errors."""


class SupportedFiles(Enum):
    AAX = ".aax"
    AAXC = ".aaxc"

    @classmethod
    def get_supported_list(cls):
        return list(set(item.value for item in cls))

    @classmethod
    def is_supported_suffix(cls, value):
        return value in cls.get_supported_list()

    @classmethod
    def is_supported_file(cls, value):
        return pathlib.PurePath(value).suffix in cls.get_supported_list()


def _get_input_files(
    files: t.Union[t.Tuple[str], t.List[str]],
    recursive: bool = True
) -> t.List[pathlib.Path]:
    filenames = []
    for filename in files:
        # if the shell does not do filename globbing
        expanded = list(glob(filename, recursive=recursive))

        if (
            len(expanded) == 0
            and '*' not in filename
            and not SupportedFiles.is_supported_file(filename)
        ):
            raise(click.BadParameter("{filename}: file not found or supported."))

        expanded_filter = filter(
            lambda x: SupportedFiles.is_supported_file(x), expanded
        )
        expanded = list(map(lambda x: pathlib.Path(x), expanded_filter))
        filenames.extend(expanded)

    return filenames


def recursive_lookup_dict(key: str, dictionary: t.Dict[str, t.Any]) -> t.Any:
    if key in dictionary:
        return dictionary[key]
    for value in dictionary.values():
        if isinstance(value, dict):
            try:
                item = recursive_lookup_dict(key, value)
            except KeyError:
                continue
            else:
                return item
            
    raise KeyError


def get_aaxc_credentials(voucher_file: pathlib.Path):
    if not voucher_file.exists() or not voucher_file.is_file():
        raise AudibleCliException(f"Voucher file {voucher_file} not found.")

    voucher_dict = json.loads(voucher_file.read_text())
    try:
        key = recursive_lookup_dict("key", voucher_dict)
        iv = recursive_lookup_dict("iv", voucher_dict)
    except KeyError:
        raise AudibleCliException(f"No key/iv found in file {voucher_file}.") from None

    return key, iv


class ApiChapterInfo:
    def __init__(self, content_metadata: t.Dict[str, t.Any]) -> None:
        chapter_info = self._parse(content_metadata)
        self._chapter_info = chapter_info

    @classmethod
    def from_file(cls, file: t.Union[pathlib.Path, str]) -> "ApiChapterInfo":
        file = pathlib.Path(file)
        if not file.exists() or not file.is_file():
            raise ChapterError(f"Chapter file {file} not found.")
        content_string = pathlib.Path(file).read_text("utf-8")
        content_json = json.loads(content_string)
        return cls(content_json)

    @staticmethod
    def _parse(content_metadata: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        if "chapters" in content_metadata:
            return content_metadata

        try:
            return recursive_lookup_dict("chapter_info", content_metadata)
        except KeyError:
            raise ChapterError("No chapter info found.") from None

    def count_chapters(self):
        return len(self.get_chapters())

    def get_chapters(self, separate_intro_outro=False):
        def extract_chapters(initial, current):
            if "chapters" in current:
                return initial + [current] + current["chapters"]
            else:
                return initial + [current]

        chapters = list(
            reduce(
                extract_chapters,
                self._chapter_info["chapters"],
                [],
            )
        )

        if separate_intro_outro:
            return self._separate_intro_outro(chapters)

        return chapters

    def get_intro_duration_ms(self):
        return self._chapter_info["brandIntroDurationMs"]

    def get_outro_duration_ms(self):
        return self._chapter_info["brandOutroDurationMs"]

    def get_runtime_length_ms(self):
        return self._chapter_info["runtime_length_ms"]

    def is_accurate(self):
        return self._chapter_info["is_accurate"]

    def _separate_intro_outro(self, chapters):
        echo("Separate Audible Brand Intro and Outro to own Chapter.")
        chapters.sort(key=operator.itemgetter("start_offset_ms"))
    
        first = chapters[0]
        intro_dur_ms = self.get_intro_duration_ms()
        first["start_offset_ms"] = intro_dur_ms
        first["start_offset_sec"] = round(first["start_offset_ms"] / 1000)
        first["length_ms"] -= intro_dur_ms
    
        last = chapters[-1]
        outro_dur_ms = self.get_outro_duration_ms()
        last["length_ms"] -= outro_dur_ms
    
        chapters.append(
            {
                "length_ms": intro_dur_ms,
                "start_offset_ms": 0,
                "start_offset_sec": 0,
                "title": "Intro",
            }
        )
        chapters.append(
            {
                "length_ms": outro_dur_ms,
                "start_offset_ms": self.get_runtime_length_ms() - outro_dur_ms,
                "start_offset_sec": round(
                    (self.get_runtime_length_ms() - outro_dur_ms) / 1000
                ),
                "title": "Outro",
            }
        )
        chapters.sort(key=operator.itemgetter("start_offset_ms"))
    
        return chapters


class FFMeta:
    SECTION = re.compile(r"\[(?P<header>[^]]+)\]")
    OPTION = re.compile(r"(?P<option>.*?)\s*(?:(?P<vi>=)\s*(?P<value>.*))?$")

    def __init__(self, ffmeta_file: t.Union[str, pathlib.Path]) -> None:
        self._ffmeta_raw = pathlib.Path(ffmeta_file).read_text("utf-8")
        self._ffmeta_parsed = self._parse_ffmeta()

    def _parse_ffmeta(self):
        parsed_dict = {}
        start_section = "_"
        cursec = parsed_dict[start_section] = {}
        num_chap = 0

        for line in iter(self._ffmeta_raw.splitlines()):
            mo = self.SECTION.match(line)
            if mo:
                sec_name = mo.group("header")
                if sec_name == "CHAPTER":
                    num_chap += 1
                    if sec_name not in parsed_dict:
                        parsed_dict[sec_name] = {}
                    cursec = parsed_dict[sec_name][num_chap] = {}
                else:
                    cursec = parsed_dict[sec_name] = {}
            else:
                match = self.OPTION.match(line)
                cursec.update({match.group("option"): match.group("value")})

        return parsed_dict

    def count_chapters(self):
        return len(self._ffmeta_parsed["CHAPTER"])

    def set_chapter_option(self, num, option, value):
        chapter = self._ffmeta_parsed["CHAPTER"][num]
        for chapter_option in chapter:
            if chapter_option == option:
                chapter[chapter_option] = value

    def write(self, filename):
        fp = pathlib.Path(filename).open("w", encoding="utf-8")
        d = "="

        for section in self._ffmeta_parsed:
            if section == "_":
                self._write_section(fp, None, self._ffmeta_parsed[section], d)
            elif section == "CHAPTER":
                # TODO: Tue etwas
                for chapter in self._ffmeta_parsed[section]:
                    self._write_section(
                        fp, section, self._ffmeta_parsed[section][chapter], d
                    )
            else:
                self._write_section(fp, section, self._ffmeta_parsed[section], d)

    @staticmethod
    def _write_section(fp, section_name, section_items, delimiter):
        """Write a single section to the specified `fp`."""
        if section_name is not None:
            fp.write(f"[{section_name}]\n")

        for key, value in section_items.items():
            if value is None:
                fp.write(f"{key}\n")
            else:
                fp.write(f"{key}{delimiter}{value}\n")

    def update_chapters_from_chapter_info(
        self,
        chapter_info: ApiChapterInfo,
        force_rebuild_chapters: bool = False,
        separate_intro_outro: bool = False
    ) -> None:
        if not chapter_info.is_accurate():
            echo("Metadata from API is not accurate. Skip.")
            return

        if chapter_info.count_chapters() != self.count_chapters():
            if force_rebuild_chapters:
                echo("Force rebuild chapters due to chapter mismatch.")
            else:
                raise ChapterError("Chapter mismatch")

        echo(f"Found {chapter_info.count_chapters()} chapters to prepare.")

        api_chapters = chapter_info.get_chapters(separate_intro_outro)

        num_chap = 0
        new_chapters = {}
        for chapter in api_chapters:
            chap_start = chapter["start_offset_ms"]
            chap_end = chap_start + chapter["length_ms"]
            num_chap += 1
            new_chapters[num_chap] = {
                "TIMEBASE": "1/1000",
                "START": chap_start,
                "END": chap_end,
                "title": chapter["title"],
            }
        self._ffmeta_parsed["CHAPTER"] = new_chapters


def _get_voucher_filename(file: pathlib.Path) -> pathlib.Path:
    return file.with_suffix(".voucher")


def _get_chapter_filename(file: pathlib.Path) -> pathlib.Path:
    base_filename = file.stem.rsplit("-", 1)[0]
    return file.with_name(base_filename + "-chapters.json")


def _get_ffmeta_file(file: pathlib.Path, tempdir: pathlib.Path) -> pathlib.Path:
    metaname = file.with_suffix(".meta").name
    metafile = tempdir / metaname
    return metafile


class FfmpegFileDecrypter:
    def __init__(
        self,
        file: pathlib.Path,
        target_dir: pathlib.Path,
        tempdir: pathlib.Path,
        activation_bytes: t.Optional[str],
        overwrite: bool,
        rebuild_chapters: bool,
        force_rebuild_chapters: bool,
        skip_rebuild_chapters: bool,
        separate_intro_outro: bool
    ) -> None:
        file_type = SupportedFiles(file.suffix)

        credentials = None
        if file_type == SupportedFiles.AAX:
            if activation_bytes is None:
                raise AudibleCliException(
                    "No activation bytes found. Do you ever run "
                    "`audible activation-bytes`?"
                )
            credentials = activation_bytes
        elif file_type == SupportedFiles.AAXC:
            voucher_filename = _get_voucher_filename(file)
            credentials = get_aaxc_credentials(voucher_filename)

        self._source = file
        self._credentials: t.Optional[t.Union[str, t.Tuple[str]]] = credentials
        self._target_dir = target_dir
        self._tempdir = tempdir
        self._overwrite = overwrite
        self._rebuild_chapters = rebuild_chapters
        self._force_rebuild_chapters = force_rebuild_chapters
        self._skip_rebuild_chapters = skip_rebuild_chapters
        self._separate_intro_outro = separate_intro_outro
        self._api_chapter: t.Optional[ApiChapterInfo] = None
        self._ffmeta: t.Optional[FFMeta] = None
        self._is_rebuilded: bool = False

    @property
    def api_chapter(self) -> ApiChapterInfo:
        if self._api_chapter is None:
            try:
                voucher_filename = _get_voucher_filename(self._source)
                self._api_chapter = ApiChapterInfo.from_file(voucher_filename)
            except ChapterError:
                voucher_filename = _get_chapter_filename(self._source)
                self._api_chapter = ApiChapterInfo.from_file(voucher_filename)
            echo(f"Using chapters from {voucher_filename}")
        return self._api_chapter

    @property
    def ffmeta(self) -> FFMeta:
        if self._ffmeta is None:
            metafile = _get_ffmeta_file(self._source, self._tempdir)

            base_cmd = [
                "ffmpeg",
                "-v",
                "quiet",
                "-stats",
            ]
            if isinstance(self._credentials, tuple):
                key, iv = self._credentials
                credentials_cmd = [
                    "-audible_key",
                    quote(key),
                    "-audible_iv",
                    quote(iv),
                ]
            else:
                credentials_cmd = [
                    "-activation_bytes",
                    quote(self._credentials),
                ]    
            base_cmd.extend(credentials_cmd)
    
            extract_cmd = [
                "-i",
                quote(str(self._source)),
                "-f",
                "ffmetadata",
                str(metafile),
            ]
            base_cmd.extend(extract_cmd)

            subprocess.check_output(base_cmd, text=True)  # noqa: S603
            self._ffmeta = FFMeta(metafile)

        return self._ffmeta

    def rebuild_chapters(self) -> None:
        if not self._is_rebuilded:
            self.ffmeta.update_chapters_from_chapter_info(
                self.api_chapter, self._force_rebuild_chapters, self._separate_intro_outro
            )
            self._is_rebuilded = True

    def run(self):
        oname = self._source.with_suffix(".m4b").name
        outfile = self._target_dir / oname

        if outfile.exists():
            if self._overwrite:
                secho(f"Overwrite {outfile}: already exists", fg="blue")
            else:
                secho(f"Skip {outfile}: already exists", fg="blue")
                return

        base_cmd = [
            "ffmpeg",
            "-v",
            "quiet",
            "-stats",
        ]
        if self._overwrite:
            base_cmd.append("-y")
        if isinstance(self._credentials, tuple):
            key, iv = self._credentials
            credentials_cmd = [
                "-audible_key",
                quote(key),
                "-audible_iv",
                quote(iv),
            ]
        else:
            credentials_cmd = [
                "-activation_bytes",
                quote(self._credentials),
            ]    
        base_cmd.extend(credentials_cmd)
        base_cmd.extend(
            [
                "-i",
                quote(str(self._source)),
            ]
        )

        if self._rebuild_chapters:
            metafile = _get_ffmeta_file(self._source, self._tempdir)
            try:
                self.rebuild_chapters()
                self.ffmeta.write(metafile)
            except ChapterError:
                if self._skip_rebuild_chapters:
                    echo("Skip rebuild chapters due to chapter mismatch.")
                else:
                    raise
            else:
                base_cmd.extend(
                    [
                        "-i",
                        quote(str(metafile)),
                        "-map_metadata",
                        "0",
                        "-map_chapters",
                        "1",
                    ]
                )

        base_cmd.extend(
            [
                "-c",
                "copy",
                quote(str(outfile)),
            ]
        )

        subprocess.check_output(base_cmd, text=True)  # noqa: S603

        echo(f"File decryption successful: {outfile}")

@click.command("decrypt")
@click.argument("files", nargs=-1)
@click.option(
    "--dir",
    "-d",
    "directory",
    type=click.Path(exists=True, dir_okay=True),
    default=pathlib.Path.cwd(),
    help="Folder where the decrypted files should be saved.",
    show_default=True
)
@click.option(
    "--all",
    "-a",
    "all_",
    is_flag=True,
    help="Decrypt all aax and aaxc files in current folder."
)
@click.option("--overwrite", is_flag=True, help="Overwrite existing files.")
@click.option(
    "--rebuild-chapters",
    "-r",
    is_flag=True,
    help="Rebuild chapters with chapters from voucher or chapter file."
)
@click.option(
    "--force-rebuild-chapters",
    "-f",
    is_flag=True,
    help=(
        "Force rebuild chapters with chapters from voucher or chapter file "
        "if the built-in chapters in the audio file mismatch. "
        "Only use with `--rebuild-chapters`."
    ),
)
@click.option(
    "--skip-rebuild-chapters",
    "-t",
    is_flag=True,
    help=(
        "Decrypt without rebuilding chapters when chapters mismatch. "
        "Only use with `--rebuild-chapters`."
    ),
)
@click.option(
    "--separate-intro-outro",
    "-s",
    is_flag=True,
    help=(
        "Separate Audible Brand Intro and Outro to own Chapter. "
        "Only use with `--rebuild-chapters`."
    ),
)
@pass_session
def cli(
    session,
    files: str,
    directory: t.Union[pathlib.Path, str],
    all_: bool,
    overwrite: bool,
    rebuild_chapters: bool,
    force_rebuild_chapters: bool,
    skip_rebuild_chapters: bool,
    separate_intro_outro: bool,
):
    """Decrypt audiobooks downloaded with audible-cli.

    FILES are the names of the file to decrypt.
    Wildcards `*` and recursive lookup with `**` are supported.

    Only FILES with `aax` or `aaxc` suffix are processed. 
    Other files are skipped silently.
    """
    if not which("ffmpeg"):
        ctx = click.get_current_context()
        ctx.fail("ffmpeg not found")

    if (force_rebuild_chapters or skip_rebuild_chapters or separate_intro_outro) and not rebuild_chapters:
        raise click.BadOptionUsage(
            "",
            "`--force-rebuild-chapters`, `--skip-rebuild-chapters` and `--separate-intro-outro` can "
            "only be used together with `--rebuild-chapters`"
        )

    if force_rebuild_chapters and skip_rebuild_chapters:
        raise click.BadOptionUsage(
            "",
            "`--force-rebuild-chapters` and `--skip-rebuild-chapters` can "
            "not be used together"
        )

    if all_:
        if files:
            raise click.BadOptionUsage(
                "",
                "If using `--all`, no FILES arguments can be used."
            )
        files = [f"*{suffix}" for suffix in SupportedFiles.get_supported_list()]

    files = _get_input_files(files, recursive=True)
    with tempfile.TemporaryDirectory() as tempdir:
        tempdir_cwd = pathlib.Path.cwd()
        for file in files:
            decrypter = FfmpegFileDecrypter(
                file=file,
                target_dir=pathlib.Path(directory),
                tempdir=pathlib.Path(tempdir_cwd),
                activation_bytes=session.auth.activation_bytes,
                overwrite=overwrite,
                rebuild_chapters=rebuild_chapters,
                force_rebuild_chapters=force_rebuild_chapters,
                skip_rebuild_chapters=skip_rebuild_chapters,
                separate_intro_outro=separate_intro_outro
            )
            decrypter.run()

@devnoname120
Copy link
Sponsor Contributor

@philgoetz Did you consider trying this solution?
https://stackoverflow.com/a/25008278

@philgoetz
Copy link
Author

philgoetz commented Apr 9, 2024 via email

@mkb79
Copy link
Owner

mkb79 commented Apr 23, 2024

@philgoetz
Can I close this issue now?

@mkb79 mkb79 added the bug Something isn't working label Apr 23, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

No branches or pull requests

3 participants