Skip to content
This repository was archived by the owner on Nov 5, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 14 additions & 14 deletions rds/kubernetes/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ type epLister struct {
kClient *client

mu sync.RWMutex // Mutex for names and cache
names []string
cache map[string]*epInfo
keys []resourceKey
cache map[resourceKey]*epInfo
l *logger.Logger
}

Expand Down Expand Up @@ -67,16 +67,16 @@ func (lister *epLister) listResources(req *pb.ListResourcesRequest) ([]*pb.Resou
lister.mu.RLock()
defer lister.mu.RUnlock()

for _, name := range lister.names {
if epName != "" && name != epName {
for _, key := range lister.keys {
if epName != "" && key.name != epName {
continue
}

if nameFilter != nil && !nameFilter.Match(name, lister.l) {
if nameFilter != nil && !nameFilter.Match(key.name, lister.l) {
continue
}

epi := lister.cache[name]
epi := lister.cache[key]
if nsFilter != nil && !nsFilter.Match(epi.Metadata.Namespace, lister.l) {
continue
}
Expand Down Expand Up @@ -141,7 +141,7 @@ func (epi *epInfo) resources(portFilter *filter.RegexFilter, l *logger.Logger) (
return
}

func parseEndpointsJSON(resp []byte) (names []string, endpoints map[string]*epInfo, err error) {
func parseEndpointsJSON(resp []byte) (keys []resourceKey, endpoints map[resourceKey]*epInfo, err error) {
var itemList struct {
Items []*epInfo
}
Expand All @@ -150,11 +150,11 @@ func parseEndpointsJSON(resp []byte) (names []string, endpoints map[string]*epIn
return
}

names = make([]string, len(itemList.Items))
endpoints = make(map[string]*epInfo)
keys = make([]resourceKey, len(itemList.Items))
endpoints = make(map[resourceKey]*epInfo)
for i, item := range itemList.Items {
names[i] = item.Metadata.Name
endpoints[item.Metadata.Name] = item
keys[i] = resourceKey{item.Metadata.Namespace, item.Metadata.Name}
endpoints[keys[i]] = item
}

return
Expand All @@ -166,16 +166,16 @@ func (lister *epLister) expand() {
lister.l.Warningf("epLister.expand(): error while getting endpoints list from API: %v", err)
}

names, endpoints, err := parseEndpointsJSON(resp)
keys, endpoints, err := parseEndpointsJSON(resp)
if err != nil {
lister.l.Warningf("epLister.expand(): error while parsing endpoints API response (%s): %v", string(resp), err)
}

lister.l.Infof("epLister.expand(): got %d endpoints", len(names))
lister.l.Infof("epLister.expand(): got %d endpoints", len(keys))

lister.mu.Lock()
defer lister.mu.Unlock()
lister.names = names
lister.keys = keys
lister.cache = endpoints
}

Expand Down
18 changes: 11 additions & 7 deletions rds/kubernetes/endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,24 @@ func TestParseEndpoints(t *testing.T) {
if err != nil {
t.Fatalf("error reading test data file: %s", epListFile)
}
_, epByName, err := parseEndpointsJSON(data)
_, epByKey, err := parseEndpointsJSON(data)
if err != nil {
t.Fatalf("error reading test data file: %s", epListFile)
}

testNames := []string{"cloudprober", "cloudprober-test", "kubernetes"}
for _, testP := range testNames {
if epByName[testP] == nil {
t.Errorf("didn't get endpoints by the name: %s", testP)
testKeys := []resourceKey{
{"default", "cloudprober"},
{"default", "cloudprober-test"},
{"system", "kubernetes"},
}
for _, key := range testKeys {
if epByKey[key] == nil {
t.Errorf("didn't get endpoints for %+v", key)
}
}

for _, name := range testNames[:1] {
epi := epByName[name]
for _, key := range testKeys[:1] {
epi := epByKey[key]
if epi.Metadata.Labels["app"] != "cloudprober" {
t.Errorf("cloudprober endpoints app label: got=%s, want=cloudprober", epi.Metadata.Labels["app"])
}
Expand Down
4 changes: 4 additions & 0 deletions rds/kubernetes/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ type kMetadata struct {
Labels map[string]string
}

type resourceKey struct {
namespace, name string
}

// ListResources returns the list of resources from the cache.
func (p *Provider) ListResources(req *pb.ListResourcesRequest) (*pb.ListResourcesResponse, error) {
tok := strings.SplitN(req.GetResourcePath(), "/", 2)
Expand Down
28 changes: 14 additions & 14 deletions rds/kubernetes/pods.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type podsLister struct {
kClient *client

mu sync.RWMutex // Mutex for names and cache
names []string
cache map[string]*podInfo
keys []resourceKey
cache map[resourceKey]*podInfo
l *logger.Logger
}

Expand All @@ -59,12 +59,12 @@ func (pl *podsLister) listResources(req *pb.ListResourcesRequest) ([]*pb.Resourc
pl.mu.RLock()
defer pl.mu.RUnlock()

for _, name := range pl.names {
if nameFilter != nil && !nameFilter.Match(name, pl.l) {
for _, key := range pl.keys {
if nameFilter != nil && !nameFilter.Match(key.name, pl.l) {
continue
}

pod := pl.cache[name]
pod := pl.cache[key]
if nsFilter != nil && !nsFilter.Match(pod.Metadata.Namespace, pl.l) {
continue
}
Expand All @@ -73,7 +73,7 @@ func (pl *podsLister) listResources(req *pb.ListResourcesRequest) ([]*pb.Resourc
}

resources = append(resources, &pb.Resource{
Name: proto.String(name),
Name: proto.String(key.name),
Ip: proto.String(pod.Status.PodIP),
Labels: pod.Metadata.Labels,
})
Expand All @@ -91,7 +91,7 @@ type podInfo struct {
}
}

func parsePodsJSON(resp []byte) (names []string, pods map[string]*podInfo, err error) {
func parsePodsJSON(resp []byte) (keys []resourceKey, pods map[resourceKey]*podInfo, err error) {
var itemList struct {
Items []*podInfo
}
Expand All @@ -100,14 +100,14 @@ func parsePodsJSON(resp []byte) (names []string, pods map[string]*podInfo, err e
return
}

names = make([]string, len(itemList.Items))
pods = make(map[string]*podInfo)
keys = make([]resourceKey, len(itemList.Items))
pods = make(map[resourceKey]*podInfo)
for i, item := range itemList.Items {
if item.Status.Phase != "Running" {
continue
}
names[i] = item.Metadata.Name
pods[item.Metadata.Name] = item
keys[i] = resourceKey{item.Metadata.Namespace, item.Metadata.Name}
pods[keys[i]] = item
}

return
Expand All @@ -119,16 +119,16 @@ func (pl *podsLister) expand() {
pl.l.Warningf("podsLister.expand(): error while getting pods list from API: %v", err)
}

names, pods, err := parsePodsJSON(resp)
keys, pods, err := parsePodsJSON(resp)
if err != nil {
pl.l.Warningf("podsLister.expand(): error while parsing pods API response (%s): %v", string(resp), err)
}

pl.l.Infof("podsLister.expand(): got %d pods", len(names))
pl.l.Infof("podsLister.expand(): got %d pods", len(keys))

pl.mu.Lock()
defer pl.mu.Unlock()
pl.names = names
pl.keys = keys
pl.cache = pods
}

Expand Down
73 changes: 44 additions & 29 deletions rds/kubernetes/pods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,26 +24,33 @@ import (
)

func testPodInfo(name, ns, ip string, labels map[string]string) *podInfo {
labels["namespace"] = ns
pi := &podInfo{Metadata: kMetadata{Name: name, Namespace: ns, Labels: labels}}
pi.Status.PodIP = ip
return pi
}

func TestListResources(t *testing.T) {
pl := &podsLister{}
pl.names = []string{"podA", "podB", "podC"}
pl.cache = map[string]*podInfo{
"podA": testPodInfo("podA", "nsAB", "10.1.1.1", map[string]string{"app": "appA"}),
"podB": testPodInfo("podB", "nsAB", "10.1.1.2", map[string]string{"app": "appB"}),
"podC": testPodInfo("podC", "nsC", "10.1.1.3", map[string]string{"app": "appC", "func": "web"}),
pl := &podsLister{
cache: make(map[resourceKey]*podInfo),
}
for _, pi := range []*podInfo{
testPodInfo("podA", "nsAB", "10.1.1.1", map[string]string{"app": "appA"}),
testPodInfo("podB", "nsAB", "10.1.1.2", map[string]string{"app": "appB"}),
testPodInfo("podC", "nsC", "10.1.1.3", map[string]string{"app": "appC", "func": "web"}),
testPodInfo("podC", "devC", "10.2.1.3", map[string]string{"app": "appC", "func": "web"}),
} {
key := resourceKey{pi.Metadata.Namespace, pi.Metadata.Name}
pl.keys = append(pl.keys, key)
pl.cache[key] = pi
}

tests := []struct {
desc string
nameFilter string
filters map[string]string
labelsFilter map[string]string
wantPods []string
wantPods []resourceKey
wantErr bool
}{
{
Expand All @@ -54,17 +61,22 @@ func TestListResources(t *testing.T) {
{
desc: "only name filter for podB and podC",
filters: map[string]string{"name": "pod(B|C)"},
wantPods: []string{"podB", "podC"},
wantPods: []resourceKey{{"nsAB", "podB"}, {"nsC", "podC"}, {"devC", "podC"}},
},
{
desc: "name filter for podB and podC, and namespace filter",
filters: map[string]string{"name": "pod(B|C)", "namespace": "ns.*"},
wantPods: []resourceKey{{"nsAB", "podB"}, {"nsC", "podC"}},
},
{
desc: "name and namespace filter for podB",
filters: map[string]string{"name": "pod(B|C)", "namespace": "nsAB"},
wantPods: []string{"podB"},
wantPods: []resourceKey{{"nsAB", "podB"}},
},
{
desc: "only namespace filter for podA and podB",
filters: map[string]string{"namespace": "nsAB"},
wantPods: []string{"podA", "podB"},
wantPods: []resourceKey{{"nsAB", "podA"}, {"nsAB", "podB"}},
},
}

Expand All @@ -83,13 +95,13 @@ func TestListResources(t *testing.T) {
return
}

var gotNames []string
var gotPods []resourceKey
for _, res := range results {
gotNames = append(gotNames, res.GetName())
gotPods = append(gotPods, resourceKey{res.GetLabels()["namespace"], res.GetName()})
}

if !reflect.DeepEqual(gotNames, test.wantPods) {
t.Errorf("pods.listResources: got=%v, expected=%v", gotNames, test.wantPods)
if !reflect.DeepEqual(gotPods, test.wantPods) {
t.Errorf("pods.listResources: got=%v, expected=%v", gotPods, test.wantPods)
}
})
}
Expand All @@ -102,28 +114,31 @@ func TestParseResourceList(t *testing.T) {
if err != nil {
t.Fatalf("error reading test data file: %s", podsListFile)
}
_, podsByName, err := parsePodsJSON(data)
_, podsByKey, err := parsePodsJSON(data)

if err != nil {
t.Fatalf("Error while parsing pods JSON data: %v", err)
}

cpPod := "cloudprober-54778d95f5-7hqtd"
if podsByName[cpPod] == nil {
t.Errorf("didn't get pod by the name: %s", cpPod)
for _, ns := range []string{"prod", "dev"} {
cpPodKey := resourceKey{ns, "cloudprober-54778d95f5-7hqtd"}
if podsByKey[cpPodKey] == nil {
t.Errorf("didn't get pod by the key: %+v", cpPodKey)
continue
}

if podsByKey[cpPodKey].Metadata.Labels["app"] != "cloudprober" {
t.Errorf("cloudprober pod app label: got=%s, want=cloudprober", podsByKey[cpPodKey].Metadata.Labels["app"])
}

cpPodIP := "10.28.0.3"
if podsByKey[cpPodKey].Status.PodIP != cpPodIP {
t.Errorf("cloudprober pod ip: got=%s, want=%s", podsByKey[cpPodKey].Status.PodIP, cpPodIP)
}
}

// Verify that we got the pending pod.
if podsByName["test"] != nil {
// Verify that we didn't the pending pod.
if podsByKey[resourceKey{"default", "test"}] != nil {
t.Error("got a non-running pod in the list: test")
}

if podsByName[cpPod].Metadata.Labels["app"] != "cloudprober" {
t.Errorf("cloudprober pod app label: got=%s, want=cloudprober", podsByName[cpPod].Metadata.Labels["app"])
}

cpPodIP := "10.28.0.3"
if podsByName[cpPod].Status.PodIP != cpPodIP {
t.Errorf("cloudprober pod ip: got=%s, want=%s", podsByName[cpPod].Status.PodIP, cpPodIP)
}
}
Loading