[dpdk.git] / lib / librte_distributor / rte_distributor.c @ 6c5b0c86e8a554b3ac2eae294791a6661b76445b
/* SPDX-License-Identifier: BSD-3-Clause
 * Copyright(c) 2017 Intel Corporation
 */

#include <stdio.h>
#include <sys/queue.h>
#include <string.h>
#include <rte_mbuf.h>
#include <rte_memory.h>
#include <rte_cycles.h>
#include <rte_function_versioning.h>
#include <rte_memzone.h>
#include <rte_errno.h>
#include <rte_string_fns.h>
#include <rte_eal_memconfig.h>
#include <rte_pause.h>
#include <rte_tailq.h>

#include "rte_distributor.h"
#include "rte_distributor_single.h"
#include "distributor_private.h"

TAILQ_HEAD(rte_dist_burst_list, rte_distributor);

static struct rte_tailq_elem rte_dist_burst_tailq = {
	.name = "RTE_DIST_BURST",
};
EAL_REGISTER_TAILQ(rte_dist_burst_tailq)

/**** APIs called by workers ****/

/**** Burst Packet APIs called by workers ****/

void
rte_distributor_request_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **oldpkt,
		unsigned int count)
{
	struct rte_distributor_buffer *buf = &(d->bufs[worker_id]);
	unsigned int i;

	volatile int64_t *retptr64;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		rte_distributor_request_pkt_single(d->d_single,
			worker_id, oldpkt[0]);
		return;
	}

	retptr64 = &(buf->retptr64[0]);
	/* Spin while the handshake bits are set (the distributor clears them).
	 * Sync with the distributor on the GET_BUF flag.
	 */
	while (unlikely(__atomic_load_n(retptr64, __ATOMIC_ACQUIRE)
			& RTE_DISTRIB_GET_BUF)) {
		rte_pause();
		uint64_t t = rte_rdtsc() + 100;

		while (rte_rdtsc() < t)
			rte_pause();
	}

	/*
	 * If we have got here, then the distributor has just cleared the
	 * handshake bits. Populate the retptrs with the returning packets.
	 */

	for (i = count; i < RTE_DIST_BURST_SIZE; i++)
		buf->retptr64[i] = 0;

	/* Set the RETURN_BUF bit for each packet returned */
	for (i = count; i-- > 0; )
		buf->retptr64[i] =
			(((int64_t)(uintptr_t)(oldpkt[i])) <<
			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;

	/*
	 * Finally, set the GET_BUF flag to signal to the distributor that the
	 * cache line is ready for processing.
	 * Sync with the distributor to release the retptrs.
	 */
	__atomic_store_n(retptr64, *retptr64 | RTE_DISTRIB_GET_BUF,
			__ATOMIC_RELEASE);
}
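
/*
 * Illustrative sketch of the slot encoding used above and in the poll path
 * below (not part of the build; variable names are assumptions of the
 * example). Each 64-bit slot in retptr64[] and bufptr64[] packs an mbuf
 * pointer shifted up by RTE_DISTRIB_FLAG_BITS, with the low bits carrying
 * the GET_BUF/RETURN_BUF/VALID_BUF handshake flags:
 *
 *	struct rte_mbuf *m = some_packet;	// packet being handed over
 *	int64_t slot = (((int64_t)(uintptr_t)m) << RTE_DISTRIB_FLAG_BITS)
 *			| RTE_DISTRIB_RETURN_BUF;
 *	// decode: the right shift drops the flag bits again
 *	struct rte_mbuf *back =
 *		(struct rte_mbuf *)(uintptr_t)(slot >> RTE_DISTRIB_FLAG_BITS);
 *
 * The slots are signed so that the decode is an arithmetic shift, as noted
 * in rte_distributor_poll_pkt() below.
 */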

int
rte_distributor_poll_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **pkts)
{
	struct rte_distributor_buffer *buf = &d->bufs[worker_id];
	uint64_t ret;
	int count = 0;
	unsigned int i;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		pkts[0] = rte_distributor_poll_pkt_single(d->d_single,
			worker_id);
		return (pkts[0]) ? 1 : 0;
	}

	/* If the GET_BUF flag is still set, the distributor has not yet
	 * refilled this buffer, so return -1.
	 * Sync with the distributor to acquire the bufptrs.
	 */
	if (__atomic_load_n(&(buf->bufptr64[0]), __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF)
		return -1;

	/* since bufptr64 is signed, this should be an arithmetic shift */
	for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
		if (likely(buf->bufptr64[i] & RTE_DISTRIB_VALID_BUF)) {
			ret = buf->bufptr64[i] >> RTE_DISTRIB_FLAG_BITS;
			pkts[count++] = (struct rte_mbuf *)((uintptr_t)(ret));
		}
	}

	/*
	 * Now that we have copied the contents of the cache line into an
	 * array of mbuf pointers, set the GET_BUF flag so the distributor
	 * can start working on the next cache line while we work.
	 * Sync with the distributor on the GET_BUF flag. Release the bufptrs.
	 */
	__atomic_store_n(&(buf->bufptr64[0]),
		buf->bufptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);

	return count;
}

int
rte_distributor_get_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **pkts,
		struct rte_mbuf **oldpkt, unsigned int return_count)
{
	int count;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		if (return_count <= 1) {
			pkts[0] = rte_distributor_get_pkt_single(d->d_single,
				worker_id, oldpkt[0]);
			return (pkts[0]) ? 1 : 0;
		} else
			return -EINVAL;
	}

	rte_distributor_request_pkt(d, worker_id, oldpkt, return_count);

	count = rte_distributor_poll_pkt(d, worker_id, pkts);
	while (count == -1) {
		uint64_t t = rte_rdtsc() + 100;

		while (rte_rdtsc() < t)
			rte_pause();

		count = rte_distributor_poll_pkt(d, worker_id, pkts);
	}
	return count;
}
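
/*
 * Illustrative worker-side usage (a sketch, not part of this file). The
 * quit_signal flag and the handle_packet() callback are assumptions of the
 * example; only the rte_distributor_*() calls are real API.
 *
 *	struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE];
 *	unsigned int num = 0;
 *	unsigned int i;
 *
 *	while (!quit_signal) {
 *		num = rte_distributor_get_pkt(d, worker_id, bufs, bufs, num);
 *		for (i = 0; i < num; i++)
 *			handle_packet(bufs[i]);
 *	}
 *	rte_distributor_return_pkt(d, worker_id, bufs, num);
 *
 * Passing the just-processed mbufs back as "oldpkt" on the next call is what
 * hands them to the distributor as returns.
 */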

int
rte_distributor_return_pkt(struct rte_distributor *d,
		unsigned int worker_id, struct rte_mbuf **oldpkt, int num)
{
	struct rte_distributor_buffer *buf = &d->bufs[worker_id];
	unsigned int i;

	if (unlikely(d->alg_type == RTE_DIST_ALG_SINGLE)) {
		if (num == 1)
			return rte_distributor_return_pkt_single(d->d_single,
				worker_id, oldpkt[0]);
		else
			return -EINVAL;
	}

	/* Sync with the distributor to acquire the retptrs */
	__atomic_thread_fence(__ATOMIC_ACQUIRE);
	for (i = 0; i < RTE_DIST_BURST_SIZE; i++)
		/* Switch off the return bit first */
		buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;

	for (i = num; i-- > 0; )
		buf->retptr64[i] = (((int64_t)(uintptr_t)oldpkt[i]) <<
			RTE_DISTRIB_FLAG_BITS) | RTE_DISTRIB_RETURN_BUF;

	/* Set the GET_BUF flag even if we got no returns.
	 * Sync with the distributor on the GET_BUF flag. Release the retptrs.
	 */
	__atomic_store_n(&(buf->retptr64[0]),
		buf->retptr64[0] | RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);

	return 0;
}

/**** APIs called on distributor core ****/

/* stores a packet returned from a worker inside the returns array */
static inline void
store_return(uintptr_t oldbuf, struct rte_distributor *d,
		unsigned int *ret_start, unsigned int *ret_count)
{
	if (!oldbuf)
		return;
	/* store returns in a circular buffer */
	d->returns.mbufs[(*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK]
			= (void *)oldbuf;
	*ret_start += (*ret_count == RTE_DISTRIB_RETURNS_MASK);
	*ret_count += (*ret_count != RTE_DISTRIB_RETURNS_MASK);
}
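
/*
 * A note on the branch-free wrap logic above: the new pointer always goes to
 * slot (*ret_start + *ret_count) & RTE_DISTRIB_RETURNS_MASK. While the ring
 * is not full, *ret_start is unchanged and *ret_count grows by one; once
 * *ret_count has reached RTE_DISTRIB_RETURNS_MASK, *ret_start advances by
 * one instead, so the oldest entry is dropped and the count stays at the
 * mask. Exactly one of the two increments is non-zero on every call.
 */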

/*
 * Match the flow_ids (tags) of the incoming packets against the flow_ids of
 * the in-flight packets (both those in flight on the workers and those in
 * each worker backlog). This then allows us to pin those packets to the
 * relevant workers, giving us our atomic flow pinning.
 */
void
find_match_scalar(struct rte_distributor *d,
			uint16_t *data_ptr,
			uint16_t *output_ptr)
{
	struct rte_distributor_backlog *bl;
	uint16_t i, j, w;

	/*
	 * Function overview:
	 * 1. Loop through all worker IDs
	 * 2. Compare the current inflights to the incoming tags
	 * 3. Compare the current backlog to the incoming tags
	 * 4. Add any matches to the output
	 */

	for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
		output_ptr[j] = 0;

	for (i = 0; i < d->num_workers; i++) {
		bl = &d->backlog[i];

		/* j indexes the incoming packets, w scans worker i's slots */
		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
				if (d->in_flight_tags[i][w] == data_ptr[j]) {
					output_ptr[j] = i + 1;
					break;
				}
		for (j = 0; j < RTE_DIST_BURST_SIZE; j++)
			for (w = 0; w < RTE_DIST_BURST_SIZE; w++)
				if (bl->tags[w] == data_ptr[j]) {
					output_ptr[j] = i + 1;
					break;
				}
	}

	/*
	 * At this stage the output contains RTE_DIST_BURST_SIZE 16-bit values,
	 * each non-zero value giving the worker ID (+1) to which the
	 * corresponding incoming flow is pinned.
	 */
}
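
/*
 * A small worked example of the matching above (all values illustrative):
 * with two workers, worker 0 having tag 5 in flight and worker 1 having
 * tag 9 in its backlog, an incoming burst with tags {5, 9, 7, ...} yields
 * output_ptr = {1, 2, 0, ...}. Packet 0 is pinned to worker 0 (ID+1 = 1),
 * packet 1 to worker 1 (ID+1 = 2), and packet 2 is unmatched, so it will be
 * assigned a worker by rte_distributor_process() below.
 */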

/*
 * When the handshake bits indicate that there are packets coming
 * back from the worker, this function is called to copy and store
 * the valid returned pointers (store_return).
 */
static unsigned int
handle_returns(struct rte_distributor *d, unsigned int wkr)
{
	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
	uintptr_t oldbuf;
	unsigned int ret_start = d->returns.start,
			ret_count = d->returns.count;
	unsigned int count = 0;
	unsigned int i;

	/* Sync on the GET_BUF flag. Acquire the retptrs. */
	if (__atomic_load_n(&(buf->retptr64[0]), __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF) {
		for (i = 0; i < RTE_DIST_BURST_SIZE; i++) {
			if (buf->retptr64[i] & RTE_DISTRIB_RETURN_BUF) {
				oldbuf = ((uintptr_t)(buf->retptr64[i] >>
					RTE_DISTRIB_FLAG_BITS));
				/* store returns in a circular buffer */
				store_return(oldbuf, d, &ret_start, &ret_count);
				count++;
				buf->retptr64[i] &= ~RTE_DISTRIB_RETURN_BUF;
			}
		}
		d->returns.start = ret_start;
		d->returns.count = ret_count;
		/* Clear for the worker to populate with more returns.
		 * Sync with the worker on the GET_BUF flag. Release retptrs.
		 */
		__atomic_store_n(&(buf->retptr64[0]), 0, __ATOMIC_RELEASE);
	}
	return count;
}

/*
 * This function releases a burst (cache line) to a worker.
 * It is called from the process function when a cache line is
 * full, to make room for more packets for that worker, or when
 * all packets have been assigned to bursts and need to be flushed
 * to the workers.
 * It also needs to wait for any outstanding packets from the worker
 * before sending out new packets.
 */
static unsigned int
release(struct rte_distributor *d, unsigned int wkr)
{
	struct rte_distributor_buffer *buf = &(d->bufs[wkr]);
	unsigned int i;

	/* Sync with the worker on the GET_BUF flag */
	while (!(__atomic_load_n(&(d->bufs[wkr].bufptr64[0]), __ATOMIC_ACQUIRE)
		& RTE_DISTRIB_GET_BUF))
		rte_pause();

	handle_returns(d, wkr);

	buf->count = 0;

	for (i = 0; i < d->backlog[wkr].count; i++) {
		d->bufs[wkr].bufptr64[i] = d->backlog[wkr].pkts[i] |
				RTE_DISTRIB_GET_BUF | RTE_DISTRIB_VALID_BUF;
		d->in_flight_tags[wkr][i] = d->backlog[wkr].tags[i];
	}
	buf->count = i;
	for ( ; i < RTE_DIST_BURST_SIZE; i++) {
		buf->bufptr64[i] = RTE_DISTRIB_GET_BUF;
		d->in_flight_tags[wkr][i] = 0;
	}

	d->backlog[wkr].count = 0;

	/* Clear the GET_BUF flag.
	 * Sync with the worker on the GET_BUF flag. Release the bufptrs.
	 */
	__atomic_store_n(&(buf->bufptr64[0]),
		buf->bufptr64[0] & ~RTE_DISTRIB_GET_BUF, __ATOMIC_RELEASE);
	return buf->count;
}

/* process a set of packets to distribute them to workers */
int
rte_distributor_process(struct rte_distributor *d,
		struct rte_mbuf **mbufs, unsigned int num_mbufs)
{
	unsigned int next_idx = 0;
	static unsigned int wkr;
	struct rte_mbuf *next_mb = NULL;
	int64_t next_value = 0;
	uint16_t new_tag = 0;
	uint16_t flows[RTE_DIST_BURST_SIZE] __rte_cache_aligned;
	unsigned int i, j, w, wid;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_process_single(d->d_single,
			mbufs, num_mbufs);
	}

	if (unlikely(num_mbufs == 0)) {
		/* Flush out all non-full cache lines to the workers. */
		for (wid = 0; wid < d->num_workers; wid++) {
			/* Sync with the worker on the GET_BUF flag. */
			if (__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
				__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF) {
				release(d, wid);
				handle_returns(d, wid);
			}
		}
		return 0;
	}

	while (next_idx < num_mbufs) {
		uint16_t matches[RTE_DIST_BURST_SIZE];
		unsigned int pkts;

		/* Sync with the worker on the GET_BUF flag. */
		if (__atomic_load_n(&(d->bufs[wkr].bufptr64[0]),
			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF)
			d->bufs[wkr].count = 0;

		if ((num_mbufs - next_idx) < RTE_DIST_BURST_SIZE)
			pkts = num_mbufs - next_idx;
		else
			pkts = RTE_DIST_BURST_SIZE;

		for (i = 0; i < pkts; i++) {
			if (mbufs[next_idx + i]) {
				/* flows have to be non-zero */
				flows[i] = mbufs[next_idx + i]->hash.usr | 1;
			} else
				flows[i] = 0;
		}
		for (; i < RTE_DIST_BURST_SIZE; i++)
			flows[i] = 0;

		switch (d->dist_match_fn) {
		case RTE_DIST_MATCH_VECTOR:
			find_match_vec(d, &flows[0], &matches[0]);
			break;
		default:
			find_match_scalar(d, &flows[0], &matches[0]);
		}

		/*
		 * The matches array now contains the intended worker ID (+1)
		 * of each incoming packet. Any zeroes still need to be
		 * assigned workers.
		 */

		for (j = 0; j < pkts; j++) {

			next_mb = mbufs[next_idx++];
			next_value = (((int64_t)(uintptr_t)next_mb) <<
					RTE_DISTRIB_FLAG_BITS);
			/*
			 * Users are advised to set the tag value for each
			 * mbuf before calling rte_distributor_process().
			 * User-defined tags are used to identify flows or
			 * sessions.
			 */
			/* flows MUST be non-zero */
			new_tag = (uint16_t)(next_mb->hash.usr) | 1;

			/*
			 * Uncommenting the next line will cause the find_match
			 * function to be optimized out, making this function
			 * do parallel (non-atomic) distribution
			 */
			/* matches[j] = 0; */

			if (matches[j]) {
				struct rte_distributor_backlog *bl =
						&d->backlog[matches[j]-1];
				if (unlikely(bl->count ==
						RTE_DIST_BURST_SIZE)) {
					release(d, matches[j]-1);
				}

				/* Add to the worker that already has the flow */
				unsigned int idx = bl->count++;

				bl->tags[idx] = new_tag;
				bl->pkts[idx] = next_value;

			} else {
				struct rte_distributor_backlog *bl =
						&d->backlog[wkr];
				if (unlikely(bl->count ==
						RTE_DIST_BURST_SIZE)) {
					release(d, wkr);
				}

				/* Add to the current worker */
				unsigned int idx = bl->count++;

				bl->tags[idx] = new_tag;
				bl->pkts[idx] = next_value;
				/*
				 * Now that we've just added an unpinned flow
				 * to a worker, we need to ensure that all
				 * other packets with that same flow will go
				 * to the same worker in this burst.
				 */
				for (w = j; w < pkts; w++)
					if (flows[w] == new_tag)
						matches[w] = wkr+1;
			}
		}
		wkr++;
		if (wkr >= d->num_workers)
			wkr = 0;
	}

	/* Flush out all non-full cache lines to the workers. */
	for (wid = 0; wid < d->num_workers; wid++)
		/* Sync with the worker on the GET_BUF flag. */
		if ((__atomic_load_n(&(d->bufs[wid].bufptr64[0]),
			__ATOMIC_ACQUIRE) & RTE_DISTRIB_GET_BUF))
			release(d, wid);

	return num_mbufs;
}
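
/*
 * Illustrative distributor-core usage (a sketch, not part of this file).
 * The port/queue numbers and burst size are assumptions of the example;
 * rte_eth_rx_burst() and the rte_distributor_*() calls are existing API.
 *
 *	struct rte_mbuf *bufs[RTE_DIST_BURST_SIZE * 2];
 *
 *	const uint16_t nb_rx = rte_eth_rx_burst(0, 0, bufs,
 *			RTE_DIST_BURST_SIZE * 2);
 *	// mbuf->hash.usr supplies the flow tag; it shares storage with
 *	// hash.rss, so an RSS hash written by the NIC can serve as the tag
 *	rte_distributor_process(d, bufs, nb_rx);
 *
 *	const uint16_t nb_ret = rte_distributor_returned_pkts(d, bufs,
 *			RTE_DIST_BURST_SIZE * 2);
 *	// nb_ret mbufs handed back by the workers are now ready to
 *	// transmit or free
 */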

/* return to the caller the packets returned from the workers */
int
rte_distributor_returned_pkts(struct rte_distributor *d,
		struct rte_mbuf **mbufs, unsigned int max_mbufs)
{
	struct rte_distributor_returned_pkts *returns = &d->returns;
	unsigned int retval = (max_mbufs < returns->count) ?
			max_mbufs : returns->count;
	unsigned int i;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_returned_pkts_single(d->d_single,
				mbufs, max_mbufs);
	}

	for (i = 0; i < retval; i++) {
		unsigned int idx = (returns->start + i) &
				RTE_DISTRIB_RETURNS_MASK;

		mbufs[i] = returns->mbufs[idx];
	}
	returns->start += i;
	returns->count -= i;

	return retval;
}

/*
 * Return the number of packets in-flight in a distributor, i.e. packets
 * being worked on or queued up in a backlog.
 */
static inline unsigned int
total_outstanding(const struct rte_distributor *d)
{
	unsigned int wkr, total_outstanding = 0;

	for (wkr = 0; wkr < d->num_workers; wkr++)
		total_outstanding += d->backlog[wkr].count;

	return total_outstanding;
}

/*
 * Flush the distributor, so that there are no outstanding packets in flight
 * or queued up.
 */
int
rte_distributor_flush(struct rte_distributor *d)
{
	unsigned int flushed;
	unsigned int wkr;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		return rte_distributor_flush_single(d->d_single);
	}

	flushed = total_outstanding(d);

	while (total_outstanding(d) > 0)
		rte_distributor_process(d, NULL, 0);

	/* wait 10ms to allow all workers to drain the pkts */
	rte_delay_us(10000);

	/*
	 * Send an empty burst to all workers to allow them to exit
	 * gracefully, should they need to.
	 */
	rte_distributor_process(d, NULL, 0);

	for (wkr = 0; wkr < d->num_workers; wkr++)
		handle_returns(d, wkr);

	return flushed;
}

/* clears the internal returns array in the distributor */
void
rte_distributor_clear_returns(struct rte_distributor *d)
{
	unsigned int wkr;

	if (d->alg_type == RTE_DIST_ALG_SINGLE) {
		/* Call the old API */
		rte_distributor_clear_returns_single(d->d_single);
		return;
	}

	/* throw away returns, so workers can exit */
	for (wkr = 0; wkr < d->num_workers; wkr++)
		/* Sync with the worker. Release the retptrs. */
		__atomic_store_n(&(d->bufs[wkr].retptr64[0]), 0,
				__ATOMIC_RELEASE);
}
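
/*
 * Illustrative shutdown sequence (a sketch; the quit flag that stops the
 * worker loops is an assumption of the example):
 *
 *	quit_signal = 1;			// tell worker loops to stop
 *	rte_distributor_flush(d);		// push out backlogged packets
 *	rte_distributor_clear_returns(d);	// discard returns so workers
 *						// waiting on a handshake exit
 *
 * rte_distributor_flush() also reports how many packets were still
 * outstanding when it was called.
 */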

/* creates a distributor instance */
struct rte_distributor *
rte_distributor_create(const char *name,
		unsigned int socket_id,
		unsigned int num_workers,
		unsigned int alg_type)
{
	struct rte_distributor *d;
	struct rte_dist_burst_list *dist_burst_list;
	char mz_name[RTE_MEMZONE_NAMESIZE];
	const struct rte_memzone *mz;
	unsigned int i;

	/* TODO Reorganise function properly around RTE_DIST_ALG_SINGLE/BURST */

	/* compile-time checks */
	RTE_BUILD_BUG_ON((sizeof(*d) & RTE_CACHE_LINE_MASK) != 0);
	RTE_BUILD_BUG_ON((RTE_DISTRIB_MAX_WORKERS & 7) != 0);

	if (name == NULL || num_workers >=
		(unsigned int)RTE_MIN(RTE_DISTRIB_MAX_WORKERS, RTE_MAX_LCORE)) {
		rte_errno = EINVAL;
		return NULL;
	}

	if (alg_type == RTE_DIST_ALG_SINGLE) {
		d = malloc(sizeof(struct rte_distributor));
		if (d == NULL) {
			rte_errno = ENOMEM;
			return NULL;
		}
		d->d_single = rte_distributor_create_single(name,
				socket_id, num_workers);
		if (d->d_single == NULL) {
			free(d);
			/* rte_errno will have been set */
			return NULL;
		}
		d->alg_type = alg_type;
		return d;
	}

	snprintf(mz_name, sizeof(mz_name), RTE_DISTRIB_PREFIX"%s", name);
	mz = rte_memzone_reserve(mz_name, sizeof(*d), socket_id, NO_FLAGS);
	if (mz == NULL) {
		rte_errno = ENOMEM;
		return NULL;
	}

	d = mz->addr;
	strlcpy(d->name, name, sizeof(d->name));
	d->num_workers = num_workers;
	d->alg_type = alg_type;

	d->dist_match_fn = RTE_DIST_MATCH_SCALAR;
#if defined(RTE_ARCH_X86)
	d->dist_match_fn = RTE_DIST_MATCH_VECTOR;
#endif

	/*
	 * Set up the backlog tags so they're pointing at the second cache
	 * line for performance during flow matching
	 */
	for (i = 0; i < num_workers; i++)
		d->backlog[i].tags = &d->in_flight_tags[i][RTE_DIST_BURST_SIZE];

	dist_burst_list = RTE_TAILQ_CAST(rte_dist_burst_tailq.head,
					  rte_dist_burst_list);

	rte_mcfg_tailq_write_lock();
	TAILQ_INSERT_TAIL(dist_burst_list, d, next);
	rte_mcfg_tailq_write_unlock();

	return d;
}
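
/*
 * Illustrative creation call (a sketch; the instance name and the use of
 * all remaining lcores as workers are assumptions of the example):
 *
 *	struct rte_distributor *d = rte_distributor_create("PKT_DIST",
 *			rte_socket_id(), rte_lcore_count() - 1,
 *			RTE_DIST_ALG_BURST);
 *	if (d == NULL)
 *		rte_exit(EXIT_FAILURE, "Cannot create distributor\n");
 */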