docs: update stream-proxy.md test case link (#7665)
[apisix.git] / apisix / core / etcd.lua
index a57a5d0c86bf6a9e583d3dec9a0cf2eb01bd51d1..7ac08334e2d051f1283662a566335d6e97ee387f 100644 (file)
@@ -21,6 +21,7 @@
 
 local fetch_local_conf  = require("apisix.core.config_local").local_conf
 local array_mt          = require("apisix.core.json").array_mt
+local v3_adapter        = require("apisix.admin.v3_adapter")
 local etcd              = require("resty.etcd")
 local clone_tab         = require("table.clone")
 local health_check      = require("resty.etcd.health_check")
@@ -29,25 +30,63 @@ local setmetatable      = setmetatable
 local string            = string
 local tonumber          = tonumber
 local ngx_config_prefix = ngx.config.prefix()
+local ngx_socket_tcp    = ngx.socket.tcp
 
 
 local is_http = ngx.config.subsystem == "http"
 local _M = {}
 
 
--- this function create the etcd client instance used in the Admin API
+local function has_mtls_support()
+    local s = ngx_socket_tcp()
+    return s.tlshandshake ~= nil
+end
+
+
+local function _new(etcd_conf)
+    local prefix = etcd_conf.prefix
+    etcd_conf.http_host = etcd_conf.host
+    etcd_conf.host = nil
+    etcd_conf.prefix = nil
+    etcd_conf.protocol = "v3"
+    etcd_conf.api_prefix = "/v3"
+
+    -- default to verify etcd cluster certificate
+    etcd_conf.ssl_verify = true
+    if etcd_conf.tls then
+        if etcd_conf.tls.verify == false then
+            etcd_conf.ssl_verify = false
+        end
+
+        if etcd_conf.tls.cert then
+            etcd_conf.ssl_cert_path = etcd_conf.tls.cert
+            etcd_conf.ssl_key_path = etcd_conf.tls.key
+        end
+
+        if etcd_conf.tls.sni then
+            etcd_conf.sni = etcd_conf.tls.sni
+        end
+    end
+
+    local etcd_cli, err = etcd.new(etcd_conf)
+    if not etcd_cli then
+        return nil, nil, err
+    end
+
+    return etcd_cli, prefix
+end
+
+
 local function new()
     local local_conf, err = fetch_local_conf()
     if not local_conf then
         return nil, nil, err
     end
 
-    local etcd_conf
+    local etcd_conf = clone_tab(local_conf.etcd)
     local proxy_by_conf_server = false
 
     if local_conf.deployment then
-        etcd_conf = clone_tab(local_conf.deployment.etcd)
-
         if local_conf.deployment.role == "traditional"
             -- we proxy the etcd requests in traditional mode so we can test the CP's behavior in
             -- daily development. However, a stream proxy can't be the CP.
@@ -62,34 +101,33 @@ local function new()
             proxy_by_conf_server = true
 
         elseif local_conf.deployment.role == "control_plane" then
-            -- TODO: add the proxy conf in control_plane
-            proxy_by_conf_server = true
-        end
-    else
-        etcd_conf = clone_tab(local_conf.etcd)
-    end
+            local addr = local_conf.deployment.role_control_plane.conf_server.listen
+            etcd_conf.host = {"https://" .. addr}
+            etcd_conf.tls = {
+                verify = false,
+            }
 
-    local prefix = etcd_conf.prefix
-    etcd_conf.http_host = etcd_conf.host
-    etcd_conf.host = nil
-    etcd_conf.prefix = nil
-    etcd_conf.protocol = "v3"
-    etcd_conf.api_prefix = "/v3"
+            if has_mtls_support() and local_conf.deployment.certs.cert then
+                local cert = local_conf.deployment.certs.cert
+                local cert_key = local_conf.deployment.certs.cert_key
+                etcd_conf.tls.cert = cert
+                etcd_conf.tls.key = cert_key
+            end
 
-    -- default to verify etcd cluster certificate
-    etcd_conf.ssl_verify = true
-    if etcd_conf.tls then
-        if etcd_conf.tls.verify == false then
-            etcd_conf.ssl_verify = false
-        end
+            proxy_by_conf_server = true
 
-        if etcd_conf.tls.cert then
-            etcd_conf.ssl_cert_path = etcd_conf.tls.cert
-            etcd_conf.ssl_key_path = etcd_conf.tls.key
-        end
+        elseif local_conf.deployment.role == "data_plane" then
+            if has_mtls_support() and local_conf.deployment.certs.cert then
+                local cert = local_conf.deployment.certs.cert
+                local cert_key = local_conf.deployment.certs.cert_key
 
-        if etcd_conf.tls.sni then
-            etcd_conf.sni = etcd_conf.tls.sni
+                if not etcd_conf.tls then
+                    etcd_conf.tls = {}
+                end
+
+                etcd_conf.tls.cert = cert
+                etcd_conf.tls.key = cert_key
+            end
         end
     end
 
@@ -106,15 +144,28 @@ local function new()
         })
     end
 
-    local etcd_cli
-    etcd_cli, err = etcd.new(etcd_conf)
-    if not etcd_cli then
+    return _new(etcd_conf)
+end
+_M.new = new
+
+
+---
+-- Create an etcd client which will connect to etcd without being proxyed by conf server.
+-- This method is used in init_worker phase when the conf server is not ready.
+--
+-- @function core.etcd.new_without_proxy
+-- @treturn table|nil the etcd client, or nil if failed.
+-- @treturn string|nil the configured prefix of etcd keys, or nil if failed.
+-- @treturn nil|string the error message.
+function _M.new_without_proxy()
+    local local_conf, err = fetch_local_conf()
+    if not local_conf then
         return nil, nil, err
     end
 
-    return etcd_cli, prefix
+    local etcd_conf = clone_tab(local_conf.etcd)
+    return _new(etcd_conf)
 end
-_M.new = new
 
 
 -- convert ETCD v3 entry to v2 one
@@ -168,7 +219,7 @@ function _M.get_format(res, real_key, is_dir, formatter)
         return not_found(res)
     end
 
-    res.body.action = "get"
+    v3_adapter.to_v3(res.body, "get")
 
     if formatter then
         return formatter(res)
@@ -196,6 +247,7 @@ function _M.get_format(res, real_key, is_dir, formatter)
     end
 
     res.body.kvs = nil
+    v3_adapter.to_v3_list(res.body)
     return res
 end
 
@@ -269,10 +321,14 @@ local function set(key, value, ttl)
         return nil, err
     end
 
+    if res.body.error then
+        return nil, res.body.error
+    end
+
     res.headers["X-Etcd-Index"] = res.body.header.revision
 
     -- etcd v3 set would not return kv info
-    res.body.action = "set"
+    v3_adapter.to_v3(res.body, "set")
     res.body.node = {}
     res.body.node.key = prefix .. key
     res.body.node.value = value
@@ -335,7 +391,7 @@ function _M.atomic_set(key, value, ttl, mod_revision)
 
     res.headers["X-Etcd-Index"] = res.body.header.revision
     -- etcd v3 set would not return kv info
-    res.body.action = "compareAndSwap"
+    v3_adapter.to_v3(res.body, "compareAndSwap")
     res.body.node = {
         key = key,
         value = value,
@@ -373,7 +429,7 @@ function _M.push(key, value, ttl)
         return nil, err
     end
 
-    res.body.action = "create"
+    v3_adapter.to_v3(res.body, "create")
     return res, nil
 end
 
@@ -397,7 +453,7 @@ function _M.delete(key)
     end
 
     -- etcd v3 set would not return kv info
-    res.body.action = "delete"
+    v3_adapter.to_v3(res.body, "delete")
     res.body.node = {}
     res.body.key = prefix .. key
 
@@ -417,7 +473,7 @@ end
 -- --   etcdserver = "3.5.0"
 -- -- }
 function _M.server_version()
-    local etcd_cli, err = new()
+    local etcd_cli, _, err = new()
     if not etcd_cli then
         return nil, err
     end