jsonpath.jsonpath函数源码
作者:YXN-python 阅读量:105 发布日期:2023-11-07
源码
import re
import sys
def normalize(x):
"""normalize the path expression; outside jsonpath to allow testing"""
subx = []
# replace index/filter expressions with placeholders
# Python anonymous functions (lambdas) are cryptic, hard to debug
def f1(m):
n = len(subx) # before append
g1 = m.group(1)
subx.append(g1)
ret = "[#%d]" % n
# print("f1:", g1, ret)
return ret
x = re.sub(r"[\['](\??\(.*?\))[\]']", f1, x)
# added the negative lookbehind -krhodes
x = re.sub(r"'?(? 1: print("\tf03", key, loc, expr, path)
trace(s(key, expr), obj, path)
walk(loc, x, obj, path, f03)
elif loc == "..":
trace(x, obj, path)
def f04(key, loc, expr, obj, path):
if debug > 1: print("\tf04", key, loc, expr, path)
if isinstance(obj, dict):
if key in obj:
trace(s('..', expr), obj[key], s(path, key))
else:
if key < len(obj):
trace(s('..', expr), obj[key], s(path, key))
walk(loc, x, obj, path, f04)
elif loc == "!":
# Perl jsonpath extension: return keys
def f06(key, loc, expr, obj, path):
if isinstance(obj, dict):
trace(expr, key, path)
walk(loc, x, obj, path, f06)
elif isinstance(obj, dict) and loc in obj:
trace(x, obj[loc], s(path, loc))
elif isinstance(obj, list) and isint(loc):
iloc = int(loc)
if debug: print("----->", iloc, len(obj))
if len(obj) > iloc:
trace(x, obj[iloc], s(path, loc))
else:
# [(index_expression)]
if loc.startswith("(") and loc.endswith(")"):
if debug > 1: print("index", loc)
e = evalx(loc, obj)
trace(s(e,x), obj, path)
return
# ?(filter_expression)
if loc.startswith("?(") and loc.endswith(")"):
if debug > 1: print("filter", loc)
def f05(key, loc, expr, obj, path):
if debug > 1: print("f05", key, loc, expr, path)
if isinstance(obj, dict):
eval_result = evalx(loc, obj[key])
else:
eval_result = evalx(loc, obj[int(key)])
if eval_result:
trace(s(key, expr), obj, path)
loc = loc[2:-1]
walk(loc, x, obj, path, f05)
return
m = re.match(r'(-?[0-9]*):(-?[0-9]*):?(-?[0-9]*)$', loc)
if m:
if isinstance(obj, (dict, list)):
def max(x,y):
if x > y:
return x
return y
def min(x,y):
if x < y:
return x
return y
objlen = len(obj)
s0 = m.group(1)
s1 = m.group(2)
s2 = m.group(3)
# XXX int("badstr") raises exception
start = int(s0) if s0 else 0
end = int(s1) if s1 else objlen
step = int(s2) if s2 else 1
if start < 0:
start = max(0, start+objlen)
else:
start = min(objlen, start)
if end < 0:
end = max(0, end+objlen)
else:
end = min(objlen, end)
for i in xrange(start, end, step):
trace(s(i, x), obj, path)
return
# after (expr) & ?(expr)
if loc.find(",") >= 0:
# [index,index....]
for piece in re.split(r"'?,'?", loc):
if debug > 1: print("piece", piece)
trace(s(piece, x), obj, path)
else:
store(path, obj)
def walk(loc, expr, obj, path, funct):
    """Invoke ``funct`` once per child slot of a list or dict.

    For a list, ``funct`` is called with each integer index in ascending
    order; for a dict, with each key in the dict's iteration order.  Any
    other type of ``obj`` results in no calls.

    Args:
        loc: Current location token, forwarded to ``funct`` unchanged.
        expr: Remaining path expression, forwarded to ``funct`` unchanged.
        obj: Container whose indices/keys are visited.
        path: Path accumulated so far, forwarded to ``funct`` unchanged.
        funct: Callback invoked as ``funct(key, loc, expr, obj, path)``.
    """
    if isinstance(obj, list):
        # The callback receives the index plus the container (not the
        # element), so iterate over indices.  ``range`` is used instead of
        # ``xrange`` because the latter does not exist on Python 3.
        for index in range(len(obj)):
            funct(index, loc, expr, obj, path)
    elif isinstance(obj, dict):
        for key in obj:
            funct(key, loc, expr, obj, path)
def evalx(loc, obj):
"""eval expression"""
if debug: print("evalx", loc)
# a nod to JavaScript. doesn't work for @.name.name.length
# Write len(@.name.name) instead!!!
loc = loc.replace("@.length", "len(__obj)")
loc = loc.replace("&&", " and ").replace("||", " or ")
# replace !@.name with 'name' not in obj
# XXX handle !@.name.name.name....
def notvar(m):
return "'%s' not in __obj" % m.group(1)
loc = re.sub(r"!@\.([a-zA-Z@_0-9-]*)", notvar, loc)
# replace @.name.... with __obj['name']....
# handle @.name[.name...].length
def varmatch(m):
def brackets(elts):
ret = "__obj"
for e in elts:
if isint(e):
ret += "[%s]" % e # ain't necessarily so
else:
ret += "['%s']" % e # XXX beware quotes!!!!
return ret
g1 = m.group(1)
elts = g1.split('.')
if elts[-1] == "length":
return "len(%s)" % brackets(elts[1:-1])
return brackets(elts[1:])
loc = re.sub(r'(? == translation
# causes problems if a string contains =
# replace @ w/ "__obj", but \@ means a literal @
loc = re.sub(r'(?", v)
return v
# body of jsonpath()
# Get caller globals so eval can pick up user functions!!!
caller_globals = sys._getframe(1).f_globals
result = []
if expr and obj:
cleaned_expr = normalize(expr)
if cleaned_expr.startswith("$;"):
cleaned_expr = cleaned_expr[2:]
# XXX wrap this in a try??
trace(cleaned_expr, obj, '$')
if len(result) > 0:
return result
return False
语法规则参考:python之jsonpath库
自己写一个
最基本的
仅支持简单语法(以 . 分隔的键名、[*] 通配符和 [n] 整数下标),例如:
$.data[*].a.aa[*].aaa1
def jsonpath1(data, query):
    """Evaluate a minimal JSONPath-like query against nested dicts/lists.

    Supported syntax: segments separated by ``.``; a segment is either a
    plain key, ``key[*]`` (every element of the list stored under ``key``)
    or ``key[n]`` (one element, negative ``n`` counts from the end).  Any
    segment that starts with ``$`` is treated as the root marker and skipped.

    Args:
        data: Root object, normally a dict.
        query: Path expression such as ``'$.data[*].a.aa[*].aaa1'``.

    Returns:
        For a path without ``[*]``: the matched value itself, or ``[]`` when
        nothing matches.  For a path containing ``[*]``: a flat list of all
        matches; a match that is itself a list is merged into that list.

    Raises:
        ValueError: If a bracket holds something other than ``*`` or an
            integer.
    """
    parts = query.split('.')

    def helper(cur, part_index):
        # Path fully consumed: the node reached is the match.  Returning the
        # node (rather than an empty list) is what makes paths that end in
        # ``[n]`` or ``[*]`` produce a result.
        if part_index >= len(parts):
            return cur

        part = parts[part_index]
        if part.startswith('$'):
            return helper(cur, part_index + 1)

        # Keys can only be looked up in mappings; scalars and lists have no
        # ``.get`` and simply do not match.
        if not isinstance(cur, dict):
            return []

        if part.endswith(']'):
            arr_start = part.find('[')
            sub = cur.get(part[:arr_start])
            # A missing key or a non-sequence value cannot be indexed.
            if not isinstance(sub, (list, tuple)):
                return []

            index_spec = part[arr_start + 1:-1]
            if index_spec == '*':
                matches = []
                for item in sub:
                    found = helper(item, part_index + 1)
                    # Lists (including the empty "no match" list) are merged
                    # so the overall result stays flat.
                    if isinstance(found, list):
                        matches.extend(found)
                    else:
                        matches.append(found)
                return matches

            index = int(index_spec)
            # Bounds-check both directions so an out-of-range negative index
            # yields "no match" instead of raising IndexError.
            if -len(sub) <= index < len(sub):
                return helper(sub[index], part_index + 1)
            return []

        sub = cur.get(part)
        if sub is None:
            return []
        return helper(sub, part_index + 1)

    return helper(data, 0)
if __name__ == '__main__':
    # Demo document: two entries carry the nested a -> aa list, the third
    # entry has no 'a' key at all.
    first_entry = {'a': {'aa': [{'aaa1': 'aaaa1'}, {'aaa1': 'aaaa2'}]}}
    second_entry = {'a': {'aa': [{'aaa1': 'aaaa3'}, {'aaa2': 'aaa'}]}}
    third_entry = {'b': ['bb1', 'bb2']}
    data_dict = {'data': [first_entry, second_entry, third_entry]}

    path = '$.data[*].a.aa[*].aaa1'
    result = jsonpath1(data_dict, path)
    print(result)  # Output: ['aaaa1', 'aaaa2', 'aaaa3']
支持按数组元素分组:来自不同数组元素的匹配值分别放入各自的子列表
# 支持区分 数组元素中的多个值
def jsonpath2(data, query):
    """Evaluate a minimal JSONPath-like query, grouping matches per wildcard.

    Supported syntax: segments separated by ``.``; a segment is either a
    plain key, ``key[*]`` (every element of the list stored under ``key``)
    or ``key[n]`` (one element, negative ``n`` counts from the end).  Any
    segment that starts with ``$`` is treated as the root marker and skipped.

    Each non-terminal ``[*]`` wraps the matches found beneath it in their
    own list, so values coming from different array elements stay apart,
    e.g. ``[['aaaa1', 'aaaa2'], ['aaaa3']]``.  Wrappers that hold exactly one
    list are then collapsed.

    Args:
        data: Root object, normally a dict.
        query: Path expression such as ``'$.data[*].a.aa[*].aaa1'``.

    Returns:
        The matched value for a path without ``[*]`` (``[]`` when nothing
        matches); otherwise a possibly nested list of matches.  Any list
        result is passed through the collapsing step, so list-valued leaves
        are subject to the same single-element unwrapping.

    Raises:
        ValueError: If a bracket holds something other than ``*`` or an
            integer.
    """
    parts = query.split('.')

    def helper(cur, part_index):
        # Path fully consumed: the node reached is the match.  Returning the
        # node (rather than an empty list) is what makes paths that end in
        # ``[n]`` or ``[*]`` produce a result.
        if part_index >= len(parts):
            return cur

        part = parts[part_index]
        if part.startswith('$'):
            return helper(cur, part_index + 1)

        # Keys can only be looked up in mappings; scalars and lists have no
        # ``.get`` and simply do not match.
        if not isinstance(cur, dict):
            return []

        if part.endswith(']'):
            arr_start = part.find('[')
            sub = cur.get(part[:arr_start])
            # A missing key or a non-sequence value cannot be indexed.
            if not isinstance(sub, (list, tuple)):
                return []

            index_spec = part[arr_start + 1:-1]
            if index_spec == '*':
                collected = []
                for item in sub:
                    found = helper(item, part_index + 1)
                    if isinstance(found, list):
                        collected.extend(found)
                    else:
                        collected.append(found)
                # A wildcard followed by more segments keeps its matches
                # together as one group; a trailing wildcard returns the
                # elements directly.
                if part_index < len(parts) - 1:
                    return [collected]
                return collected

            index = int(index_spec)
            # Bounds-check both directions so an out-of-range negative index
            # yields "no match" instead of raising IndexError.
            if -len(sub) <= index < len(sub):
                return helper(sub[index], part_index + 1)
            return []

        sub = cur.get(part)
        if sub is None:
            return []
        return helper(sub, part_index + 1)

    def post_process(lst):
        """Recursively unwrap lists whose only element is another list."""
        new_lst = []
        for item in lst:
            if isinstance(item, list):
                item = post_process(item)
                if len(item) == 1 and isinstance(item[0], list):
                    item = item[0]
            new_lst.append(item)
        # Check the outermost list once more after its items were processed.
        if len(new_lst) == 1 and isinstance(new_lst[0], list):
            new_lst = new_lst[0]
        return new_lst

    result = helper(data, 0)
    if isinstance(result, list):
        return post_process(result)
    return result
if __name__ == '__main__':
    # Demo document: two entries carry the nested a -> aa list, the third
    # entry has no 'a' key at all.
    first_entry = {'a': {'aa': [{'aaa1': 'aaaa1'}, {'aaa1': 'aaaa2'}]}}
    second_entry = {'a': {'aa': [{'aaa1': 'aaaa3'}, {'aaa2': 'aaa'}]}}
    third_entry = {'b': ['bb1', 'bb2']}
    data_dict = {'data': [first_entry, second_entry, third_entry]}

    path = '$.data[*].a.aa[*].aaa1'
    result = jsonpath2(data_dict, path)
    print(result)  # Output: [['aaaa1', 'aaaa2'], ['aaaa3']]
YXN-python
2023-11-07