“Low-Level Server”这个名字,源于软件开发中一个常见的术语:“low-level”(低层/底层)。它不是说“服务器很差”,而是强调它比高级封装更接近底层、原始的实现,意思是:
在 MCP 框架中,Low-Level Server 是一种更接近协议核心、需要你手动控制生命周期、注册处理函数的服务器写法,也就是说:
你负责:
call_tool()
、list_prompts()
等)server.run()
)它不像官方的 mcp run
或 mcp dev
那样封装好一整套流程(这些是 high-level 工具,自动帮你处理各种事情)。
和 “high-level”(高级封装) 相对:
uv run mcp run
server.run(...)
,你控制 lifespan
,你处理 stream就像 Python 的 asyncio
:
asyncio.run(main())
loop = asyncio.get_event_loop()
手动跑 loop用它是为了“更细粒度的控制”,适合这几种场景:
场景 | 为什么用 low-level server |
---|---|
你想注册多个自定义 handler,比如 get_prompt 、call_tool |
高级接口可能不够灵活 |
你想控制资源生命周期,比如连接数据库、清理缓存 | 需要用 lifespan |
你要接入自定义协议或更复杂的启动流程 | 高级封装不支持 |
你不想使用 mcp run 或 uv run |
这些不支持 low-level server |
“low-level server” 指的是一种更底层、自由度更高、但需要你自己处理细节的服务器写法。适合需要完全控制启动流程、资源管理、自定义能力的高级开发者。
“Low-Level Server”这个名字,源于软件开发中一个常见的术语:“low-level”(低层/底层)。它不是说“服务器很差”,而是强调它比高级封装更接近底层、原始的实现,意思是:
在 MCP 框架中,Low-Level Server 是一种更接近协议核心、需要你手动控制生命周期、注册处理函数的服务器写法,也就是说:
你负责:
call_tool()
、list_prompts()
等)server.run()
)它不像官方的 mcp run
或 mcp dev
那样封装好一整套流程(这些是 high-level 工具,自动帮你处理各种事情)。
和 “high-level”(高级封装) 相对:
uv run mcp run
server.run(...)
,你控制 lifespan
,你处理 stream就像 Python 的 asyncio
:
asyncio.run(main())
loop = asyncio.get_event_loop()
手动跑 loop用它是为了“更细粒度的控制”,适合这几种场景:
场景 | 为什么用 low-level server |
---|---|
你想注册多个自定义 handler,比如 get_prompt 、call_tool |
高级接口可能不够灵活 |
你想控制资源生命周期,比如连接数据库、清理缓存 | 需要用 lifespan |
你要接入自定义协议或更复杂的启动流程 | 高级封装不支持 |
你不想使用 mcp run 或 uv run |
这些不支持 low-level server |
“low-level server” 指的是一种更底层、自由度更高、但需要你自己处理细节的服务器写法。适合需要完全控制启动流程、资源管理、自定义能力的高级开发者。
“Low-Level Server”这个名字,源于软件开发中一个常见的术语:“low-level”(低层/底层)。它不是说“服务器很差”,而是强调它比高级封装更接近底层、原始的实现,意思是:
在 MCP 框架中,Low-Level Server 是一种更接近协议核心、需要你手动控制生命周期、注册处理函数的服务器写法,也就是说:
你负责:
call_tool()
、list_prompts()
等)server.run()
)它不像官方的 mcp run
或 mcp dev
那样封装好一整套流程(这些是 high-level 工具,自动帮你处理各种事情)。
和 “high-level”(高级封装) 相对:
uv run mcp run
server.run(...)
,你控制 lifespan
,你处理 stream就像 Python 的 asyncio
:
asyncio.run(main())
loop = asyncio.get_event_loop()
手动跑 loop用它是为了“更细粒度的控制”,适合这几种场景:
场景 | 为什么用 low-level server |
---|---|
你想注册多个自定义 handler,比如 get_prompt 、call_tool |
高级接口可能不够灵活 |
你想控制资源生命周期,比如连接数据库、清理缓存 | 需要用 lifespan |
你要接入自定义协议或更复杂的启动流程 | 高级封装不支持 |
你不想使用 mcp run 或 uv run |
这些不支持 low-level server |
“low-level server” 指的是一种更底层、自由度更高、但需要你自己处理细节的服务器写法。适合需要完全控制启动流程、资源管理、自定义能力的高级开发者。
“Low-Level Server”这个名字,源于软件开发中一个常见的术语:“low-level”(低层/底层)。它不是说“服务器很差”,而是强调它比高级封装更接近底层、原始的实现,意思是:
在 MCP 框架中,Low-Level Server 是一种更接近协议核心、需要你手动控制生命周期、注册处理函数的服务器写法,也就是说:
你负责:
call_tool()
、list_prompts()
等)server.run()
)它不像官方的 mcp run
或 mcp dev
那样封装好一整套流程(这些是 high-level 工具,自动帮你处理各种事情)。
和 “high-level”(高级封装) 相对:
uv run mcp run
server.run(...)
,你控制 lifespan
,你处理 stream就像 Python 的 asyncio
:
asyncio.run(main())
loop = asyncio.get_event_loop()
手动跑 loop用它是为了“更细粒度的控制”,适合这几种场景:
场景 | 为什么用 low-level server |
---|---|
你想注册多个自定义 handler,比如 get_prompt 、call_tool |
高级接口可能不够灵活 |
你想控制资源生命周期,比如连接数据库、清理缓存 | 需要用 lifespan |
你要接入自定义协议或更复杂的启动流程 | 高级封装不支持 |
你不想使用 mcp run 或 uv run |
这些不支持 low-level server |
“low-level server” 指的是一种更底层、自由度更高、但需要你自己处理细节的服务器写法。适合需要完全控制启动流程、资源管理、自定义能力的高级开发者。
您好,你这个没有加载acl模块的配置吗?
我的只要在配置文件中添加authorizer.class.name=org.apache.kafka.metadata.authorizer.StandardAuthorizer就报错:
[2025-07-01 16:16:57,612] INFO Awaiting socket connections on 0.0.0.0:9093. (kafka.network.DataPlaneAcceptor)
[2025-07-01 16:16:57,614] ERROR [RaftManager id=1] Unexpected error UNKNOWN_SERVER_ERROR in FETCH response: InboundResponse(correlationId=9, data=FetchResponseData(throttleTimeMs=0, errorCode=-1, sessionId=0, responses=[], nodeEndpoints=[]), source=10.206.68.13:9093 (id: -4 rack: null)) (org.apache.kafka.raft.KafkaRaftClient)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Finished waiting for all of the authorizer futures to be completed (kafka.server.ControllerServer)
[2025-07-01 16:16:57,635] INFO [ControllerServer id=1] Waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
[2025-07-01 16:16:57,636] INFO [ControllerServer id=1] Finished waiting for all of the SocketServer Acceptors to be started (kafka.server.ControllerServer)
[2025-07-01 16:16:57,636] INFO [ControllerRegistrationManager id=1 incarnation=8KtSks7wT_eLZJdfrTCDxw] initialized channel manager. (kafka.server.ControllerRegistrationManager)
[2025-07-01 16:16:57,636] INFO [BrokerServer id=1] Transition from SHUTDOWN to STARTING (kafka.server.BrokerServer)
[2025-07-01 16:16:57,637] INFO [ControllerRegistrationManager id=1 incarnation=8KtSks7wT_eLZJdfrTCDxw] maybeSendControllerRegistration: cannot register yet because the metadata.version is still 3.0-IV1, which does not support KIP-919 controller registration. (kafka.server.ControllerRegistrationManager)
[2025-07-01 16:16:57,637] INFO [BrokerServer id=1] Starting broker (kafka.server.BrokerServer)
[2025-07-01 16:16:57,652] INFO [controller-1-to-controller-registration-channel-manager]: Starting (kafka.server.NodeToControllerRequestThread)
[2025-07-01 16:16:57,664] INFO [broker-1-ThrottledChannelReaper-Fetch]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,664] INFO [broker-1-ThrottledChannelReaper-Produce]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,665] INFO [broker-1-ThrottledChannelReaper-Request]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,672] ERROR [ControllerApis nodeId=1] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-1, correlationId=11, headerVersion=2) -- FetchRequestData(clusterId='L9aHy0-zTSeGkTjjgiHU7g', replicaId=-1, replicaState=ReplicaState(replicaId=1, replicaEpoch=-1), maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=2GZE2nfgQvaYonlRrNSrrw)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-1, correlationId=11, headerVersion=2), connectionId='10.206.68.11:9093-10.206.68.11:38142-0', clientAddress=/10.206.68.11, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.9.1), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@6c15c06c]) (kafka.server.ControllerApis)
org.apache.kafka.common.errors.AuthorizerNotReadyException
[2025-07-01 16:16:57,674] INFO [broker-1-ThrottledChannelReaper-ControllerMutation]: Starting (kafka.server.ClientQuotaManager$ThrottledChannelReaper)
[2025-07-01 16:16:57,683] ERROR [ControllerApis nodeId=1] Unexpected error handling request RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-2, correlationId=4, headerVersion=2) -- FetchRequestData(clusterId='L9aHy0-zTSeGkTjjgiHU7g', replicaId=-1, replicaState=ReplicaState(replicaId=2, replicaEpoch=-1), maxWaitMs=500, minBytes=0, maxBytes=8388608, isolationLevel=0, sessionId=0, sessionEpoch=-1, topics=[FetchTopic(topic='', topicId=AAAAAAAAAAAAAAAAAAAAAQ, partitions=[FetchPartition(partition=0, currentLeaderEpoch=0, fetchOffset=0, lastFetchedEpoch=0, logStartOffset=-1, partitionMaxBytes=0, replicaDirectoryId=Rl3w3w2JRnuwNZ78qQXWrQ)])], forgottenTopicsData=[], rackId='') with context RequestContext(header=RequestHeader(apiKey=FETCH, apiVersion=17, clientId=raft-client-2, correlationId=4, headerVersion=2), connectionId='10.206.68.11:9093-10.206.68.12:5298-0', clientAddress=/10.206.68.12, principal=User:ANONYMOUS, listenerName=ListenerName(CONTROLLER), securityProtocol=PLAINTEXT, clientInformation=ClientInformation(softwareName=apache-kafka-java, softwareVersion=3.9.1), fromPrivilegedListener=false, principalSerde=Optional[org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder@1ed5187]) (kafka.server.ControllerApis)
org.apache.kafka.common.errors.AuthorizerNotReadyException
[2025-07-01 16:16:57,686] ERROR [RaftManager id=1] Unexpected error UNKNOWN_SERVER_ERROR in FETCH response: InboundResponse(correlationId=10, data=FetchResponseData(throttleTimeMs=0, errorCode=-1, sessionId=0, responses=[], nodeEndpoints=[]), source=10.206.68.11:9093 (id: -2 rack: null)) (org.apache.kafka.raft.KafkaRaftClient)
有知道是怎么回事吗?
是的,kafka 2.4
还不支持2.0
,所以你的还是1.0
版本。
kafka 2.6+
后是支持的。可参考:https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
是的,kafka 2.4
还不支持2.0
,所以你的还是1.0
版本。
kafka 2.6+
后是支持的。可参考:https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0
请问我配置完zookeeper和kafka的sasl/plain认证后,启动kafka,有一个节点一直报错,
Failed authentication with /192.168.66.211 (Unexpected Kafka request of type METADATA during SASL handshake.) (org.apache.kafka.common.network.Selector)。
但是192.168.66.211这台服务器并不是zookeeper和kafka节点,请问这是什么原因?
嗯,k8s的版本,所以对参数有区别,可以参考:https://www.kubebiz.com/KubeBiz/kafka
里面有k8s的版本选择,会自动补全。
一般是基础镜像的问题:
先手动拉取:
docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kicbase:v0.0.45
然后手动指定基础镜像:
minikube start --force --base-image='registry.cn-hangzhou.aliyuncs.com/google_containers/kicbase:v0.0.45'
enabled mechanisms are []
启用的机制是空,并没有生效,先看看kafka日志中是否有什么异常。
另外,我看你配置里有些其他的认证方式,建议你注掉,防止干扰。
可参考:https://www.orchome.com/1966
先保证命令行可以运行成功。
enabled mechanisms are []
启用的机制是空,并没有生效,先看看kafka日志中是否有什么异常。
另外,我看你配置里有些其他的认证方式,建议你注掉,防止干扰。
可参考:https://www.orchome.com/1966
先保证命令行可以运行成功。
LISTENERS=listeners=PLAINTEXT://phm-data02:9092,
这个换成
LISTENERS=listeners=SASL_PLAINTEXT://phm-data02:9092
一般是基础镜像的问题:
先手动拉取:
docker pull registry.cn-hangzhou.aliyuncs.com/google_containers/kicbase:v0.0.45
然后手动指定基础镜像:
minikube start --force --base-image='registry.cn-hangzhou.aliyuncs.com/google_containers/kicbase:v0.0.45'
无法删除是因为命名空间中仍然存在的资源引起的。
以下命令显示命名空间中剩余的资源:
kubectl api-resources --verbs=list --namespaced -o name \
| xargs -n 1 kubectl get --show-kind --ignore-not-found -n <namespace>
一旦你移除了这些资源之后,命名空间就能删掉了。
感谢大佬的指点,目前已经全部调通,包括kerberos环境!
非kerberos环境最后配置的格式就是上面贴的。
kerberos环境 大致还需要以下几点。
1、kafka-server端加了环境变量
export KAFKA_OPTS="-Djava.security.auth.login.config=/usr/hdp/current/kafka-broker/conf/kafka_jaas.conf"
2、/etc/krb5.conf文件可能需要加一行udp_preference_limit = 1 将udp改成tcp防止丢包(这个不一定需要)
3、客户端需要一个kafka_client_jaas.conf
KafkaClient {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="kafka";
};
Client {
com.sun.security.auth.module.Krb5LoginModule required
useTicketCache=true
renewTicket=true
serviceName="zookeeper";
};
4、然后一些sasl的配置,监听器的配置就不赘述了
总结:之前对“主动发现集群机制”了解不够,也不知道消费时要对每一个broker都开一个长连接* 加上报错一直都是权限验证失败让人感觉是kerberos的问题,绕了很久。后面排除无关的因素,就很明显了。另外提醒ambari安装的kafka不管界面上配置的advertised.listeners是多少,内部代码还是会强行将listeners的值赋给advertised.listeners。
还是很感谢大佬的及时回复 耐心指导。期待以后更多的交流
对应的是pod的.metadata.uid
:
for d in /var/lib/kubelet/pods/*; do
p_u=$(basename "$d")
kubectl get po -A -o json | \
jq --arg pod_uuid "$p_u" -r '.items[]
| select(.metadata.uid == $pod_uuid)
| "uuid \($pod_uuid) is \(.metadata.name)"'
done
类似如下输出:
"Labels": {
"annotation.io.kubernetes.container.hash": "e44bee94",
"annotation.io.kubernetes.container.restartCount": "4",
"annotation.io.kubernetes.container.terminationMessagePath": "/dev/termination-log",
"annotation.io.kubernetes.container.terminationMessagePolicy": "File",
"annotation.io.kubernetes.pod.terminationGracePeriod": "30",
"io.kubernetes.container.logpath": "/var/log/pods/kube-system_storage-provisioner_b4aa3b1c-62c1-4661-a302-4c06b305b7c0/storage-provisioner/4.log",
"io.kubernetes.container.name": "storage-provisioner",
"io.kubernetes.docker.type": "container",
"io.kubernetes.pod.name": "storage-provisioner",
"io.kubernetes.pod.namespace": "kube-system",
"io.kubernetes.pod.uid": "b4aa3b1c-62c1-4661-a302-4c06b305b7c0",
"io.kubernetes.sandbox.id": "3950ec60121fd13116230cad388a4c6c4e417c660b7da475436f9ad5c9cf6738"
}