Odoo REST API:实际示例和集成教程
根据 Postman 的 2025 年 API 状况报告,73% 的企业现在将其 ERP 与至少三个外部系统集成。 Odoo 为全球超过 1200 万用户提供支持,通过丰富的 API 层公开其整个数据模型。然而,该文档让许多开发人员在身份验证流程、批处理操作和生产级错误处理方面陷入困境。
本教程为每种常见的集成模式提供了 Python 和 Node.js 中的复制粘贴示例。无论您是同步 Shopify 订单、从移动应用程序推送数据还是构建自定义仪表板,本指南都能满足您的需求。
要点
- Odoo 提供三种 API 访问方法:XML-RPC(旧版,所有版本)、JSON-RPC(Web 客户端协议)和 REST API(带有 API 密钥的 Odoo 17+)——每种方法都有不同的身份验证和用例。
- API 密钥身份验证 (Odoo 17+) 是服务器到服务器集成的推荐方法 — 无会话管理、无 CSRF 令牌、简单的 HTTP 标头。
- 搜索域使用Odoo强大的波兰表示法进行过滤——掌握运算符,您就可以查询任何数据组合。
- 批量操作对于性能至关重要 - 通过一次 API 调用创建 1,000 条记录比 1,000 条单独调用快 50 倍。
- 错误处理和速率限制对于生产集成至关重要 - Odoo 返回结构化错误响应,您的代码应妥善解析和处理这些响应。
1. 认证方式
方法 1:API 密钥(推荐 Odoo 17+)
API 密钥是服务器到服务器通信最简单、最安全的方法:
# Generate an API key in Odoo:
# Settings → Users → Select user → Account Security → New API Key
# Test authentication
curl -X GET "https://your-odoo.com/api/res.partner" \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json"
带有 API 密钥的 Python:
import requests
class OdooAPI:
def __init__(self, url, api_key):
self.url = url.rstrip('/')
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
})
def get(self, model, params=None):
response = self.session.get(
f'{self.url}/api/{model}',
params=params or {}
)
response.raise_for_status()
return response.json()
def post(self, model, data):
response = self.session.post(
f'{self.url}/api/{model}',
json=data
)
response.raise_for_status()
return response.json()
# Usage
odoo = OdooAPI('https://your-odoo.com', 'your-api-key-here')
partners = odoo.get('res.partner', {'limit': 10})
带有 API 密钥的 Node.js:
const axios = require('axios');
class OdooAPI {
constructor(url, apiKey) {
this.client = axios.create({
baseURL: `${url}/api`,
headers: {
'Authorization': `Bearer ${apiKey}`,
'Content-Type': 'application/json',
},
timeout: 30000,
});
}
async get(model, params = {}) {
const { data } = await this.client.get(`/${model}`, { params });
return data;
}
async post(model, body) {
const { data } = await this.client.post(`/${model}`, body);
return data;
}
async put(model, id, body) {
const { data } = await this.client.put(`/${model}/${id}`, body);
return data;
}
async delete(model, id) {
const { data } = await this.client.delete(`/${model}/${id}`);
return data;
}
}
// Usage
const odoo = new OdooAPI('https://your-odoo.com', 'your-api-key');
const partners = await odoo.get('res.partner', { limit: 10 });
方法 2:JSON-RPC(所有版本)
JSON-RPC 是 Odoo 的 Web 客户端内部使用的协议。它适用于所有 Odoo 版本:
import requests
import json
class OdooJsonRpc:
def __init__(self, url, db, username, password):
self.url = url.rstrip('/')
self.db = db
self.session = requests.Session()
self.uid = self._authenticate(username, password)
def _authenticate(self, username, password):
response = self._call('/web/session/authenticate', {
'db': self.db,
'login': username,
'password': password,
})
if not response.get('uid'):
raise Exception(f"Authentication failed: {response}")
return response['uid']
def _call(self, endpoint, params):
payload = {
'jsonrpc': '2.0',
'method': 'call',
'params': params,
'id': None,
}
response = self.session.post(
f'{self.url}{endpoint}',
json=payload,
headers={'Content-Type': 'application/json'}
)
result = response.json()
if result.get('error'):
raise Exception(result['error']['data']['message'])
return result.get('result')
def search_read(self, model, domain=None, fields=None, limit=80, offset=0, order=None):
return self._call('/web/dataset/call_kw', {
'model': model,
'method': 'search_read',
'args': [domain or []],
'kwargs': {
'fields': fields or [],
'limit': limit,
'offset': offset,
'order': order or '',
},
})
def create(self, model, values):
return self._call('/web/dataset/call_kw', {
'model': model,
'method': 'create',
'args': [values],
'kwargs': {},
})
def write(self, model, ids, values):
return self._call('/web/dataset/call_kw', {
'model': model,
'method': 'write',
'args': [ids, values],
'kwargs': {},
})
# Usage
odoo = OdooJsonRpc('https://your-odoo.com', 'mydb', 'admin', 'password')
orders = odoo.search_read('sale.order', [('state', '=', 'sale')],
fields=['name', 'amount_total', 'partner_id'],
limit=20)
方法 3:XML-RPC(传统、通用)
import xmlrpc.client
url = 'https://your-odoo.com'
db = 'mydb'
username = 'admin'
password = 'password'
# Authenticate
common = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/common')
uid = common.authenticate(db, username, password, {})
# Create models proxy
models = xmlrpc.client.ServerProxy(f'{url}/xmlrpc/2/object')
# Search and read
partners = models.execute_kw(db, uid, password,
'res.partner', 'search_read',
[[('is_company', '=', True), ('country_id.code', '=', 'US')]],
{'fields': ['name', 'email', 'phone'], 'limit': 10}
)
2.增删改查操作
创建记录
# Create a single contact
partner_id = odoo.create('res.partner', {
'name': 'Acme Corporation',
'is_company': True,
'email': '[email protected]',
'phone': '+1-555-0123',
'street': '123 Main Street',
'city': 'San Francisco',
'state_id': 5, # California
'country_id': 233, # United States
'category_id': [(6, 0, [1, 3])], # Tags: replace all with IDs 1 and 3
})
# Create a sale order with lines
order_id = odoo.create('sale.order', {
'partner_id': partner_id,
'date_order': '2026-03-23',
'order_line': [
(0, 0, {
'product_id': 42,
'product_uom_qty': 5,
'price_unit': 99.99,
}),
(0, 0, {
'product_id': 43,
'product_uom_qty': 2,
'price_unit': 149.99,
}),
],
})
Node.js 等效项:
// Create a contact
const partnerId = await odoo.post('res.partner', {
name: 'Acme Corporation',
is_company: true,
email: '[email protected]',
phone: '+1-555-0123',
street: '123 Main Street',
city: 'San Francisco',
country_id: 233,
});
// Create sale order with lines
const orderId = await odoo.post('sale.order', {
partner_id: partnerId,
date_order: '2026-03-23',
order_line: [
[0, 0, { product_id: 42, product_uom_qty: 5, price_unit: 99.99 }],
[0, 0, { product_id: 43, product_uom_qty: 2, price_unit: 149.99 }],
],
});
读取记录
# Read specific fields from specific records
data = odoo.search_read('sale.order',
domain=[('state', '=', 'sale'), ('amount_total', '>', 500)],
fields=['name', 'partner_id', 'amount_total', 'date_order', 'state'],
limit=50,
offset=0,
order='date_order desc'
)
# Read a single record by ID (REST API)
# GET /api/sale.order/42?fields=name,amount_total
// Node.js — read with pagination
async function fetchAllOrders(odoo) {
const pageSize = 100;
let offset = 0;
let allOrders = [];
while (true) {
const orders = await odoo.get('sale.order', {
domain: JSON.stringify([['state', '=', 'sale']]),
fields: 'name,partner_id,amount_total',
limit: pageSize,
offset,
order: 'date_order desc',
});
allOrders = allOrders.concat(orders);
if (orders.length < pageSize) break;
offset += pageSize;
}
return allOrders;
}
更新记录
# Update a single record
odoo.write('res.partner', [partner_id], {
'phone': '+1-555-9999',
'website': 'https://acme.com',
})
# Update multiple records at once
draft_orders = odoo.search_read('sale.order',
[('state', '=', 'draft'), ('date_order', '<', '2026-01-01')],
fields=['id']
)
ids = [o['id'] for o in draft_orders]
odoo.write('sale.order', ids, {'note': 'Reviewed Q1 2026'})
删除记录
# Delete records (use with caution)
odoo.write('res.partner', [obsolete_id], {'active': False}) # Prefer archiving
# Actually delete (rarely needed)
# models.execute_kw(db, uid, password, 'res.partner', 'unlink', [[obsolete_id]])
3. 高级搜索域
Odoo 的搜索域使用波兰表示法(前缀表示法)来组合条件。条件之间的默认运算符是 AND。使用管道“|”对于 OR,与号“&”对于显式 AND。每个叶子都是字段名称、运算符和值的元组。 Odoo 支持点符号来过滤相关模型字段,例如“partner_id.country_id.code”来按客户所在国家/地区过滤订单。
# Complex domain examples
# Orders from US customers with amount > $1000, created this year
domain = [
('partner_id.country_id.code', '=', 'US'),
('amount_total', '>', 1000),
('date_order', '>=', '2026-01-01'),
('state', 'in', ['sale', 'done']),
]
# OR condition: email contains 'gmail' OR 'yahoo'
domain = [
'|',
('email', 'ilike', 'gmail.com'),
('email', 'ilike', 'yahoo.com'),
]
# Complex: (state=sale AND amount>1000) OR (state=done AND amount>5000)
domain = [
'|',
'&', ('state', '=', 'sale'), ('amount_total', '>', 1000),
'&', ('state', '=', 'done'), ('amount_total', '>', 5000),
]
# Negation: NOT archived
domain = [('active', '!=', False)]
# NULL check: has no email
domain = [('email', '=', False)]
# Hierarchical: all child categories of 'Electronics'
domain = [('categ_id', 'child_of', electronics_id)]
# Date ranges
from datetime import datetime, timedelta
domain = [
('create_date', '>=', (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')),
('create_date', '<', datetime.now().strftime('%Y-%m-%d')),
]
运算符参考
| 操作员 | 描述 | 示例 |
|---|---|---|
| 代码0 | 精确匹配 | 代码1 |
| 代码0 | 不等于 | 代码1 |
| 代码0、代码1、代码2、代码3 | 比较 | 代码4 |
| 代码0 | 列表中的值 | 代码1 |
| 代码0 | 值不在列表中 | 代码1 |
| 代码0 | SQL LIKE(区分大小写) | 代码1 |
| 代码0 | 不区分大小写__代码1__ | |
| 代码0 | 模式匹配 | 代码1 |
| 代码0 | 等级后代 | 代码1 |
| 代码0 | 等级祖先 | 代码1 |
4. 批量操作
批处理操作对于性能至关重要。切勿在循环中一次创建一个记录:
# BAD: 1000 API calls (slow, ~300 seconds)
for customer in customers:
odoo.create('res.partner', customer)
# GOOD: 1 API call with batch (fast, ~3 seconds)
# Using JSON-RPC batch create
partner_ids = odoo._call('/web/dataset/call_kw', {
'model': 'res.partner',
'method': 'create',
'args': [customers], # Pass list of dicts
'kwargs': {},
})
// Node.js batch pattern with chunking
async function batchCreate(odoo, model, records, chunkSize = 200) {
const results = [];
for (let i = 0; i < records.length; i += chunkSize) {
const chunk = records.slice(i, i + chunkSize);
console.log(`Creating ${model} batch ${i / chunkSize + 1}/${Math.ceil(records.length / chunkSize)}`);
const ids = await odoo.post(model, chunk);
results.push(...(Array.isArray(ids) ? ids : [ids]));
// Respect rate limits
if (i + chunkSize < records.length) {
await new Promise(resolve => setTimeout(resolve, 500));
}
}
return results;
}
// Usage
const customers = generateCustomerData(); // Array of 5000 objects
const ids = await batchCreate(odoo, 'res.partner', customers);
console.log(`Created ${ids.length} partners`);
5. 错误处理
生产集成必须妥善处理错误:
import requests
import logging
import time
logger = logging.getLogger(__name__)
class OdooAPIClient:
MAX_RETRIES = 3
RETRY_DELAY = 2 # seconds
def __init__(self, url, api_key):
self.url = url.rstrip('/')
self.session = requests.Session()
self.session.headers.update({
'Authorization': f'Bearer {api_key}',
'Content-Type': 'application/json',
})
def _request(self, method, endpoint, **kwargs):
last_error = None
for attempt in range(self.MAX_RETRIES):
try:
response = self.session.request(
method,
f'{self.url}{endpoint}',
timeout=30,
**kwargs
)
if response.status_code == 429:
retry_after = int(response.headers.get('Retry-After', 60))
logger.warning(f"Rate limited. Retrying after {retry_after}s")
time.sleep(retry_after)
continue
if response.status_code == 401:
raise AuthenticationError("Invalid API key or session expired")
if response.status_code == 403:
raise PermissionError(f"Access denied: {response.text}")
if response.status_code == 404:
raise RecordNotFoundError(f"Record not found: {endpoint}")
if response.status_code >= 500:
logger.error(f"Server error {response.status_code}: {response.text}")
if attempt < self.MAX_RETRIES - 1:
time.sleep(self.RETRY_DELAY * (attempt + 1))
continue
raise ServerError(f"Server error after {self.MAX_RETRIES} retries")
response.raise_for_status()
return response.json()
except requests.exceptions.ConnectionError as e:
last_error = e
logger.warning(f"Connection error (attempt {attempt + 1}): {e}")
if attempt < self.MAX_RETRIES - 1:
time.sleep(self.RETRY_DELAY * (attempt + 1))
except requests.exceptions.Timeout as e:
last_error = e
logger.warning(f"Request timeout (attempt {attempt + 1}): {e}")
if attempt < self.MAX_RETRIES - 1:
time.sleep(self.RETRY_DELAY * (attempt + 1))
raise ConnectionError(f"Failed after {self.MAX_RETRIES} attempts: {last_error}")
class AuthenticationError(Exception): pass
class PermissionError(Exception): pass
class RecordNotFoundError(Exception): pass
class ServerError(Exception): pass
// Node.js error handling with axios interceptors
const axios = require('axios');
function createOdooClient(url, apiKey) {
const client = axios.create({
baseURL: `${url}/api`,
headers: { Authorization: `Bearer ${apiKey}` },
timeout: 30000,
});
// Response interceptor for error handling
client.interceptors.response.use(
(response) => response,
async (error) => {
const { config, response } = error;
config.__retryCount = config.__retryCount || 0;
// Rate limiting
if (response?.status === 429) {
const retryAfter = parseInt(response.headers['retry-after'] || '60', 10);
console.warn(`Rate limited. Waiting ${retryAfter}s...`);
await new Promise((r) => setTimeout(r, retryAfter * 1000));
return client(config);
}
// Retry on server errors (max 3)
if (response?.status >= 500 && config.__retryCount < 3) {
config.__retryCount += 1;
const delay = config.__retryCount * 2000;
console.warn(`Server error ${response.status}. Retry ${config.__retryCount}/3 in ${delay}ms`);
await new Promise((r) => setTimeout(r, delay));
return client(config);
}
// Structured error response
const errorMessage = response?.data?.error?.message
|| response?.data?.message
|| error.message;
throw new Error(`Odoo API Error [${response?.status}]: ${errorMessage}`);
}
);
return client;
}
6. 实际集成示例
Shopify 到 Odoo 订单同步
class ShopifyOdooSync:
def __init__(self, odoo_client, shopify_client):
self.odoo = odoo_client
self.shopify = shopify_client
def sync_order(self, shopify_order):
# 1. Find or create customer
partner = self._get_or_create_partner(shopify_order['customer'])
# 2. Map products
order_lines = []
for item in shopify_order['line_items']:
product_id = self._find_product_by_sku(item['sku'])
if not product_id:
logger.warning(f"Product not found for SKU: {item['sku']}")
continue
order_lines.append((0, 0, {
'product_id': product_id,
'product_uom_qty': item['quantity'],
'price_unit': float(item['price']),
'discount': self._calc_discount(item),
}))
# 3. Create sale order
order_id = self.odoo.create('sale.order', {
'partner_id': partner['id'],
'client_order_ref': shopify_order['name'], # Shopify order #
'order_line': order_lines,
'note': f"Shopify Order: {shopify_order['id']}",
})
# 4. Auto-confirm if paid
if shopify_order['financial_status'] == 'paid':
self.odoo._call('/web/dataset/call_kw', {
'model': 'sale.order',
'method': 'action_confirm',
'args': [[order_id]],
'kwargs': {},
})
return order_id
def _get_or_create_partner(self, customer):
# Search by email first
existing = self.odoo.search_read('res.partner',
[('email', '=', customer['email'])],
fields=['id', 'name'], limit=1)
if existing:
return existing[0]
return {'id': self.odoo.create('res.partner', {
'name': f"{customer['first_name']} {customer['last_name']}",
'email': customer['email'],
'phone': customer.get('phone'),
})}
def _find_product_by_sku(self, sku):
products = self.odoo.search_read('product.product',
[('default_code', '=', sku)],
fields=['id'], limit=1)
return products[0]['id'] if products else None
Webhook 接收器(Flask 示例)
from flask import Flask, request, jsonify
import hmac
import hashlib
app = Flask(__name__)
@app.route('/webhook/odoo/order', methods=['POST'])
def handle_odoo_webhook():
# Verify signature
signature = request.headers.get('X-Odoo-Signature')
expected = hmac.new(
WEBHOOK_SECRET.encode(),
request.data,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected):
return jsonify({'error': 'Invalid signature'}), 401
payload = request.json
event_type = payload.get('event')
if event_type == 'sale.order.confirmed':
handle_order_confirmed(payload['data'])
elif event_type == 'stock.picking.done':
handle_shipment_complete(payload['data'])
return jsonify({'status': 'ok'}), 200
7. 性能提示
| 提示 | 影响 | 详情 |
|---|---|---|
使用 fields 参数 | 高 | 仅请求所需字段 — 减少有效负载 5-10 倍 |
| 批量创建 | 高 | 1 个呼叫包含 500 条记录与 500 个呼叫 — 快 50 倍 |
| 对大型数据集进行分页 | 中等 | 使用 limit 和 offset — 避免加载 100K 记录 |
| 缓存只读数据 | 中等 | 缓存产品目录、类别(TTL 5-15 分钟) |
使用 search_count | 低 | 获取之前先计数 — 避免仅仅为了计数而加载数据 |
| 连接池 | 中等 | 重用 HTTP 会话 — 节省 TLS 握手开销 |
常见问题
Odoo 中的 XML-RPC、JSON-RPC 和 REST API 有什么区别?
XML-RPC 是所有 Odoo 版本中可用的传统协议 — 它很冗长,但得到普遍支持。 JSON-RPC 是 Odoo 的 Web 客户端使用的协议,并提供与 JSON 有效负载相同的功能。 REST API 在 Odoo 17 中引入,并提供标准 HTTP 端点和 API 密钥身份验证,使其成为现代集成的最简单选择。对于新项目,如果您使用的是 Odoo 17 或更高版本,请使用 REST API。
如何处理 Odoo API 速率限制?
Odoo.sh 根据您的计划等级应用速率限制。当您收到 429 响应时,请读取 Retry-After 标头并在重试之前等待。对于大批量集成,请实施指数退避、批量操作以减少 API 调用数量,并考虑使用 Odoo 的计划操作进行批量处理,而不是进行非关键同步的实时 API 调用。
我可以通过 API 调用自定义 Python 方法吗?
是的。 Odoo 模型上的任何公共方法都可以使用execute_kw 通过XML-RPC 或JSON-RPC 进行调用。对于 REST API,您需要使用 @http.route 创建自定义控制器端点。以下划线开头的方法是私有的,不能通过 XML-RPC 外部调用。始终验证自定义方法中的输入以防止注入攻击。
如何高效同步大型数据集?
使用策略组合:初始完全同步与批处理操作和分页(每个请求限制 200 条记录),然后使用 write_date 过滤进行增量同步,仅获取自上次同步以来修改的记录。存储上次同步时间戳并将其用作域过滤器。对于超过 100,000 条记录的超大型数据集,请考虑直接数据库复制而不是 API 同步。
Odoo REST API 在 Odoo 社区版中可用吗?
Odoo 17 Enterprise 中引入了具有 API 密钥身份验证的本机 REST API。对于 Odoo 社区,您可以使用所有版本中都可用的 XML-RPC 或 JSON-RPC,或者安装社区模块,例如添加 RESTful 端点的 OCA 的 Rest-framework。 ECOSIRE 的集成服务支持所有 Odoo 版本和 API 协议。
如何处理 API 调用中的 Many2many 和 One2many 字段?
关系字段使用特殊命令元组:(0, 0,values) 创建并链接新记录,(1,id,values) 更新链接记录,(2,id,0) 删除链接记录,(3,id,0) 取消链接而不删除,(4,id,0) 链接现有记录,(5,0,0) 取消所有记录,(6,0, [ids]) 替换所有链接。对于读取,这些字段默认返回 ID 列表 - 使用 search_read 和字段名称来获取完整数据。
后续步骤
API集成是现代业务系统的支柱。无论您是构建简单的数据同步还是复杂的多平台编排,本指南中的模式都将为您提供良好的服务。
相关资源:
- Odoo Python 开发指南 — 深入了解 Odoo 的 Python 后端
- Webhook 调试指南 — Webhook 集成故障排除
- ECOSIRE 市场连接器 — 针对主要平台的预构建集成
需要 Odoo API 集成方面的帮助吗? ECOSIRE 的集成团队 已将 Odoo 连接到 50 多个外部平台,包括 Shopify、Amazon、Salesforce 和自定义 ERP。从简单的数据同步到实时双向编排,我们构建了可扩展的集成。 安排免费技术咨询。
作者
ECOSIRE TeamTechnical Writing
The ECOSIRE technical writing team covers Odoo ERP, Shopify eCommerce, AI agents, Power BI analytics, GoHighLevel automation, and enterprise software best practices. Our guides help businesses make informed technology decisions.
相关文章
AI 支持的客户细分:从 RFM 到预测聚类
了解 AI 如何将客户细分从静态 RFM 分析转变为动态预测聚类。使用 Python、Odoo 和真实 ROI 数据的实施指南。
用于供应链优化的人工智能:可见性、预测和自动化
利用人工智能改变供应链运营:需求感知、供应商风险评分、路线优化、仓库自动化和中断预测。 2026年指南。
API 集成模式:企业架构最佳实践
掌握企业系统的 API 集成模式。 REST、GraphQL、gRPC、事件驱动架构、saga 模式、API 网关和版本控制指南。